From 1d6b76b1822cfc8d297dbf5edc737d457dbf8311 Mon Sep 17 00:00:00 2001 From: redkale Date: Tue, 7 Feb 2023 16:43:47 +0800 Subject: [PATCH] =?UTF-8?q?udp=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/org/redkale/net/AsyncGroup.java | 21 ++--- .../java/org/redkale/net/AsyncIOGroup.java | 24 ++--- .../java/org/redkale/net/AsyncIOThread.java | 4 +- .../net/AsyncNioTcpProtocolServer.java | 5 +- .../net/AsyncNioUdpProtocolServer.java | 10 +-- src/main/java/org/redkale/net/Server.java | 6 +- .../net/client/ClientReadIOThread.java | 5 +- .../net/client/ClientWriteIOThread.java | 2 +- .../java/org/redkale/net/http/HttpServer.java | 20 ++--- .../redkale/net/http/WebSocketAsyncGroup.java | 9 +- .../net/http/WebSocketWriteIOThread.java | 2 +- .../java/org/redkale/net/sncp/SncpServer.java | 14 +-- .../java/org/redkale/util/ByteBufferPool.java | 88 +++++++++++++++++++ .../test/sncp/SncpClientCodecTest.java | 4 +- .../java/org/redkale/test/sncp/SncpTest.java | 7 +- .../test/sncp/SncpTestServiceImpl.java | 2 +- 16 files changed, 142 insertions(+), 81 deletions(-) create mode 100644 src/main/java/org/redkale/util/ByteBufferPool.java diff --git a/src/main/java/org/redkale/net/AsyncGroup.java b/src/main/java/org/redkale/net/AsyncGroup.java index 8ed374701..9f3d1daa0 100644 --- a/src/main/java/org/redkale/net/AsyncGroup.java +++ b/src/main/java/org/redkale/net/AsyncGroup.java @@ -6,9 +6,8 @@ package org.redkale.net; import java.net.SocketAddress; -import java.nio.ByteBuffer; import java.util.concurrent.*; -import org.redkale.util.ObjectPool; +import org.redkale.util.*; /** * Client模式的AsyncConnection连接构造器 @@ -22,36 +21,38 @@ import org.redkale.util.ObjectPool; */ public abstract class AsyncGroup { + public static final int UDP_BUFFER_CAPACITY = Integer.getInteger("redkale.udp.buffer.apacity", 1350); + public static AsyncGroup create(String threadNameFormat, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { return new AsyncIOGroup(true, threadNameFormat, workExecutor, bufferCapacity, bufferPoolSize); } - public static AsyncGroup create(String threadNameFormat, ExecutorService workExecutor, final int bufferCapacity, ObjectPool safeBufferPool) { - return new AsyncIOGroup(true, threadNameFormat, workExecutor, bufferCapacity, safeBufferPool); + public static AsyncGroup create(String threadNameFormat, ExecutorService workExecutor, final ByteBufferPool safeBufferPool) { + return new AsyncIOGroup(true, threadNameFormat, workExecutor, safeBufferPool); } public static AsyncGroup create(boolean clientMode, String threadNameFormat, ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { return new AsyncIOGroup(clientMode, threadNameFormat, workExecutor, bufferCapacity, bufferPoolSize); } - public static AsyncGroup create(boolean clientMode, String threadNameFormat, ExecutorService workExecutor, int bufferCapacity, ObjectPool safeBufferPool) { - return new AsyncIOGroup(clientMode, threadNameFormat, workExecutor, bufferCapacity, safeBufferPool); + public static AsyncGroup create(boolean clientMode, String threadNameFormat, ExecutorService workExecutor, ByteBufferPool safeBufferPool) { + return new AsyncIOGroup(clientMode, threadNameFormat, workExecutor, safeBufferPool); } public static AsyncGroup create(String threadNameFormat, int threads, ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { return new AsyncIOGroup(true, threadNameFormat, threads, workExecutor, bufferCapacity, bufferPoolSize); } - public static AsyncGroup create(String threadNameFormat, int threads, ExecutorService workExecutor, final int bufferCapacity, ObjectPool safeBufferPool) { - return new AsyncIOGroup(true, threadNameFormat, threads, workExecutor, bufferCapacity, safeBufferPool); + public static AsyncGroup create(String threadNameFormat, int threads, ExecutorService workExecutor, final ByteBufferPool safeBufferPool) { + return new AsyncIOGroup(true, threadNameFormat, threads, workExecutor, safeBufferPool); } public static AsyncGroup create(boolean clientMode, String threadNameFormat, int threads, ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { return new AsyncIOGroup(clientMode, threadNameFormat, threads, workExecutor, bufferCapacity, bufferPoolSize); } - public static AsyncGroup create(boolean clientMode, String threadNameFormat, int threads, ExecutorService workExecutor, int bufferCapacity, ObjectPool safeBufferPool) { - return new AsyncIOGroup(clientMode, threadNameFormat, threads, workExecutor, bufferCapacity, safeBufferPool); + public static AsyncGroup create(boolean clientMode, String threadNameFormat, int threads, ExecutorService workExecutor, ByteBufferPool safeBufferPool) { + return new AsyncIOGroup(clientMode, threadNameFormat, threads, workExecutor, safeBufferPool); } public CompletableFuture createTCPClient(final SocketAddress address) { diff --git a/src/main/java/org/redkale/net/AsyncIOGroup.java b/src/main/java/org/redkale/net/AsyncIOGroup.java index 350bdfbdf..daa08dfc0 100644 --- a/src/main/java/org/redkale/net/AsyncIOGroup.java +++ b/src/main/java/org/redkale/net/AsyncIOGroup.java @@ -7,7 +7,6 @@ package org.redkale.net; import java.io.IOException; import java.net.*; -import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Objects; import java.util.concurrent.*; @@ -67,24 +66,17 @@ public class AsyncIOGroup extends AsyncGroup { } public AsyncIOGroup(boolean clientMode, String threadNameFormat, int threads, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { - this(clientMode, threadNameFormat, threads, workExecutor, bufferCapacity, ObjectPool.createSafePool(null, null, bufferPoolSize, - (Object... params) -> ByteBuffer.allocateDirect(bufferCapacity), null, (e) -> { - if (e == null || e.isReadOnly() || e.capacity() != bufferCapacity) { - return false; - } - e.clear(); - return true; - })); + this(clientMode, threadNameFormat, threads, workExecutor, ByteBufferPool.createSafePool(bufferPoolSize, bufferCapacity)); } @SuppressWarnings("OverridableMethodCallInConstructor") - public AsyncIOGroup(boolean clientMode, String threadNameFormat, ExecutorService workExecutor, final int bufferCapacity, ObjectPool safeBufferPool) { - this(clientMode, threadNameFormat, Utility.cpus(), workExecutor, bufferCapacity, safeBufferPool); + public AsyncIOGroup(boolean clientMode, String threadNameFormat, ExecutorService workExecutor, final ByteBufferPool safeBufferPool) { + this(clientMode, threadNameFormat, Utility.cpus(), workExecutor, safeBufferPool); } @SuppressWarnings("OverridableMethodCallInConstructor") - public AsyncIOGroup(boolean clientMode, String threadNameFormat, int threads, ExecutorService workExecutor, final int bufferCapacity, ObjectPool safeBufferPool) { - this.bufferCapacity = bufferCapacity; + public AsyncIOGroup(boolean clientMode, String threadNameFormat, int threads, ExecutorService workExecutor, final ByteBufferPool safeBufferPool) { + this.bufferCapacity = safeBufferPool.getBufferCapacity(); this.ioReadThreads = new AsyncIOThread[threads]; this.ioWriteThreads = new AsyncIOThread[threads]; final ThreadGroup g = new ThreadGroup(String.format(threadNameFormat, "Group")); @@ -115,15 +107,15 @@ public class AsyncIOGroup extends AsyncGroup { } } - protected AsyncIOThread createAsyncIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ObjectPool safeBufferPool) throws IOException { + protected AsyncIOThread createAsyncIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ByteBufferPool safeBufferPool) throws IOException { return new AsyncIOThread(g, name, index, threads, workExecutor, safeBufferPool); } - protected AsyncIOThread createClientReadIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ObjectPool safeBufferPool) throws IOException { + protected AsyncIOThread createClientReadIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ByteBufferPool safeBufferPool) throws IOException { return new ClientReadIOThread(g, name, index, threads, workExecutor, safeBufferPool); } - protected AsyncIOThread createClientWriteIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ObjectPool safeBufferPool) throws IOException { + protected AsyncIOThread createClientWriteIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ByteBufferPool safeBufferPool) throws IOException { return new ClientWriteIOThread(g, name, index, threads, workExecutor, safeBufferPool); } diff --git a/src/main/java/org/redkale/net/AsyncIOThread.java b/src/main/java/org/redkale/net/AsyncIOThread.java index 0c7b19ce7..2ca6efc78 100644 --- a/src/main/java/org/redkale/net/AsyncIOThread.java +++ b/src/main/java/org/redkale/net/AsyncIOThread.java @@ -41,11 +41,11 @@ public class AsyncIOThread extends WorkThread { private final AtomicBoolean closed = new AtomicBoolean(); - public AsyncIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ObjectPool safeBufferPool) throws IOException { + public AsyncIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ByteBufferPool safeBufferPool) throws IOException { super(g, name, index, threads, workExecutor, null); this.selector = Selector.open(); this.setDaemon(true); - ObjectPool unsafeBufferPool = ObjectPool.createUnsafePool(this, 512, safeBufferPool); + ByteBufferPool unsafeBufferPool = ByteBufferPool.createUnsafePool(this, 512, safeBufferPool); this.bufferSupplier = unsafeBufferPool; this.bufferConsumer = unsafeBufferPool; } diff --git a/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java b/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java index cd8a2d2b2..e1b8f5519 100644 --- a/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java +++ b/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java @@ -7,7 +7,6 @@ package org.redkale.net; import java.io.IOException; import java.net.*; -import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Set; import java.util.concurrent.atomic.LongAdder; @@ -92,7 +91,7 @@ class AsyncNioTcpProtocolServer extends ProtocolServer { LongAdder createResponseCounter = new LongAdder(); LongAdder cycleResponseCounter = new LongAdder(); - ObjectPool safeBufferPool = server.createSafeBufferPool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize); + ByteBufferPool safeBufferPool = server.createSafeBufferPool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize); ObjectPool safeResponsePool = server.createSafeResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize); final int respPoolMax = server.getResponsePoolSize(); ThreadLocal> localResponsePool = ThreadLocal.withInitial(() -> { @@ -119,7 +118,7 @@ class AsyncNioTcpProtocolServer extends ProtocolServer { (pool == null ? safeResponsePool : pool).accept(v); }; final String threadNameFormat = server.name == null || server.name.isEmpty() ? "Redkale-IOServletThread-%s" : ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread-%s"); - this.ioGroup = new AsyncIOGroup(false, threadNameFormat, null, server.bufferCapacity, safeBufferPool); + this.ioGroup = new AsyncIOGroup(false, threadNameFormat, null, safeBufferPool); this.ioGroup.start(); this.acceptThread = new Thread() { diff --git a/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java b/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java index 5c0ee71e2..0844bf702 100644 --- a/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java +++ b/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java @@ -92,7 +92,7 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { LongAdder createResponseCounter = new LongAdder(); LongAdder cycleResponseCounter = new LongAdder(); - ObjectPool safeBufferPool = server.createSafeBufferPool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize); + ByteBufferPool safeBufferPool = server.createSafeBufferPool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize); ObjectPool safeResponsePool = server.createSafeResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize); ThreadLocal> localResponsePool = ThreadLocal.withInitial(() -> { if (!(Thread.currentThread() instanceof WorkThread)) { @@ -110,7 +110,7 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { (pool == null ? safeResponsePool : pool).accept(v); }; final String threadNameFormat = server.name == null || server.name.isEmpty() ? "Redkale-IOServletThread-%s" : ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread-%s"); - this.ioGroup = new AsyncIOGroup(false, threadNameFormat, null, server.bufferCapacity, safeBufferPool); + this.ioGroup = new AsyncIOGroup(false, threadNameFormat, null, safeBufferPool); this.ioGroup.start(); udpServerChannel.serverChannel.register(this.selector, SelectionKey.OP_READ); this.acceptThread = new Thread() { @@ -120,7 +120,7 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { @Override public void run() { - udpServerChannel.unsafeBufferPool = ObjectPool.createUnsafePool(Thread.currentThread(), 512, safeBufferPool); + udpServerChannel.unsafeBufferPool = ByteBufferPool.createUnsafePool(Thread.currentThread(), 512, safeBufferPool); final AsyncIOThread[] ioReadThreads = ioGroup.ioReadThreads; final AsyncIOThread[] ioWriteThreads = ioGroup.ioWriteThreads; final int reads = ioReadThreads.length; @@ -130,7 +130,7 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { Set keys = null; final Selector sel = selector; final DatagramChannel serverChannel = udpServerChannel.serverChannel; - final ObjectPool unsafeBufferPool = udpServerChannel.unsafeBufferPool; + final ByteBufferPool unsafeBufferPool = udpServerChannel.unsafeBufferPool; while (!closed) { try { int count = sel.select(); @@ -238,7 +238,7 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { DatagramChannel serverChannel; - ObjectPool unsafeBufferPool; + ByteBufferPool unsafeBufferPool; ConcurrentHashMap connections = new ConcurrentHashMap<>(); diff --git a/src/main/java/org/redkale/net/Server.java b/src/main/java/org/redkale/net/Server.java index 3b956b775..b5cc6177c 100644 --- a/src/main/java/org/redkale/net/Server.java +++ b/src/main/java/org/redkale/net/Server.java @@ -7,13 +7,13 @@ package org.redkale.net; import java.io.*; import java.net.*; -import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.*; import java.util.concurrent.atomic.LongAdder; import java.util.logging.*; import javax.net.ssl.SSLContext; import org.redkale.boot.Application; +import static org.redkale.net.AsyncGroup.UDP_BUFFER_CAPACITY; import org.redkale.net.Filter; import org.redkale.util.*; @@ -128,7 +128,7 @@ public abstract class Server createSafeBufferPool(LongAdder createCounter, LongAdder cycleCounter, int bufferPoolSize); + protected abstract ByteBufferPool createSafeBufferPool(LongAdder createCounter, LongAdder cycleCounter, int bufferPoolSize); //必须在 createContext()之后调用 protected abstract ObjectPool

createSafeResponsePool(LongAdder createCounter, LongAdder cycleCounter, int responsePoolSize); diff --git a/src/main/java/org/redkale/net/client/ClientReadIOThread.java b/src/main/java/org/redkale/net/client/ClientReadIOThread.java index 20624c9bf..8ffec3056 100644 --- a/src/main/java/org/redkale/net/client/ClientReadIOThread.java +++ b/src/main/java/org/redkale/net/client/ClientReadIOThread.java @@ -4,10 +4,9 @@ package org.redkale.net.client; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.concurrent.ExecutorService; import org.redkale.net.AsyncIOThread; -import org.redkale.util.ObjectPool; +import org.redkale.util.ByteBufferPool; /** * 客户端IO读线程 @@ -22,7 +21,7 @@ import org.redkale.util.ObjectPool; public class ClientReadIOThread extends AsyncIOThread { public ClientReadIOThread(ThreadGroup g, String name, int index, int threads, - ExecutorService workExecutor, ObjectPool safeBufferPool) throws IOException { + ExecutorService workExecutor, ByteBufferPool safeBufferPool) throws IOException { super(g, name, index, threads, workExecutor, safeBufferPool); } diff --git a/src/main/java/org/redkale/net/client/ClientWriteIOThread.java b/src/main/java/org/redkale/net/client/ClientWriteIOThread.java index 1327e89c5..b6e07bbd5 100644 --- a/src/main/java/org/redkale/net/client/ClientWriteIOThread.java +++ b/src/main/java/org/redkale/net/client/ClientWriteIOThread.java @@ -26,7 +26,7 @@ public class ClientWriteIOThread extends AsyncIOThread { private final BlockingQueue requestQueue = new LinkedBlockingQueue<>(); public ClientWriteIOThread(ThreadGroup g, String name, int index, int threads, - ExecutorService workExecutor, ObjectPool safeBufferPool) throws IOException { + ExecutorService workExecutor, ByteBufferPool safeBufferPool) throws IOException { super(g, name, index, threads, workExecutor, safeBufferPool); } diff --git a/src/main/java/org/redkale/net/http/HttpServer.java b/src/main/java/org/redkale/net/http/HttpServer.java index 83e8bb204..6900a89a5 100644 --- a/src/main/java/org/redkale/net/http/HttpServer.java +++ b/src/main/java/org/redkale/net/http/HttpServer.java @@ -7,7 +7,6 @@ package org.redkale.net.http; import java.lang.reflect.Field; import java.net.HttpCookie; -import java.nio.ByteBuffer; import java.text.*; import java.time.ZoneId; import static java.time.format.DateTimeFormatter.RFC_1123_DATE_TIME; @@ -42,7 +41,7 @@ public class HttpServer extends Server safeBufferPool; + private ByteBufferPool safeBufferPool; private final ReentrantLock groupLock = new ReentrantLock(); @@ -550,7 +549,7 @@ public class HttpServer extends Server createSafeBufferPool(LongAdder createCounter, LongAdder cycleCounter, int bufferPoolSize) { - final int rcapacity = this.bufferCapacity; - ObjectPool bufferPool = ObjectPool.createSafePool(createCounter, cycleCounter, bufferPoolSize, - (Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> { - if (e == null || e.isReadOnly() || e.capacity() != rcapacity) { - return false; - } - e.clear(); - return true; - }); - this.safeBufferPool = bufferPool; - return bufferPool; + protected ByteBufferPool createSafeBufferPool(LongAdder createCounter, LongAdder cycleCounter, int bufferPoolSize) { + this.safeBufferPool = ByteBufferPool.createSafePool(createCounter, cycleCounter, bufferPoolSize, this.bufferCapacity); + return this.safeBufferPool; } @Override diff --git a/src/main/java/org/redkale/net/http/WebSocketAsyncGroup.java b/src/main/java/org/redkale/net/http/WebSocketAsyncGroup.java index 7172968eb..a42a14e4c 100644 --- a/src/main/java/org/redkale/net/http/WebSocketAsyncGroup.java +++ b/src/main/java/org/redkale/net/http/WebSocketAsyncGroup.java @@ -4,10 +4,9 @@ package org.redkale.net.http; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.concurrent.ExecutorService; import org.redkale.net.*; -import org.redkale.util.ObjectPool; +import org.redkale.util.*; /** * WebSocket只写版的AsyncIOGroup
@@ -23,12 +22,12 @@ import org.redkale.util.ObjectPool; */ class WebSocketAsyncGroup extends AsyncIOGroup { - public WebSocketAsyncGroup(String threadNameFormat, ExecutorService workExecutor, int bufferCapacity, ObjectPool safeBufferPool) { - super(false, threadNameFormat, workExecutor, bufferCapacity, safeBufferPool); + public WebSocketAsyncGroup(String threadNameFormat, ExecutorService workExecutor, ByteBufferPool safeBufferPool) { + super(false, threadNameFormat, workExecutor, safeBufferPool); } @Override - protected AsyncIOThread createAsyncIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ObjectPool safeBufferPool) throws IOException { + protected AsyncIOThread createAsyncIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ByteBufferPool safeBufferPool) throws IOException { return new WebSocketWriteIOThread(this.timeoutExecutor, g, name, index, threads, workExecutor, safeBufferPool); } diff --git a/src/main/java/org/redkale/net/http/WebSocketWriteIOThread.java b/src/main/java/org/redkale/net/http/WebSocketWriteIOThread.java index 6218b9644..d1769a394 100644 --- a/src/main/java/org/redkale/net/http/WebSocketWriteIOThread.java +++ b/src/main/java/org/redkale/net/http/WebSocketWriteIOThread.java @@ -28,7 +28,7 @@ public class WebSocketWriteIOThread extends AsyncIOThread { private final BlockingDeque requestQueue = new LinkedBlockingDeque<>(); public WebSocketWriteIOThread(ScheduledThreadPoolExecutor timeoutExecutor, ThreadGroup g, String name, int index, int threads, - ExecutorService workExecutor, ObjectPool safeBufferPool) throws IOException { + ExecutorService workExecutor, ByteBufferPool safeBufferPool) throws IOException { super(g, name, index, threads, workExecutor, safeBufferPool); Objects.requireNonNull(timeoutExecutor); this.timeoutExecutor = timeoutExecutor; diff --git a/src/main/java/org/redkale/net/sncp/SncpServer.java b/src/main/java/org/redkale/net/sncp/SncpServer.java index 79ba30ed7..5f83e3dc5 100644 --- a/src/main/java/org/redkale/net/sncp/SncpServer.java +++ b/src/main/java/org/redkale/net/sncp/SncpServer.java @@ -5,7 +5,6 @@ */ package org.redkale.net.sncp; -import java.nio.ByteBuffer; import java.util.List; import java.util.concurrent.atomic.*; import org.redkale.boot.Application; @@ -128,17 +127,8 @@ public class SncpServer extends Server createSafeBufferPool(LongAdder createCounter, LongAdder cycleCounter, int bufferPoolSize) { - final int rcapacity = this.bufferCapacity; - ObjectPool bufferPool = ObjectPool.createSafePool(createCounter, cycleCounter, bufferPoolSize, - (Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> { - if (e == null || e.isReadOnly() || e.capacity() != rcapacity) { - return false; - } - e.clear(); - return true; - }); - return bufferPool; + protected ByteBufferPool createSafeBufferPool(LongAdder createCounter, LongAdder cycleCounter, int bufferPoolSize) { + return ByteBufferPool.createSafePool(createCounter, cycleCounter, bufferPoolSize, this.bufferCapacity); } @Override diff --git a/src/main/java/org/redkale/util/ByteBufferPool.java b/src/main/java/org/redkale/util/ByteBufferPool.java new file mode 100644 index 000000000..c05b4efd1 --- /dev/null +++ b/src/main/java/org/redkale/util/ByteBufferPool.java @@ -0,0 +1,88 @@ +/* + * + */ +package org.redkale.util; + +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.LongAdder; + +/** + * ByteBuffer的对象池
+ *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * @since 2.8.0 + */ +public class ByteBufferPool extends ObjectPool { + + private final int bufferCapacity; + + protected ByteBufferPool(ObjectPool parent, LongAdder creatCounter, LongAdder cycleCounter, Thread unsafeThread, int max, int bufferCapacity, Queue queue) { + super(parent, creatCounter, cycleCounter, unsafeThread, max, (Object... params) -> ByteBuffer.allocateDirect(bufferCapacity), null, (e) -> { + if (e == null || e.isReadOnly() || e.capacity() != bufferCapacity) { + return false; + } + e.clear(); + return true; + }, queue); + this.bufferCapacity = bufferCapacity; + } + + //非线程安全版 + public static ByteBufferPool createUnsafePool(int max, int bufferCapacity) { + return createUnsafePool(null, null, max, bufferCapacity); + } + + //非线程安全版 + public static ByteBufferPool createUnsafePool(LongAdder creatCounter, LongAdder cycleCounter, int max, int bufferCapacity) { + return createUnsafePool(null, creatCounter, cycleCounter, max, bufferCapacity); + } + + //非线程安全版 + public static ByteBufferPool createUnsafePool(ByteBufferPool parent, int bufferCapacity) { + return createUnsafePool(parent, 2, bufferCapacity); + } + + //非线程安全版 + public static ByteBufferPool createUnsafePool(ByteBufferPool parent, int max, int bufferCapacity) { + return createUnsafePool(parent, null, null, max, bufferCapacity); + } + + //非线程安全版 + public static ByteBufferPool createUnsafePool(ByteBufferPool parent, LongAdder creatCounter, LongAdder cycleCounter, int max, int bufferCapacity) { + return new ByteBufferPool(parent, creatCounter, cycleCounter, null, Math.max(Utility.cpus(), max), bufferCapacity, new ArrayDeque<>(Math.max(Utility.cpus(), max))); + } + + //非线程安全版 + public static ByteBufferPool createUnsafePool(Thread unsafeThread, int max, ByteBufferPool safePool) { + return createUnsafePool(safePool, safePool.getCreatCounter(), safePool.getCycleCounter(), unsafeThread, max, safePool.getBufferCapacity()); + } + + //非线程安全版 + public static ByteBufferPool createUnsafePool(ByteBufferPool parent, LongAdder creatCounter, LongAdder cycleCounter, Thread unsafeThread, int max, int bufferCapacity) { + return new ByteBufferPool(parent, creatCounter, cycleCounter, unsafeThread, Math.max(Utility.cpus(), max), bufferCapacity, new ArrayDeque<>(Math.max(Utility.cpus(), max))); + } + + //线程安全版 + public static ByteBufferPool createSafePool(int bufferCapacity) { + return createSafePool(2, bufferCapacity); + } + + //线程安全版 + public static ByteBufferPool createSafePool(int max, int bufferCapacity) { + return createSafePool(null, null, max, bufferCapacity); + } + + //线程安全版 + public static ByteBufferPool createSafePool(LongAdder creatCounter, LongAdder cycleCounter, int max, int bufferCapacity) { + return new ByteBufferPool(null, creatCounter, cycleCounter, null, Math.max(Utility.cpus(), max), bufferCapacity, new LinkedBlockingQueue<>(Math.max(Utility.cpus(), max))); + } + + public int getBufferCapacity() { + return bufferCapacity; + } + +} diff --git a/src/test/java/org/redkale/test/sncp/SncpClientCodecTest.java b/src/test/java/org/redkale/test/sncp/SncpClientCodecTest.java index f06420dec..a9dbe31f7 100644 --- a/src/test/java/org/redkale/test/sncp/SncpClientCodecTest.java +++ b/src/test/java/org/redkale/test/sncp/SncpClientCodecTest.java @@ -51,18 +51,20 @@ public class SncpClientCodecTest { ByteArray writeArray = new ByteArray(); request.prepare(header, 1, "", new byte[20]); System.out.println("request.1 = " + request); + writeArray.put(new byte[SncpHeader.HEADER_SIZE]); request.writeTo(conn, writeArray); request.prepare(header, 2, "", new byte[25]); System.out.println("request.2 = " + request); + writeArray.put(new byte[SncpHeader.HEADER_SIZE]); request.writeTo(conn, writeArray); System.out.println(writeArray.getBytes().length); realBuf = ByteBuffer.wrap(writeArray.getBytes()); } System.out.println("sncp.realBuf = " + realBuf.remaining()); codec.decodeMessages(realBuf, new ByteArray()); + System.out.println("respResults.size = " + respResults.size()); if (!main) { Assertions.assertEquals(2, respResults.size()); } - System.out.println(respResults); } } diff --git a/src/test/java/org/redkale/test/sncp/SncpTest.java b/src/test/java/org/redkale/test/sncp/SncpTest.java index e1361aacf..ee98a6d3d 100644 --- a/src/test/java/org/redkale/test/sncp/SncpTest.java +++ b/src/test/java/org/redkale/test/sncp/SncpTest.java @@ -32,7 +32,7 @@ public class SncpTest { private static final String protocol = "SNCP.UDP"; - private static final int clientCapacity = protocol.endsWith(".UDP") ? 1350 : 8192; + private static final int clientCapacity = protocol.endsWith(".UDP") ? AsyncGroup.UDP_BUFFER_CAPACITY : 8192; private static final ResourceFactory factory = ResourceFactory.create(); @@ -94,7 +94,8 @@ public class SncpTest { callbean = service.insert(callbean); System.out.println("bean: " + callbean); System.out.println("---------------------------------------------------"); - final int count = 10; + Thread.sleep(200); + final int count = 1; final CountDownLatch cld = new CountDownLatch(count); final AtomicInteger ai = new AtomicInteger(); long s = System.currentTimeMillis(); @@ -110,7 +111,7 @@ public class SncpTest { bean.setContent("数据: " + (k < 10 ? "0" : "") + k); StringBuilder sb = new StringBuilder(); sb.append(k).append("------"); - for (int i = 0; i < 120; i++) { + for (int i = 0; i < 900; i++) { sb.append("_").append(i).append("_").append(k).append("_0123456789"); } bean.setContent(sb.toString()); diff --git a/src/test/java/org/redkale/test/sncp/SncpTestServiceImpl.java b/src/test/java/org/redkale/test/sncp/SncpTestServiceImpl.java index 0668b0675..8837f40d2 100644 --- a/src/test/java/org/redkale/test/sncp/SncpTestServiceImpl.java +++ b/src/test/java/org/redkale/test/sncp/SncpTestServiceImpl.java @@ -65,7 +65,7 @@ public class SncpTestServiceImpl implements SncpTestIService { @Override public String queryResult(SncpTestBean bean) { System.out.println(Thread.currentThread().getName() + " 运行了queryResult方法"); - return "result: " + bean.getId(); + return "result: " + bean.getContent(); } public void queryResult(CompletionHandler handler, @RpcAttachment SncpTestBean bean) {