This commit is contained in:
Redkale
2018-08-09 19:32:03 +08:00
parent e00ed8ae37
commit 04a4ce12c7
6 changed files with 21 additions and 13 deletions

View File

@@ -45,8 +45,8 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
} }
@Override @Override
protected PoolSource<Connection> createPoolSource(DataSource source, String rwtype, ArrayBlockingQueue queue, Properties prop) { protected PoolSource<Connection> createPoolSource(DataSource source, String rwtype, ArrayBlockingQueue queue, Semaphore semaphore, Properties prop) {
return new PoolJdbcSource(this.name, this.persistxml, rwtype, queue, prop, this.logger); return new PoolJdbcSource(this.name, this.persistxml, rwtype, queue, semaphore, prop, this.logger);
} }
@Override @Override

View File

@@ -79,7 +79,7 @@ public class DataMemorySource extends DataSqlSource<Void> {
} }
@Override @Override
protected PoolSource<Void> createPoolSource(DataSource source, String rwtype, ArrayBlockingQueue queue, Properties prop) { protected PoolSource<Void> createPoolSource(DataSource source, String rwtype, ArrayBlockingQueue queue, Semaphore semaphore, Properties prop) {
return null; return null;
} }

View File

@@ -104,8 +104,9 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
this.persistxml = persistxml; this.persistxml = persistxml;
this.cacheForbidden = "NONE".equalsIgnoreCase(readprop.getProperty(JDBC_CACHE_MODE)); this.cacheForbidden = "NONE".equalsIgnoreCase(readprop.getProperty(JDBC_CACHE_MODE));
ArrayBlockingQueue<DBChannel> queue = maxconns > 0 ? new ArrayBlockingQueue(maxconns) : null; ArrayBlockingQueue<DBChannel> queue = maxconns > 0 ? new ArrayBlockingQueue(maxconns) : null;
this.readPool = createPoolSource(this, "read", queue, readprop); Semaphore semaphore = maxconns > 0 ? new Semaphore(maxconns) : null;
this.writePool = createPoolSource(this, "write", queue, writeprop); this.readPool = createPoolSource(this, "read", queue, semaphore, readprop);
this.writePool = createPoolSource(this, "write", queue, semaphore, writeprop);
} }
@Local @Local
@@ -124,7 +125,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
protected abstract String prepareParamSign(int index); protected abstract String prepareParamSign(int index);
//创建连接池 //创建连接池
protected abstract PoolSource<DBChannel> createPoolSource(DataSource source, String rwtype, ArrayBlockingQueue queue, Properties prop); protected abstract PoolSource<DBChannel> createPoolSource(DataSource source, String rwtype, ArrayBlockingQueue queue, Semaphore semaphore, Properties prop);
//插入纪录 //插入纪录
protected abstract <T> CompletableFuture<Integer> insertDB(final EntityInfo<T> info, T... values); protected abstract <T> CompletableFuture<Integer> insertDB(final EntityInfo<T> info, T... values);

View File

