diff --git a/src/main/java/META-INF/application-template.xml b/src/main/java/META-INF/application-template.xml index 5033620ee..966b811f9 100644 --- a/src/main/java/META-INF/application-template.xml +++ b/src/main/java/META-INF/application-template.xml @@ -158,7 +158,7 @@ threads【已废弃】: 线程数, 默认: CPU核数*2,最小8个【已废弃 @since 2.3.0】 maxconns: 最大连接数, 小于1表示无限制, 默认: 0 maxbody: request.body最大值, 默认: 64K - bufferCapacity: ByteBuffer的初始化大小, TCP默认: 32K; (HTTP 2.0、WebSocket,必须要16k以上); UDP默认: 1350B + bufferCapacity: ByteBuffer的初始化大小, TCP默认: 32K; (HTTP 2.0、WebSocket,必须要16k以上); UDP默认: 8K bufferPoolSize: ByteBuffer池的大小,默认: 线程数*4 responsePoolSize: Response池的大小,默认: 1024 aliveTimeoutSeconds: KeepAlive读操作超时秒数, 默认30, 0表示永久不超时; -1表示禁止KeepAlive diff --git a/src/main/java/org/redkale/convert/bson/BsonWriter.java b/src/main/java/org/redkale/convert/bson/BsonWriter.java index b834b5375..d393fdba3 100644 --- a/src/main/java/org/redkale/convert/bson/BsonWriter.java +++ b/src/main/java/org/redkale/convert/bson/BsonWriter.java @@ -139,6 +139,12 @@ public class BsonWriter extends Writer implements ByteTuple { content[count++] = ch; } + //类似writeTo(new byte[length]) + public void writePlaceholderTo(final int length) { + expand(length); + count += length; + } + public final void writeTo(final byte... chs) { writeTo(chs, 0, chs.length); } diff --git a/src/main/java/org/redkale/net/AsyncNioUdpConnection.java b/src/main/java/org/redkale/net/AsyncNioUdpConnection.java index 48b412be4..cb5f7b636 100644 --- a/src/main/java/org/redkale/net/AsyncNioUdpConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioUdpConnection.java @@ -10,7 +10,9 @@ import java.net.*; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedDeque; import javax.net.ssl.SSLContext; +import org.redkale.net.AsyncNioUdpProtocolServer.AsyncNioUdpServerChannel; /** * @@ -23,6 +25,10 @@ class AsyncNioUdpConnection extends AsyncNioConnection { private final DatagramChannel channel; + private final ConcurrentLinkedDeque revbufferQueue = new ConcurrentLinkedDeque<>(); + + AsyncNioUdpServerChannel udpServerChannel; + public AsyncNioUdpConnection(boolean clientMode, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread, DatagramChannel ch, SSLBuilder sslBuilder, SSLContext sslContext, final SocketAddress address) { super(clientMode, ioGroup, ioReadThread, ioWriteThread, ioGroup.bufferCapacity, sslBuilder, sslContext); @@ -117,10 +123,21 @@ class AsyncNioUdpConnection extends AsyncNioConnection { if (clientMode) { return this.channel.read(dst); } else { - return 0; + ByteBuffer buf = revbufferQueue.poll(); + if (buf == null) { + return 0; + } + int start = dst.position(); + dst.put(buf); + return dst.position() - start; } } + void receiveBuffer(ByteBuffer buf) { + revbufferQueue.offer(buf.flip()); + doRead(true); + } + @Override protected int implWrite(ByteBuffer src) throws IOException { return this.channel.send(src, remoteAddress); diff --git a/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java b/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java index f236fc7e2..c9fbadf17 100644 --- a/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java +++ b/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java @@ -10,6 +10,7 @@ import java.net.*; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.LongAdder; import java.util.function.*; import java.util.logging.Level; @@ -26,7 +27,7 @@ import org.redkale.util.*; */ class AsyncNioUdpProtocolServer extends ProtocolServer { - private DatagramChannel serverChannel; + private AsyncNioUdpServerChannel udpServerChannel; private Selector selector; @@ -46,40 +47,41 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { @Override public void open(AnyValue config) throws IOException { - this.serverChannel = DatagramChannel.open(); - this.serverChannel.configureBlocking(false); + DatagramChannel serverChannel = DatagramChannel.open(); + this.udpServerChannel = new AsyncNioUdpServerChannel(serverChannel); + serverChannel.configureBlocking(false); this.selector = Selector.open(); - final Set> options = this.serverChannel.supportedOptions(); + final Set> options = serverChannel.supportedOptions(); if (options.contains(StandardSocketOptions.TCP_NODELAY)) { - this.serverChannel.setOption(StandardSocketOptions.TCP_NODELAY, true); + serverChannel.setOption(StandardSocketOptions.TCP_NODELAY, true); } if (options.contains(StandardSocketOptions.SO_KEEPALIVE)) { - this.serverChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); + serverChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); } if (options.contains(StandardSocketOptions.SO_REUSEADDR)) { - this.serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); + serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); } if (options.contains(StandardSocketOptions.SO_RCVBUF)) { - this.serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 32 * 1024); + serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 32 * 1024); } if (options.contains(StandardSocketOptions.SO_SNDBUF)) { - this.serverChannel.setOption(StandardSocketOptions.SO_SNDBUF, 32 * 1024); + serverChannel.setOption(StandardSocketOptions.SO_SNDBUF, 32 * 1024); } } @Override public void bind(SocketAddress local, int backlog) throws IOException { - this.serverChannel.bind(local); + udpServerChannel.serverChannel.bind(local); } @Override public void setOption(SocketOption name, T value) throws IOException { - this.serverChannel.setOption(name, value); + udpServerChannel.serverChannel.setOption(name, value); } @Override public Set> supportedOptions() { - return this.serverChannel.supportedOptions(); + return udpServerChannel.serverChannel.supportedOptions(); } @Override @@ -110,7 +112,7 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { final String threadNameFormat = server.name == null || server.name.isEmpty() ? "Redkale-IOServletThread-%s" : ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread-%s"); this.ioGroup = new AsyncIOGroup(false, threadNameFormat, null, server.bufferCapacity, safeBufferPool); this.ioGroup.start(); - this.serverChannel.register(this.selector, SelectionKey.OP_READ); + udpServerChannel.serverChannel.register(this.selector, SelectionKey.OP_READ); this.acceptThread = new Thread() { { setName(String.format(threadNameFormat, "Accept")); @@ -126,6 +128,7 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { int writeIndex = -1; Set keys = null; final Selector sel = selector; + final DatagramChannel serverChannel = udpServerChannel.serverChannel; ObjectPool unsafeBufferPool = ObjectPool.createUnsafePool(null, 512, safeBufferPool); while (!closed) { try { @@ -141,14 +144,19 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { final ByteBuffer buffer = unsafeBufferPool.get(); try { SocketAddress address = serverChannel.receive(buffer); - buffer.flip(); + serverChannel.register(sel, SelectionKey.OP_READ); if (++readIndex >= reads) { readIndex = 0; } if (++writeIndex >= writes) { writeIndex = 0; } - accept(address, buffer, ioReadThreads[readIndex], ioWriteThreads[writeIndex]); + AsyncNioUdpConnection conn = udpServerChannel.connections.get(address); + if (conn == null) { + accept(address, buffer, ioReadThreads[readIndex], ioWriteThreads[writeIndex]); + } else { + conn.receiveBuffer(buffer); + } } catch (Throwable t) { unsafeBufferPool.accept(buffer); } @@ -169,9 +177,12 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { private void accept(SocketAddress address, ByteBuffer buffer, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread) throws IOException { ioGroup.connCreateCounter.increment(); ioGroup.connLivingCounter.increment(); - AsyncNioUdpConnection conn = new AsyncNioUdpConnection(false, ioGroup, ioReadThread, ioWriteThread, this.serverChannel, context.getSSLBuilder(), context.getSSLContext(), address); + AsyncNioUdpConnection conn = new AsyncNioUdpConnection(false, ioGroup, ioReadThread, ioWriteThread, udpServerChannel.serverChannel, context.getSSLBuilder(), context.getSSLContext(), address); + conn.udpServerChannel = udpServerChannel; + udpServerChannel.connections.put(address, conn); ProtocolCodec codec = new ProtocolCodec(context, responseSupplier, responseConsumer, conn); conn.protocolCodec = codec; + buffer.flip(); if (conn.sslEngine == null) { codec.start(buffer); } else { @@ -189,7 +200,7 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { @Override public SocketAddress getLocalAddress() throws IOException { - return this.serverChannel.getLocalAddress(); + return udpServerChannel.serverChannel.getLocalAddress(); } @Override @@ -199,7 +210,7 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { } this.closed = true; this.ioGroup.close(); - this.serverChannel.close(); + udpServerChannel.serverChannel.close(); } @Override @@ -221,4 +232,16 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { public long getLivingConnectionCount() { return -1; } + + static class AsyncNioUdpServerChannel { + + DatagramChannel serverChannel; + + ConcurrentHashMap connections = new ConcurrentHashMap<>(); + + public AsyncNioUdpServerChannel(DatagramChannel serverChannel) { + this.serverChannel = serverChannel; + } + + } } diff --git a/src/main/java/org/redkale/net/Server.java b/src/main/java/org/redkale/net/Server.java index b03f3bf55..3b956b775 100644 --- a/src/main/java/org/redkale/net/Server.java +++ b/src/main/java/org/redkale/net/Server.java @@ -128,7 +128,7 @@ public abstract class Server { - public static final byte[] DEFAULT_HEADER = new byte[HEADER_SIZE]; - protected static final int READ_STATE_ROUTE = 1; protected static final int READ_STATE_HEADER = 2; @@ -32,8 +30,6 @@ public class SncpRequest extends Request { protected static final int READ_STATE_END = 4; - protected final BsonConvert convert; - protected int readState = READ_STATE_ROUTE; private SncpHeader header; @@ -46,7 +42,6 @@ public class SncpRequest extends Request { protected SncpRequest(SncpContext context) { super(context); - this.convert = context.getBsonConvert(); } @Override //request.header与response.header数据格式保持一致 @@ -79,12 +74,14 @@ public class SncpRequest extends Request { } return 0; } - int len = Math.min(bodyLength, buffer.remaining()); - buffer.get(body, 0, len); - this.bodyOffset = len; - int rs = bodyLength - len; + int len = Math.min(bodyLength - this.bodyOffset, buffer.remaining()); + buffer.get(body, this.bodyOffset, len); + this.bodyOffset += len; + int rs = bodyLength - this.bodyOffset; if (rs == 0) { this.readState = READ_STATE_END; + } else { + buffer.clear(); } return rs; } @@ -107,7 +104,7 @@ public class SncpRequest extends Request { @Override public String toString() { - return SncpRequest.class.getSimpleName() + "{header=" + this.header + ",bodyOffset=" + this.bodyOffset + ",body=[" + (this.body == null ? -1 : this.body.length) + "]}"; + return SncpRequest.class.getSimpleName() + "_" + Objects.hashCode(this) + "{header=" + this.header + ",bodyOffset=" + this.bodyOffset + ",body=[" + (this.body == null ? -1 : this.body.length) + "]}"; } @Override diff --git a/src/main/java/org/redkale/net/sncp/SncpResponse.java b/src/main/java/org/redkale/net/sncp/SncpResponse.java index b55e0fd01..129f092c7 100644 --- a/src/main/java/org/redkale/net/sncp/SncpResponse.java +++ b/src/main/java/org/redkale/net/sncp/SncpResponse.java @@ -92,6 +92,7 @@ public class SncpResponse extends Response { finish(RETCODE_THROWEXCEPTION, null); } + //调用此方法时out已写入SncpHeader public void finish(final int retcode, final BsonWriter out) { if (out == null) { final ByteArray buffer = new ByteArray(HEADER_SIZE); @@ -99,9 +100,8 @@ public class SncpResponse extends Response { finish(buffer); return; } - final int respBodyLength = out.count(); //body总长度 final ByteArray array = out.toByteArray(); - fillHeader(array, respBodyLength - HEADER_SIZE, retcode); + fillHeader(array, array.length() - HEADER_SIZE, retcode); finish(array); } diff --git a/src/test/java/org/redkale/test/sncp/SncpTest.java b/src/test/java/org/redkale/test/sncp/SncpTest.java index 97ce7477e..e1361aacf 100644 --- a/src/test/java/org/redkale/test/sncp/SncpTest.java +++ b/src/test/java/org/redkale/test/sncp/SncpTest.java @@ -7,11 +7,10 @@ package org.redkale.test.sncp; import java.io.IOException; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; import java.nio.channels.AsynchronousChannelGroup; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.*; +import java.util.concurrent.atomic.AtomicInteger; import org.redkale.boot.*; import org.redkale.convert.bson.*; import org.redkale.net.*; @@ -33,6 +32,8 @@ public class SncpTest { private static final String protocol = "SNCP.UDP"; + private static final int clientCapacity = protocol.endsWith(".UDP") ? 1350 : 8192; + private static final ResourceFactory factory = ResourceFactory.create(); public static void main(String[] args) throws Exception { @@ -64,17 +65,6 @@ public class SncpTest { return AsynchronousChannelGroup.withCachedThreadPool(transportExec, 1); } - public static ObjectPool newBufferPool() { - return ObjectPool.createSafePool(new LongAdder(), new LongAdder(), 16, - (Object... params) -> ByteBuffer.allocateDirect(8192), null, (e) -> { - if (e == null || e.isReadOnly() || e.capacity() != 8192) { - return false; - } - e.clear(); - return true; - }); - } - private static void runClient() throws Exception { InetSocketAddress addr = new InetSocketAddress(myhost, port); Set set = new LinkedHashSet<>(); @@ -82,7 +72,7 @@ public class SncpTest { if (port2 > 0) { set.add(new InetSocketAddress(myhost, port2)); } - final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16); + final AsyncIOGroup asyncGroup = new AsyncIOGroup(clientCapacity, 16); asyncGroup.start(); final TransportFactory transFactory = TransportFactory.create(asyncGroup, protocol.endsWith(".UDP") ? "UDP" : "TCP", 0, 0); transFactory.addGroupInfo("client", set); @@ -120,7 +110,7 @@ public class SncpTest { bean.setContent("数据: " + (k < 10 ? "0" : "") + k); StringBuilder sb = new StringBuilder(); sb.append(k).append("------"); - for (int i = 0; i < 12; i++) { + for (int i = 0; i < 120; i++) { sb.append("_").append(i).append("_").append(k).append("_0123456789"); } bean.setContent(sb.toString()); @@ -139,6 +129,10 @@ public class SncpTest { } cld.await(); System.out.println("---并发" + count + "次耗时: " + (System.currentTimeMillis() - s) / 1000.0 + "s"); + if (count == 1) { + System.exit(0); + return; + } final CountDownLatch cld2 = new CountDownLatch(1); long s2 = System.currentTimeMillis(); final CompletableFuture future = service.queryResultAsync(callbean); @@ -194,7 +188,7 @@ public class SncpTest { private static void runServer2() throws Exception { InetSocketAddress addr = new InetSocketAddress(myhost, port2); final CountDownLatch cdl = new CountDownLatch(1); - final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16); + final AsyncIOGroup asyncGroup = new AsyncIOGroup(8196, 16); asyncGroup.start(); new Thread() { { diff --git a/src/test/java/org/redkale/test/sncp/SncpTestServiceImpl.java b/src/test/java/org/redkale/test/sncp/SncpTestServiceImpl.java index d3b3f3163..0668b0675 100644 --- a/src/test/java/org/redkale/test/sncp/SncpTestServiceImpl.java +++ b/src/test/java/org/redkale/test/sncp/SncpTestServiceImpl.java @@ -57,16 +57,21 @@ public class SncpTestServiceImpl implements SncpTestIService { return bean; } + public SncpTestBean expand(SncpTestBean bean) { + bean.setId(System.currentTimeMillis()); + return bean; + } + @Override public String queryResult(SncpTestBean bean) { System.out.println(Thread.currentThread().getName() + " 运行了queryResult方法"); - return "result: " + bean; + return "result: " + bean.getId(); } public void queryResult(CompletionHandler handler, @RpcAttachment SncpTestBean bean) { System.out.println(Thread.currentThread().getName() + " handler 运行了queryResult方法"); if (handler != null) { - handler.completed("result: " + bean, bean); + handler.completed("result: " + bean.getId(), bean); } }