diff --git a/src/org/redkale/source/DataSqlSource.java b/src/org/redkale/source/DataSqlSource.java index e9ab2764b..2d35fd362 100644 --- a/src/org/redkale/source/DataSqlSource.java +++ b/src/org/redkale/source/DataSqlSource.java @@ -209,6 +209,8 @@ public abstract class DataSqlSource extends AbstractService implement @Override public void destroy(AnyValue config) { if (this.executor != null) this.executor.shutdownNow(); + if (readPool != null) readPool.close(); + if (writePool != null) writePool.close(); } @Local diff --git a/src/org/redkale/source/PoolTcpSource.java b/src/org/redkale/source/PoolTcpSource.java index 3d5be275d..e54a0c01c 100644 --- a/src/org/redkale/source/PoolTcpSource.java +++ b/src/org/redkale/source/PoolTcpSource.java @@ -38,6 +38,8 @@ public abstract class PoolTcpSource extends PoolSource { //TCP Channel组 protected AsynchronousChannelGroup group; + protected ScheduledThreadPoolExecutor scheduler; + protected final ArrayBlockingQueue connQueue; public PoolTcpSource(String rwtype, ArrayBlockingQueue queue, Semaphore semaphore, Properties prop, Logger logger, ObjectPool bufferPool, ThreadPoolExecutor executor) { @@ -50,6 +52,34 @@ public abstract class PoolTcpSource extends PoolSource { throw new RuntimeException(e); } this.connQueue = queue == null ? new ArrayBlockingQueue<>(this.maxconns) : queue; + this.scheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> { + final Thread t = new Thread(r, "PoolSource-Scheduled-Thread"); + t.setDaemon(true); + return t; + }); + this.scheduler.scheduleAtFixedRate(() -> { + runPingTask(); + }, 60, 30, TimeUnit.SECONDS); + } + + private void runPingTask() { + try { + if (connQueue.isEmpty()) return; + long time = System.currentTimeMillis() - 30 * 1000; + pollAsync().whenComplete((conn, e) -> { + if (e != null) return; + if (conn.getLastReadTime() >= time || conn.getLastWriteTime() >= time) return; //半分钟内已经用过 + CompletableFuture future = sendPingCommand(conn); + if (future == null) return; //不支持ping + future.whenComplete((conn2, e2) -> { + if (e != null) return; + offerConnection(conn2); + runPingTask(); + }); + }); + } catch (Exception e) { + logger.log(Level.FINEST, "PoolSource task ping failed", e); + } } @Override @@ -216,6 +246,7 @@ public abstract class PoolTcpSource extends PoolSource { @Override public void close() { + this.scheduler.shutdownNow(); connQueue.stream().forEach(x -> { CompletableFuture future = null; try { @@ -232,5 +263,7 @@ public abstract class PoolTcpSource extends PoolSource { }); } + protected abstract CompletableFuture sendPingCommand(final AsyncConnection conn); + protected abstract CompletableFuture sendCloseCommand(final AsyncConnection conn); }