From dbd4a79589786853c3b1ceb6b2ea02775be471ce Mon Sep 17 00:00:00 2001 From: redkale Date: Mon, 4 Dec 2023 21:05:42 +0800 Subject: [PATCH] =?UTF-8?q?VertxSqlDataSource=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/org/redkale/net/AsyncIOGroup.java | 8 ++------ .../org/redkale/net/AsyncNioTcpProtocolServer.java | 12 ++++++------ .../org/redkale/net/AsyncNioUdpProtocolServer.java | 14 +++++++------- .../org/redkale/source/AbstractDataSource.java | 10 +++++++--- 4 files changed, 22 insertions(+), 22 deletions(-) diff --git a/src/main/java/org/redkale/net/AsyncIOGroup.java b/src/main/java/org/redkale/net/AsyncIOGroup.java index d23324f7c..5d566f89b 100644 --- a/src/main/java/org/redkale/net/AsyncIOGroup.java +++ b/src/main/java/org/redkale/net/AsyncIOGroup.java @@ -243,12 +243,8 @@ public class AsyncIOGroup extends AsyncGroup { public void completed(Void result, Void attachment) { conn.setReadTimeoutSeconds(readTimeoutSeconds); conn.setWriteTimeoutSeconds(writeTimeoutSeconds); - if (connCreateCounter != null) { - connCreateCounter.increment(); - } - if (connLivingCounter != null) { - connLivingCounter.increment(); - } + connCreateCounter.increment(); + connLivingCounter.increment(); if (conn.sslEngine == null) { future.complete(conn); } else { diff --git a/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java b/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java index cd13f8fa0..f178af230 100644 --- a/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java +++ b/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java @@ -32,8 +32,6 @@ class AsyncNioTcpProtocolServer extends ProtocolServer { private AsyncIOGroup ioGroup; - private Thread acceptThread; - private boolean closed; private Supplier responseSupplier; @@ -117,13 +115,14 @@ class AsyncNioTcpProtocolServer extends ProtocolServer { ObjectPool pool = localResponsePool.get(); (pool == null ? safeResponsePool : pool).accept(v); }; - final String threadNameFormat = server.name == null || server.name.isEmpty() ? "Redkale-IOServletThread-%s" : ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread-%s"); + final String threadNameFormat = Utility.isEmpty(server.name) ? "Redkale-IOServletThread-%s" + : ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread-%s"); if (this.ioGroup == null) { this.ioGroup = new AsyncIOGroup(threadNameFormat, null, safeBufferPool); this.ioGroup.start(); } - this.acceptThread = new Thread() { + Thread acceptThread = new Thread() { { setName(String.format(threadNameFormat, "Accept")); } @@ -164,7 +163,7 @@ class AsyncNioTcpProtocolServer extends ProtocolServer { } } }; - this.acceptThread.start(); + acceptThread.start(); } private void accept(AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread) throws IOException { @@ -177,7 +176,8 @@ class AsyncNioTcpProtocolServer extends ProtocolServer { channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); ioGroup.connCreateCounter.increment(); ioGroup.connLivingCounter.increment(); - AsyncNioTcpConnection conn = new AsyncNioTcpConnection(false, ioGroup, ioReadThread, ioWriteThread, channel, context.getSSLBuilder(), context.getSSLContext(), null); + AsyncNioTcpConnection conn = new AsyncNioTcpConnection(false, ioGroup, + ioReadThread, ioWriteThread, channel, context.getSSLBuilder(), context.getSSLContext(), null); ProtocolCodec codec = new ProtocolCodec(context, responseSupplier, responseConsumer, conn); conn.protocolCodec = codec; if (conn.sslEngine == null) { diff --git a/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java b/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java index ffdd3ce48..b5a1df43e 100644 --- a/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java +++ b/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java @@ -34,8 +34,6 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { private AsyncIOGroup ioGroup; - private Thread acceptThread; - private boolean closed; private Supplier responseSupplier; @@ -106,17 +104,18 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { ObjectPool pool = localResponsePool.get(); return pool == null ? safeResponsePool.get() : pool.get(); }; - this.responseConsumer = (v) -> { + this.responseConsumer = v -> { ObjectPool pool = localResponsePool.get(); (pool == null ? safeResponsePool : pool).accept(v); }; - final String threadNameFormat = server.name == null || server.name.isEmpty() ? "Redkale-IOServletThread-%s" : ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread-%s"); + final String threadNameFormat = Utility.isEmpty(server.name) ? "Redkale-IOServletThread-%s" + : ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread-%s"); if (this.ioGroup == null) { this.ioGroup = new AsyncIOGroup(threadNameFormat, null, safeBufferPool); this.ioGroup.start(); } udpServerChannel.serverChannel.register(this.selector, SelectionKey.OP_READ); - this.acceptThread = new Thread() { + Thread acceptThread = new Thread() { { setName(String.format(threadNameFormat, "Accept")); } @@ -175,13 +174,14 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { } } }; - this.acceptThread.start(); + acceptThread.start(); } private void accept(SocketAddress address, ByteBuffer buffer, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread) throws IOException { ioGroup.connCreateCounter.increment(); ioGroup.connLivingCounter.increment(); - AsyncNioUdpConnection conn = new AsyncNioUdpConnection(false, ioGroup, ioReadThread, ioWriteThread, udpServerChannel.serverChannel, context.getSSLBuilder(), context.getSSLContext(), address); + AsyncNioUdpConnection conn = new AsyncNioUdpConnection(false, ioGroup, + ioReadThread, ioWriteThread, udpServerChannel.serverChannel, context.getSSLBuilder(), context.getSSLContext(), address); conn.udpServerChannel = udpServerChannel; udpServerChannel.connections.put(address, conn); ProtocolCodec codec = new ProtocolCodec(context, responseSupplier, responseConsumer, conn); diff --git a/src/main/java/org/redkale/source/AbstractDataSource.java b/src/main/java/org/redkale/source/AbstractDataSource.java index e3d8d33c8..9d3173996 100644 --- a/src/main/java/org/redkale/source/AbstractDataSource.java +++ b/src/main/java/org/redkale/source/AbstractDataSource.java @@ -45,7 +45,7 @@ public abstract class AbstractDataSource extends AbstractService implements Data private final ReentrantLock executorLock = new ReentrantLock(); - private int sourceThreads = Utility.cpus(); + protected int sourceThreads = Utility.cpus(); @Resource(name = RESNAME_APP_EXECUTOR, required = false) private ExecutorService sourceExecutor; @@ -70,6 +70,10 @@ public abstract class AbstractDataSource extends AbstractService implements Data @ResourceListener public abstract void onResourceChange(ResourceEvent[] events); + protected void setSourceExecutor(ExecutorService executor) { + this.sourceExecutor = executor; + } + protected SourceUrlInfo parseSourceUrl(final String url) { final SourceUrlInfo info = new SourceUrlInfo(); info.url = url; @@ -146,11 +150,11 @@ public abstract class AbstractDataSource extends AbstractService implements Data return executor; } - protected void complete(CompletableFuture future, T value) { + protected void complete(WorkThread workThread, CompletableFuture future, T value) { getExecutor().execute(() -> future.complete(value)); } - protected void completeExceptionally(CompletableFuture future, Throwable exp) { + protected void completeExceptionally(WorkThread workThread, CompletableFuture future, Throwable exp) { getExecutor().execute(() -> future.completeExceptionally(exp)); }