From 350ece15331ee33d1a9f304309ca69c1bb9817ee Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Sat, 12 May 2018 14:24:59 +0800 Subject: [PATCH] --- src/org/redkale/source/DataJdbcOldSource.java | 4 +-- src/org/redkale/source/DataJdbcSource.java | 6 ++-- src/org/redkale/source/DataSqlSource.java | 9 ++++-- src/org/redkale/source/PoolJdbcSource.java | 4 +-- src/org/redkale/source/PoolTcpSource.java | 31 ++++++++++++++----- 5 files changed, 36 insertions(+), 18 deletions(-) diff --git a/src/org/redkale/source/DataJdbcOldSource.java b/src/org/redkale/source/DataJdbcOldSource.java index c0794c639..746f8dbbd 100644 --- a/src/org/redkale/source/DataJdbcOldSource.java +++ b/src/org/redkale/source/DataJdbcOldSource.java @@ -131,8 +131,8 @@ public class DataJdbcOldSource extends AbstractService implements DataSource, Da protected void initByProperties(String unitName, Properties readprop, Properties writeprop) { this.name = unitName; this.cacheForbidden = "NONE".equalsIgnoreCase(readprop.getProperty(JDBC_CACHE_MODE)); - this.readPool = new PoolJdbcSource(unitName, persistxml, "read", readprop, logger); - this.writePool = new PoolJdbcSource(unitName, persistxml, "write", writeprop, logger); + this.readPool = new PoolJdbcSource(unitName, persistxml, "read", null, readprop, logger); + this.writePool = new PoolJdbcSource(unitName, persistxml, "write", null, writeprop, logger); } @Local diff --git a/src/org/redkale/source/DataJdbcSource.java b/src/org/redkale/source/DataJdbcSource.java index 68fc4dab8..201bce913 100644 --- a/src/org/redkale/source/DataJdbcSource.java +++ b/src/org/redkale/source/DataJdbcSource.java @@ -9,7 +9,7 @@ import java.io.Serializable; import java.net.URL; import java.sql.*; import java.util.*; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.function.Consumer; import java.util.logging.Level; @@ -45,8 +45,8 @@ public class DataJdbcSource extends DataSqlSource { } @Override - protected PoolSource createPoolSource(DataSource source, String rwtype, Properties prop) { - return new PoolJdbcSource(this.name, this.persistxml, rwtype, prop, this.logger); + protected PoolSource createPoolSource(DataSource source, String rwtype, ArrayBlockingQueue queue, Properties prop) { + return new PoolJdbcSource(this.name, this.persistxml, rwtype, queue, prop, this.logger); } @Override diff --git a/src/org/redkale/source/DataSqlSource.java b/src/org/redkale/source/DataSqlSource.java index 629907794..b96c3ecb3 100644 --- a/src/org/redkale/source/DataSqlSource.java +++ b/src/org/redkale/source/DataSqlSource.java @@ -69,8 +69,10 @@ public abstract class DataSqlSource extends AbstractService implement public DataSqlSource(String unitName, URL persistxml, Properties readprop, Properties writeprop) { final AtomicInteger counter = new AtomicInteger(); this.threads = Integer.decode(readprop.getProperty(JDBC_CONNECTIONSMAX, "" + Runtime.getRuntime().availableProcessors() * 16)); + int maxconns = Math.max(8, Integer.decode(readprop.getProperty(JDBC_CONNECTIONSMAX, "" + Runtime.getRuntime().availableProcessors() * 16))); if (readprop != writeprop) { this.threads += Integer.decode(writeprop.getProperty(JDBC_CONNECTIONSMAX, "" + Runtime.getRuntime().availableProcessors() * 16)); + maxconns = 0; } final String cname = this.getClass().getSimpleName(); final Thread.UncaughtExceptionHandler ueh = (t, e) -> { @@ -99,8 +101,9 @@ public abstract class DataSqlSource extends AbstractService implement this.name = unitName; this.persistxml = persistxml; this.cacheForbidden = "NONE".equalsIgnoreCase(readprop.getProperty(JDBC_CACHE_MODE)); - this.readPool = createPoolSource(this, "read", readprop); - this.writePool = createPoolSource(this, "write", writeprop); + ArrayBlockingQueue queue = maxconns > 0 ? new ArrayBlockingQueue(maxconns) : null; + this.readPool = createPoolSource(this, "read", queue, readprop); + this.writePool = createPoolSource(this, "write", queue, writeprop); } //是否异步, 为true则只能调用pollAsync方法,为false则只能调用poll方法 @@ -110,7 +113,7 @@ public abstract class DataSqlSource extends AbstractService implement protected abstract String prepareParamSign(int index); //创建连接池 - protected abstract PoolSource createPoolSource(DataSource source, String rwtype, Properties prop); + protected abstract PoolSource createPoolSource(DataSource source, String rwtype, ArrayBlockingQueue queue, Properties prop); //插入纪录 protected abstract CompletableFuture insertDB(final EntityInfo info, T... values); diff --git a/src/org/redkale/source/PoolJdbcSource.java b/src/org/redkale/source/PoolJdbcSource.java index faf145acc..f56f68aec 100644 --- a/src/org/redkale/source/PoolJdbcSource.java +++ b/src/org/redkale/source/PoolJdbcSource.java @@ -39,12 +39,12 @@ public class PoolJdbcSource extends PoolSource { private final URL persistxml; - public PoolJdbcSource(String unitName, URL persistxml, String rwtype, Properties prop, Logger logger) { + public PoolJdbcSource(String unitName, URL persistxml, String rwtype, ArrayBlockingQueue aqueue, Properties prop, Logger logger) { super(rwtype, prop, logger); this.unitName = unitName; this.persistxml = persistxml; this.source = createDataSource(prop); - this.queue = new ArrayBlockingQueue<>(this.maxconns); + this.queue = aqueue == null ? new ArrayBlockingQueue<>(this.maxconns) : aqueue; this.listener = new ConnectionEventListener() { @Override diff --git a/src/org/redkale/source/PoolTcpSource.java b/src/org/redkale/source/PoolTcpSource.java index ca29924c3..ab561be39 100644 --- a/src/org/redkale/source/PoolTcpSource.java +++ b/src/org/redkale/source/PoolTcpSource.java @@ -33,7 +33,7 @@ public abstract class PoolTcpSource extends PoolSource { protected final ArrayBlockingQueue connQueue; - public PoolTcpSource(String rwtype, Properties prop, Logger logger, ObjectPool bufferPool, ThreadPoolExecutor executor) { + public PoolTcpSource(String rwtype, ArrayBlockingQueue queue, Properties prop, Logger logger, ObjectPool bufferPool, ThreadPoolExecutor executor) { super(rwtype, prop, logger); this.bufferPool = bufferPool; this.executor = executor; @@ -42,7 +42,7 @@ public abstract class PoolTcpSource extends PoolSource { } catch (IOException e) { throw new RuntimeException(e); } - this.connQueue = new ArrayBlockingQueue<>(this.maxconns); + this.connQueue = queue == null ? new ArrayBlockingQueue<>(this.maxconns) : queue; } @Override @@ -53,7 +53,18 @@ public abstract class PoolTcpSource extends PoolSource { usingCounter.decrementAndGet(); } else { //usingCounter 会在close方法中执行 - conn.dispose(); + CompletableFuture future = null; + try { + future = sendCloseCommand(conn); + } catch (Exception e) { + } + if (future == null) { + conn.dispose(); + } else { + future.whenComplete((c, t) -> { + if (c != null) c.dispose(); + }); + } } } @@ -167,16 +178,20 @@ public abstract class PoolTcpSource extends PoolSource { @Override public void close() { connQueue.stream().forEach(x -> { + CompletableFuture future = null; try { - sendCloseCommand(x); + future = sendCloseCommand(x); } catch (Exception e) { } - try { - x.close(); - } catch (Exception e) { + if (future == null) { + x.dispose(); + } else { + future.whenComplete((c, t) -> { + if (c != null) c.dispose(); + }); } }); } - protected abstract void sendCloseCommand(final AsyncConnection conn); + protected abstract CompletableFuture sendCloseCommand(final AsyncConnection conn); }