@@ -33,8 +33,8 @@ public class PoolJdbcSource extends PoolSource<Connection> {
protected final URL persistxml; protected final URL persistxml;
public PoolJdbcSource(String unitName, URL persistxml, String rwtype, ArrayBlockingQueue aqueue, Properties prop, Logger logger) { public PoolJdbcSource(String unitName, URL persistxml, String rwtype, ArrayBlockingQueue aqueue, Semaphore semaphore, Properties prop, Logger logger) {
super(rwtype, prop, logger); super(rwtype, semaphore, prop, logger);
this.unitName = unitName; this.unitName = unitName;
this.persistxml = persistxml; this.persistxml = persistxml;
this.source = createDataSource(prop); this.source = createDataSource(prop);

View File

@@ -23,6 +23,8 @@ import static org.redkale.source.DataSources.*;
*/ */
public abstract class PoolSource<DBChannel> { public abstract class PoolSource<DBChannel> {
protected final AtomicLong closeCounter = new AtomicLong();
protected final AtomicLong usingCounter = new AtomicLong(); protected final AtomicLong usingCounter = new AtomicLong();
protected final AtomicLong creatCounter = new AtomicLong(); protected final AtomicLong creatCounter = new AtomicLong();
@@ -64,7 +66,7 @@ public abstract class PoolSource<DBChannel> {
protected Properties attributes = new Properties(); protected Properties attributes = new Properties();
@SuppressWarnings("OverridableMethodCallInConstructor") @SuppressWarnings("OverridableMethodCallInConstructor")
public PoolSource(String rwtype, Properties prop, Logger logger) { public PoolSource(String rwtype, Semaphore semaphore, Properties prop, Logger logger) {
this.logger = logger; this.logger = logger;
this.rwtype = rwtype; this.rwtype = rwtype;
this.props = prop; this.props = prop;
@@ -76,7 +78,7 @@ public abstract class PoolSource<DBChannel> {
this.readTimeoutSeconds = Integer.decode(prop.getProperty(JDBC_READTIMEOUT_SECONDS, "3")); this.readTimeoutSeconds = Integer.decode(prop.getProperty(JDBC_READTIMEOUT_SECONDS, "3"));
this.writeTimeoutSeconds = Integer.decode(prop.getProperty(JDBC_WRITETIMEOUT_SECONDS, "3")); this.writeTimeoutSeconds = Integer.decode(prop.getProperty(JDBC_WRITETIMEOUT_SECONDS, "3"));
this.maxconns = Math.max(8, Integer.decode(prop.getProperty(JDBC_CONNECTIONS_LIMIT, "" + Runtime.getRuntime().availableProcessors() * 100))); this.maxconns = Math.max(8, Integer.decode(prop.getProperty(JDBC_CONNECTIONS_LIMIT, "" + Runtime.getRuntime().availableProcessors() * 100)));
this.semaphore = new Semaphore(this.maxconns); this.semaphore = semaphore == null ? new Semaphore(this.maxconns) : semaphore;
String dbtype0 = ""; String dbtype0 = "";
{ //jdbc:mysql:// jdbc:microsoft:sqlserver:// 取://之前的到最后一个:之间的字符串 { //jdbc:mysql:// jdbc:microsoft:sqlserver:// 取://之前的到最后一个:之间的字符串
int pos = this.url.indexOf("://"); int pos = this.url.indexOf("://");
@@ -161,6 +163,10 @@ public abstract class PoolSource<DBChannel> {
return dbtype; return dbtype;
} }
public final long getCloseCount() {
return closeCounter.longValue();
}
public final long getUsingCount() { public final long getUsingCount() {
return usingCounter.longValue(); return usingCounter.longValue();
} }

View File

@@ -40,8 +40,8 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
protected final ArrayBlockingQueue<AsyncConnection> connQueue; protected final ArrayBlockingQueue<AsyncConnection> connQueue;
public PoolTcpSource(String rwtype, ArrayBlockingQueue queue, Properties prop, Logger logger, ObjectPool<ByteBuffer> bufferPool, ThreadPoolExecutor executor) { public PoolTcpSource(String rwtype, ArrayBlockingQueue queue, Semaphore semaphore, Properties prop, Logger logger, ObjectPool<ByteBuffer> bufferPool, ThreadPoolExecutor executor) {
super(rwtype, prop, logger); super(rwtype, semaphore, prop, logger);
this.bufferPool = bufferPool; this.bufferPool = bufferPool;
this.executor = executor; this.executor = executor;
try { try {
@@ -55,7 +55,7 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
@Override @Override
public void offerConnection(final AsyncConnection conn) { public void offerConnection(final AsyncConnection conn) {
if (conn == null) return; if (conn == null) return;
if (connQueue.offer(conn)) { if (conn.isOpen() && connQueue.offer(conn)) {
saveCounter.incrementAndGet(); saveCounter.incrementAndGet();
usingCounter.decrementAndGet(); usingCounter.decrementAndGet();
} else { } else {
@@ -136,6 +136,7 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
return AsyncConnection.createTCP(group, this.servaddr, this.readTimeoutSeconds, this.writeTimeoutSeconds).thenCompose(conn -> { return AsyncConnection.createTCP(group, this.servaddr, this.readTimeoutSeconds, this.writeTimeoutSeconds).thenCompose(conn -> {
conn.beforeCloseListener((c) -> { conn.beforeCloseListener((c) -> {
semaphore.release(); semaphore.release();
closeCounter.incrementAndGet();
usingCounter.decrementAndGet(); usingCounter.decrementAndGet();
}); });
CompletableFuture<AsyncConnection> future = new CompletableFuture(); CompletableFuture<AsyncConnection> future = new CompletableFuture();