diff --git a/src/main/java/org/redkale/net/AsyncNioUdpConnection.java b/src/main/java/org/redkale/net/AsyncNioUdpConnection.java index b91d7ef00..f83d404fa 100644 --- a/src/main/java/org/redkale/net/AsyncNioUdpConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioUdpConnection.java @@ -25,7 +25,7 @@ class AsyncNioUdpConnection extends AsyncNioConnection { private final DatagramChannel channel; - private final ConcurrentLinkedDeque revbufferQueue = new ConcurrentLinkedDeque<>(); + private final ConcurrentLinkedDeque revDataQueue = new ConcurrentLinkedDeque<>(); AsyncNioUdpServerChannel udpServerChannel; @@ -123,24 +123,26 @@ class AsyncNioUdpConnection extends AsyncNioConnection { if (clientMode) { return this.channel.read(dst); } else { - ByteBuffer buf = revbufferQueue.poll(); - if (buf == null) { - return 0; - } int start = dst.position(); - dst.put(buf); - if (buf.hasRemaining()) { - revbufferQueue.offerFirst(buf); - } else { - udpServerChannel.unsafeBufferPool.accept(buf); + while (dst.hasRemaining()) { + ByteBuffer buf = revDataQueue.poll(); + if (buf == null) { + break; + } + dst.put(buf); + if (buf.hasRemaining()) { + revDataQueue.offerFirst(buf); + } else { + udpServerChannel.unsafeBufferPool.accept(buf); + } } return dst.position() - start; } } - void receiveBuffer(ByteBuffer buf) { + void receiveData(ByteBuffer buf) { this.ioReadThread.execute(() -> { - revbufferQueue.offer(buf.flip()); + revDataQueue.offer(buf.flip()); doRead(this.ioReadThread.inCurrThread()); }); } diff --git a/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java b/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java index 6858a5861..5c0ee71e2 100644 --- a/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java +++ b/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java @@ -156,7 +156,7 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { if (conn == null) { accept(address, buffer, ioReadThreads[readIndex], ioWriteThreads[writeIndex]); } else { - conn.receiveBuffer(buffer); + conn.receiveData(buffer); } } catch (Throwable t) { unsafeBufferPool.accept(buffer);