PoolTcpSource增加ping接口
This commit is contained in:
@@ -209,6 +209,8 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
|||||||
@Override
|
@Override
|
||||||
public void destroy(AnyValue config) {
|
public void destroy(AnyValue config) {
|
||||||
if (this.executor != null) this.executor.shutdownNow();
|
if (this.executor != null) this.executor.shutdownNow();
|
||||||
|
if (readPool != null) readPool.close();
|
||||||
|
if (writePool != null) writePool.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Local
|
@Local
|
||||||
|
|||||||
@@ -38,6 +38,8 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
|
|||||||
//TCP Channel组
|
//TCP Channel组
|
||||||
protected AsynchronousChannelGroup group;
|
protected AsynchronousChannelGroup group;
|
||||||
|
|
||||||
|
protected ScheduledThreadPoolExecutor scheduler;
|
||||||
|
|
||||||
protected final ArrayBlockingQueue<AsyncConnection> connQueue;
|
protected final ArrayBlockingQueue<AsyncConnection> connQueue;
|
||||||
|
|
||||||
public PoolTcpSource(String rwtype, ArrayBlockingQueue queue, Semaphore semaphore, Properties prop, Logger logger, ObjectPool<ByteBuffer> bufferPool, ThreadPoolExecutor executor) {
|
public PoolTcpSource(String rwtype, ArrayBlockingQueue queue, Semaphore semaphore, Properties prop, Logger logger, ObjectPool<ByteBuffer> bufferPool, ThreadPoolExecutor executor) {
|
||||||
@@ -50,6 +52,34 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
|
|||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
this.connQueue = queue == null ? new ArrayBlockingQueue<>(this.maxconns) : queue;
|
this.connQueue = queue == null ? new ArrayBlockingQueue<>(this.maxconns) : queue;
|
||||||
|
this.scheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> {
|
||||||
|
final Thread t = new Thread(r, "PoolSource-Scheduled-Thread");
|
||||||
|
t.setDaemon(true);
|
||||||
|
return t;
|
||||||
|
});
|
||||||
|
this.scheduler.scheduleAtFixedRate(() -> {
|
||||||
|
runPingTask();
|
||||||
|
}, 60, 30, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void runPingTask() {
|
||||||
|
try {
|
||||||
|
if (connQueue.isEmpty()) return;
|
||||||
|
long time = System.currentTimeMillis() - 30 * 1000;
|
||||||
|
pollAsync().whenComplete((conn, e) -> {
|
||||||
|
if (e != null) return;
|
||||||
|
if (conn.getLastReadTime() >= time || conn.getLastWriteTime() >= time) return; //半分钟内已经用过
|
||||||
|
CompletableFuture<AsyncConnection> future = sendPingCommand(conn);
|
||||||
|
if (future == null) return; //不支持ping
|
||||||
|
future.whenComplete((conn2, e2) -> {
|
||||||
|
if (e != null) return;
|
||||||
|
offerConnection(conn2);
|
||||||
|
runPingTask();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.log(Level.FINEST, "PoolSource task ping failed", e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -216,6 +246,7 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
|
this.scheduler.shutdownNow();
|
||||||
connQueue.stream().forEach(x -> {
|
connQueue.stream().forEach(x -> {
|
||||||
CompletableFuture<AsyncConnection> future = null;
|
CompletableFuture<AsyncConnection> future = null;
|
||||||
try {
|
try {
|
||||||
@@ -232,5 +263,7 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected abstract CompletableFuture<AsyncConnection> sendPingCommand(final AsyncConnection conn);
|
||||||
|
|
||||||
protected abstract CompletableFuture<AsyncConnection> sendCloseCommand(final AsyncConnection conn);
|
protected abstract CompletableFuture<AsyncConnection> sendCloseCommand(final AsyncConnection conn);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user