From 75469a49e8ebc2a3f14068335f0ea2b1d1b00ec2 Mon Sep 17 00:00:00 2001 From: redkale Date: Tue, 7 Feb 2023 14:55:58 +0800 Subject: [PATCH] =?UTF-8?q?udp=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/org/redkale/net/AsyncNioUdpConnection.java | 9 ++++++++- .../java/org/redkale/net/AsyncNioUdpProtocolServer.java | 5 ++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/redkale/net/AsyncNioUdpConnection.java b/src/main/java/org/redkale/net/AsyncNioUdpConnection.java index 85fcd654d..b91d7ef00 100644 --- a/src/main/java/org/redkale/net/AsyncNioUdpConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioUdpConnection.java @@ -129,6 +129,11 @@ class AsyncNioUdpConnection extends AsyncNioConnection { } int start = dst.position(); dst.put(buf); + if (buf.hasRemaining()) { + revbufferQueue.offerFirst(buf); + } else { + udpServerChannel.unsafeBufferPool.accept(buf); + } return dst.position() - start; } } @@ -178,7 +183,9 @@ class AsyncNioUdpConnection extends AsyncNioConnection { public final void close() throws IOException { super.close(); if (clientMode) { - channel.close(); //不能关闭channel + channel.close(); + } else if (remoteAddress != null) { + udpServerChannel.connections.remove(remoteAddress); } if (this.connectKey != null) { this.connectKey.cancel(); diff --git a/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java b/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java index c9fbadf17..6858a5861 100644 --- a/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java +++ b/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java @@ -120,6 +120,7 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { @Override public void run() { + udpServerChannel.unsafeBufferPool = ObjectPool.createUnsafePool(Thread.currentThread(), 512, safeBufferPool); final AsyncIOThread[] ioReadThreads = ioGroup.ioReadThreads; final AsyncIOThread[] ioWriteThreads = ioGroup.ioWriteThreads; final int reads = ioReadThreads.length; @@ -129,7 +130,7 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { Set keys = null; final Selector sel = selector; final DatagramChannel serverChannel = udpServerChannel.serverChannel; - ObjectPool unsafeBufferPool = ObjectPool.createUnsafePool(null, 512, safeBufferPool); + final ObjectPool unsafeBufferPool = udpServerChannel.unsafeBufferPool; while (!closed) { try { int count = sel.select(); @@ -237,6 +238,8 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { DatagramChannel serverChannel; + ObjectPool unsafeBufferPool; + ConcurrentHashMap connections = new ConcurrentHashMap<>(); public AsyncNioUdpServerChannel(DatagramChannel serverChannel) {