diff --git a/src/org/redkale/source/DataJdbcSource.java b/src/org/redkale/source/DataJdbcSource.java index 827923c67..a523294f8 100644 --- a/src/org/redkale/source/DataJdbcSource.java +++ b/src/org/redkale/source/DataJdbcSource.java @@ -45,8 +45,8 @@ public class DataJdbcSource extends DataSqlSource { } @Override - protected PoolSource createPoolSource(DataSource source, String rwtype, ArrayBlockingQueue queue, Properties prop) { - return new PoolJdbcSource(this.name, this.persistxml, rwtype, queue, prop, this.logger); + protected PoolSource createPoolSource(DataSource source, String rwtype, ArrayBlockingQueue queue, Semaphore semaphore, Properties prop) { + return new PoolJdbcSource(this.name, this.persistxml, rwtype, queue, semaphore, prop, this.logger); } @Override diff --git a/src/org/redkale/source/DataMemorySource.java b/src/org/redkale/source/DataMemorySource.java index 9e24f95c3..6b365a73c 100644 --- a/src/org/redkale/source/DataMemorySource.java +++ b/src/org/redkale/source/DataMemorySource.java @@ -79,7 +79,7 @@ public class DataMemorySource extends DataSqlSource { } @Override - protected PoolSource createPoolSource(DataSource source, String rwtype, ArrayBlockingQueue queue, Properties prop) { + protected PoolSource createPoolSource(DataSource source, String rwtype, ArrayBlockingQueue queue, Semaphore semaphore, Properties prop) { return null; } diff --git a/src/org/redkale/source/DataSqlSource.java b/src/org/redkale/source/DataSqlSource.java index 80cce0e66..4ff914d7c 100644 --- a/src/org/redkale/source/DataSqlSource.java +++ b/src/org/redkale/source/DataSqlSource.java @@ -104,8 +104,9 @@ public abstract class DataSqlSource extends AbstractService implement this.persistxml = persistxml; this.cacheForbidden = "NONE".equalsIgnoreCase(readprop.getProperty(JDBC_CACHE_MODE)); ArrayBlockingQueue queue = maxconns > 0 ? new ArrayBlockingQueue(maxconns) : null; - this.readPool = createPoolSource(this, "read", queue, readprop); - this.writePool = createPoolSource(this, "write", queue, writeprop); + Semaphore semaphore = maxconns > 0 ? new Semaphore(maxconns) : null; + this.readPool = createPoolSource(this, "read", queue, semaphore, readprop); + this.writePool = createPoolSource(this, "write", queue, semaphore, writeprop); } @Local @@ -124,7 +125,7 @@ public abstract class DataSqlSource extends AbstractService implement protected abstract String prepareParamSign(int index); //创建连接池 - protected abstract PoolSource createPoolSource(DataSource source, String rwtype, ArrayBlockingQueue queue, Properties prop); + protected abstract PoolSource createPoolSource(DataSource source, String rwtype, ArrayBlockingQueue queue, Semaphore semaphore, Properties prop); //插入纪录 protected abstract CompletableFuture insertDB(final EntityInfo info, T... values); diff --git a/src/org/redkale/source/PoolJdbcSource.java b/src/org/redkale/source/PoolJdbcSource.java index 750990373..c20afebb6 100644 --- a/src/org/redkale/source/PoolJdbcSource.java +++ b/src/org/redkale/source/PoolJdbcSource.java @@ -33,8 +33,8 @@ public class PoolJdbcSource extends PoolSource { protected final URL persistxml; - public PoolJdbcSource(String unitName, URL persistxml, String rwtype, ArrayBlockingQueue aqueue, Properties prop, Logger logger) { - super(rwtype, prop, logger); + public PoolJdbcSource(String unitName, URL persistxml, String rwtype, ArrayBlockingQueue aqueue, Semaphore semaphore, Properties prop, Logger logger) { + super(rwtype, semaphore, prop, logger); this.unitName = unitName; this.persistxml = persistxml; this.source = createDataSource(prop); diff --git a/src/org/redkale/source/PoolSource.java b/src/org/redkale/source/PoolSource.java index 25c1d9d30..70feffb1d 100644 --- a/src/org/redkale/source/PoolSource.java +++ b/src/org/redkale/source/PoolSource.java @@ -23,6 +23,8 @@ import static org.redkale.source.DataSources.*; */ public abstract class PoolSource { + protected final AtomicLong closeCounter = new AtomicLong(); + protected final AtomicLong usingCounter = new AtomicLong(); protected final AtomicLong creatCounter = new AtomicLong(); @@ -64,7 +66,7 @@ public abstract class PoolSource { protected Properties attributes = new Properties(); @SuppressWarnings("OverridableMethodCallInConstructor") - public PoolSource(String rwtype, Properties prop, Logger logger) { + public PoolSource(String rwtype, Semaphore semaphore, Properties prop, Logger logger) { this.logger = logger; this.rwtype = rwtype; this.props = prop; @@ -76,7 +78,7 @@ public abstract class PoolSource { this.readTimeoutSeconds = Integer.decode(prop.getProperty(JDBC_READTIMEOUT_SECONDS, "3")); this.writeTimeoutSeconds = Integer.decode(prop.getProperty(JDBC_WRITETIMEOUT_SECONDS, "3")); this.maxconns = Math.max(8, Integer.decode(prop.getProperty(JDBC_CONNECTIONS_LIMIT, "" + Runtime.getRuntime().availableProcessors() * 100))); - this.semaphore = new Semaphore(this.maxconns); + this.semaphore = semaphore == null ? new Semaphore(this.maxconns) : semaphore; String dbtype0 = ""; { //jdbc:mysql:// jdbc:microsoft:sqlserver:// 取://之前的到最后一个:之间的字符串 int pos = this.url.indexOf("://"); @@ -161,6 +163,10 @@ public abstract class PoolSource { return dbtype; } + public final long getCloseCount() { + return closeCounter.longValue(); + } + public final long getUsingCount() { return usingCounter.longValue(); } diff --git a/src/org/redkale/source/PoolTcpSource.java b/src/org/redkale/source/PoolTcpSource.java index dc71d21f5..c3850b3fd 100644 --- a/src/org/redkale/source/PoolTcpSource.java +++ b/src/org/redkale/source/PoolTcpSource.java @@ -40,8 +40,8 @@ public abstract class PoolTcpSource extends PoolSource { protected final ArrayBlockingQueue connQueue; - public PoolTcpSource(String rwtype, ArrayBlockingQueue queue, Properties prop, Logger logger, ObjectPool bufferPool, ThreadPoolExecutor executor) { - super(rwtype, prop, logger); + public PoolTcpSource(String rwtype, ArrayBlockingQueue queue, Semaphore semaphore, Properties prop, Logger logger, ObjectPool bufferPool, ThreadPoolExecutor executor) { + super(rwtype, semaphore, prop, logger); this.bufferPool = bufferPool; this.executor = executor; try { @@ -55,7 +55,7 @@ public abstract class PoolTcpSource extends PoolSource { @Override public void offerConnection(final AsyncConnection conn) { if (conn == null) return; - if (connQueue.offer(conn)) { + if (conn.isOpen() && connQueue.offer(conn)) { saveCounter.incrementAndGet(); usingCounter.decrementAndGet(); } else { @@ -136,6 +136,7 @@ public abstract class PoolTcpSource extends PoolSource { return AsyncConnection.createTCP(group, this.servaddr, this.readTimeoutSeconds, this.writeTimeoutSeconds).thenCompose(conn -> { conn.beforeCloseListener((c) -> { semaphore.release(); + closeCounter.incrementAndGet(); usingCounter.decrementAndGet(); }); CompletableFuture future = new CompletableFuture();