udp优化

This commit is contained in:
redkale
2023-02-07 15:02:19 +08:00
parent 75469a49e8
commit bab857778b
2 changed files with 15 additions and 13 deletions

View File

@@ -25,7 +25,7 @@ class AsyncNioUdpConnection extends AsyncNioConnection {
private final DatagramChannel channel; private final DatagramChannel channel;
private final ConcurrentLinkedDeque<ByteBuffer> revbufferQueue = new ConcurrentLinkedDeque<>(); private final ConcurrentLinkedDeque<ByteBuffer> revDataQueue = new ConcurrentLinkedDeque<>();
AsyncNioUdpServerChannel udpServerChannel; AsyncNioUdpServerChannel udpServerChannel;
@@ -123,24 +123,26 @@ class AsyncNioUdpConnection extends AsyncNioConnection {
if (clientMode) { if (clientMode) {
return this.channel.read(dst); return this.channel.read(dst);
} else { } else {
ByteBuffer buf = revbufferQueue.poll();
if (buf == null) {
return 0;
}
int start = dst.position(); int start = dst.position();
dst.put(buf); while (dst.hasRemaining()) {
if (buf.hasRemaining()) { ByteBuffer buf = revDataQueue.poll();
revbufferQueue.offerFirst(buf); if (buf == null) {
} else { break;
udpServerChannel.unsafeBufferPool.accept(buf); }
dst.put(buf);
if (buf.hasRemaining()) {
revDataQueue.offerFirst(buf);
} else {
udpServerChannel.unsafeBufferPool.accept(buf);
}
} }
return dst.position() - start; return dst.position() - start;
} }
} }
void receiveBuffer(ByteBuffer buf) { void receiveData(ByteBuffer buf) {
this.ioReadThread.execute(() -> { this.ioReadThread.execute(() -> {
revbufferQueue.offer(buf.flip()); revDataQueue.offer(buf.flip());
doRead(this.ioReadThread.inCurrThread()); doRead(this.ioReadThread.inCurrThread());
}); });
} }

View File

@@ -156,7 +156,7 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
if (conn == null) { if (conn == null) {
accept(address, buffer, ioReadThreads[readIndex], ioWriteThreads[writeIndex]); accept(address, buffer, ioReadThreads[readIndex], ioWriteThreads[writeIndex]);
} else { } else {
conn.receiveBuffer(buffer); conn.receiveData(buffer);
} }
} catch (Throwable t) { } catch (Throwable t) {
unsafeBufferPool.accept(buffer); unsafeBufferPool.accept(buffer);