From e55d991bce8e9d25559e3fae800953f1c1a5c4ef Mon Sep 17 00:00:00 2001 From: redkale Date: Tue, 31 Jan 2023 10:24:52 +0800 Subject: [PATCH] =?UTF-8?q?UDP=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/org/redkale/net/AsyncGroup.java | 20 +++++++++++++++++-- .../java/org/redkale/net/AsyncIOGroup.java | 14 ++++++++++--- .../net/AsyncNioTcpProtocolServer.java | 4 ++-- .../net/AsyncNioUdpProtocolServer.java | 4 ++-- src/main/java/org/redkale/net/Server.java | 5 ++++- .../java/org/redkale/net/sncp/SncpServer.java | 5 +++-- .../java/org/redkale/test/sncp/SncpTest.java | 5 +++-- 7 files changed, 43 insertions(+), 14 deletions(-) diff --git a/src/main/java/org/redkale/net/AsyncGroup.java b/src/main/java/org/redkale/net/AsyncGroup.java index 83ce40121..8ed374701 100644 --- a/src/main/java/org/redkale/net/AsyncGroup.java +++ b/src/main/java/org/redkale/net/AsyncGroup.java @@ -30,14 +30,30 @@ public abstract class AsyncGroup { return new AsyncIOGroup(true, threadNameFormat, workExecutor, bufferCapacity, safeBufferPool); } - public static AsyncGroup create(boolean clientMode, String threadNameFormat, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { + public static AsyncGroup create(boolean clientMode, String threadNameFormat, ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { return new AsyncIOGroup(clientMode, threadNameFormat, workExecutor, bufferCapacity, bufferPoolSize); } - public static AsyncGroup create(boolean clientMode, String threadNameFormat, ExecutorService workExecutor, final int bufferCapacity, ObjectPool safeBufferPool) { + public static AsyncGroup create(boolean clientMode, String threadNameFormat, ExecutorService workExecutor, int bufferCapacity, ObjectPool safeBufferPool) { return new AsyncIOGroup(clientMode, threadNameFormat, workExecutor, bufferCapacity, safeBufferPool); } + public static AsyncGroup create(String threadNameFormat, int threads, ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { + return new AsyncIOGroup(true, threadNameFormat, threads, workExecutor, bufferCapacity, bufferPoolSize); + } + + public static AsyncGroup create(String threadNameFormat, int threads, ExecutorService workExecutor, final int bufferCapacity, ObjectPool safeBufferPool) { + return new AsyncIOGroup(true, threadNameFormat, threads, workExecutor, bufferCapacity, safeBufferPool); + } + + public static AsyncGroup create(boolean clientMode, String threadNameFormat, int threads, ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { + return new AsyncIOGroup(clientMode, threadNameFormat, threads, workExecutor, bufferCapacity, bufferPoolSize); + } + + public static AsyncGroup create(boolean clientMode, String threadNameFormat, int threads, ExecutorService workExecutor, int bufferCapacity, ObjectPool safeBufferPool) { + return new AsyncIOGroup(clientMode, threadNameFormat, threads, workExecutor, bufferCapacity, safeBufferPool); + } + public CompletableFuture createTCPClient(final SocketAddress address) { return createTCPClient(address, 0, 0); } diff --git a/src/main/java/org/redkale/net/AsyncIOGroup.java b/src/main/java/org/redkale/net/AsyncIOGroup.java index 77ee2a93d..dc63b379b 100644 --- a/src/main/java/org/redkale/net/AsyncIOGroup.java +++ b/src/main/java/org/redkale/net/AsyncIOGroup.java @@ -59,11 +59,15 @@ public class AsyncIOGroup extends AsyncGroup { protected final ScheduledThreadPoolExecutor timeoutExecutor; public AsyncIOGroup(final int bufferCapacity, final int bufferPoolSize) { - this(true, "Redkale-AnonymousClient-IOThread-%s", null, bufferCapacity, bufferPoolSize); + this(true, "Redkale-AnonymousClient-IOThread-%s", Utility.cpus(), null, bufferCapacity, bufferPoolSize); } public AsyncIOGroup(boolean clientMode, String threadNameFormat, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { - this(clientMode, threadNameFormat, workExecutor, bufferCapacity, ObjectPool.createSafePool(null, null, bufferPoolSize, + this(clientMode, threadNameFormat, Utility.cpus(), workExecutor, bufferCapacity, bufferPoolSize); + } + + public AsyncIOGroup(boolean clientMode, String threadNameFormat, int threads, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { + this(clientMode, threadNameFormat, threads, workExecutor, bufferCapacity, ObjectPool.createSafePool(null, null, bufferPoolSize, (Object... params) -> ByteBuffer.allocateDirect(bufferCapacity), null, (e) -> { if (e == null || e.isReadOnly() || e.capacity() != bufferCapacity) { return false; @@ -75,8 +79,12 @@ public class AsyncIOGroup extends AsyncGroup { @SuppressWarnings("OverridableMethodCallInConstructor") public AsyncIOGroup(boolean clientMode, String threadNameFormat, ExecutorService workExecutor, final int bufferCapacity, ObjectPool safeBufferPool) { + this(clientMode, threadNameFormat, Utility.cpus(), workExecutor, bufferCapacity, safeBufferPool); + } + + @SuppressWarnings("OverridableMethodCallInConstructor") + public AsyncIOGroup(boolean clientMode, String threadNameFormat, int threads, ExecutorService workExecutor, final int bufferCapacity, ObjectPool safeBufferPool) { this.bufferCapacity = bufferCapacity; - final int threads = Utility.cpus(); this.ioReadThreads = new AsyncIOThread[threads]; this.ioWriteThreads = new AsyncIOThread[threads]; final ThreadGroup g = new ThreadGroup(String.format(threadNameFormat, "Group")); diff --git a/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java b/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java index cf09d5933..7a013d937 100644 --- a/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java +++ b/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java @@ -61,10 +61,10 @@ class AsyncNioTcpProtocolServer extends ProtocolServer { this.serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); } if (options.contains(StandardSocketOptions.SO_RCVBUF)) { - this.serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); + this.serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 32 * 1024); } if (options.contains(StandardSocketOptions.SO_SNDBUF)) { - this.serverChannel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); + this.serverChannel.setOption(StandardSocketOptions.SO_SNDBUF, 32 * 1024); } } diff --git a/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java b/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java index 0ae373d52..7f2ce7ada 100644 --- a/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java +++ b/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java @@ -60,10 +60,10 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { this.serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); } if (options.contains(StandardSocketOptions.SO_RCVBUF)) { - this.serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); + this.serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 32 * 1024); } if (options.contains(StandardSocketOptions.SO_SNDBUF)) { - this.serverChannel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); + this.serverChannel.setOption(StandardSocketOptions.SO_SNDBUF, 32 * 1024); } } diff --git a/src/main/java/org/redkale/net/Server.java b/src/main/java/org/redkale/net/Server.java index 31e2c9149..245a0b4dd 100644 --- a/src/main/java/org/redkale/net/Server.java +++ b/src/main/java/org/redkale/net/Server.java @@ -127,7 +127,7 @@ public abstract class Server= 1000) { + return "" + value; + } return value + "B"; } diff --git a/src/main/java/org/redkale/net/sncp/SncpServer.java b/src/main/java/org/redkale/net/sncp/SncpServer.java index 71748c3f8..79ba30ed7 100644 --- a/src/main/java/org/redkale/net/sncp/SncpServer.java +++ b/src/main/java/org/redkale/net/sncp/SncpServer.java @@ -118,8 +118,9 @@ public class SncpServer extends Server 0) {