From 44500b650098171a3bc3be74571bc4a48fb0ec8a Mon Sep 17 00:00:00 2001 From: Redkale Date: Tue, 3 Jan 2023 14:46:28 +0800 Subject: [PATCH] =?UTF-8?q?AsyncConnection=E5=8C=BA=E5=88=86read/write?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/net/AsyncConnection.java | 134 ++++++++++++------ .../java/org/redkale/net/AsyncIOThread.java | 1 + .../net/AsyncNioCompletionHandler.java | 41 +++++- .../org/redkale/net/AsyncNioConnection.java | 26 ++-- .../redkale/net/AsyncNioTcpConnection.java | 29 +--- .../net/AsyncNioTcpProtocolServer.java | 2 +- .../redkale/net/AsyncNioUdpConnection.java | 18 --- .../java/org/redkale/net/ProtocolCodec.java | 10 +- src/main/java/org/redkale/net/Response.java | 16 +-- src/main/java/org/redkale/net/Transport.java | 6 +- .../org/redkale/net/TransportFactory.java | 6 +- .../redkale/net/client/ClientConnection.java | 10 +- .../org/redkale/net/client/ClientFuture.java | 6 +- .../org/redkale/net/http/HttpResponse.java | 4 +- .../redkale/net/http/HttpSimpleClient.java | 28 ++-- .../java/org/redkale/net/http/WebSocket.java | 110 ++++++++++---- .../java/org/redkale/net/sncp/SncpClient.java | 8 +- 17 files changed, 285 insertions(+), 170 deletions(-) diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index c28e1fa8c..a4f34a662 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -44,7 +44,9 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl protected final AsyncGroup ioGroup; - protected final AsyncIOThread ioThread; + protected final AsyncIOThread ioReadThread; + + protected final AsyncIOThread ioWriteThread; protected final boolean client; @@ -89,7 +91,8 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl Objects.requireNonNull(bufferConsumer); this.client = client; this.ioGroup = ioGroup; - this.ioThread = ioThread; + this.ioReadThread = ioThread; + this.ioWriteThread = ioThread; this.bufferCapacity = bufferCapacity; this.bufferSupplier = bufferSupplier; this.bufferConsumer = bufferConsumer; @@ -110,11 +113,19 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl } } - public Supplier getBufferSupplier() { + public Supplier getReadBufferSupplier() { return this.bufferSupplier; } - public Consumer getBufferConsumer() { + public Consumer getReadBufferConsumer() { + return this.bufferConsumer; + } + + public Supplier getWriteBufferSupplier() { + return this.bufferSupplier; + } + + public Consumer getWriteBufferConsumer() { return this.bufferConsumer; } @@ -138,24 +149,44 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl return eventing.decrementAndGet(); } - public final void execute(Runnable command) { - ioThread.execute(command); + public final void executeRead(Runnable command) { + ioReadThread.execute(command); } - public final void execute(Runnable... commands) { - ioThread.execute(commands); + public final void executeRead(Runnable... commands) { + ioReadThread.execute(commands); } - public final void execute(Collection commands) { - ioThread.execute(commands); + public final void executeRead(Collection commands) { + ioReadThread.execute(commands); } - public final boolean inCurrThread() { - return ioThread.inCurrThread(); + public final void executeWrite(Runnable command) { + ioWriteThread.execute(command); } - public final AsyncIOThread getAsyncIOThread() { - return ioThread; + public final void executeWrite(Runnable... commands) { + ioWriteThread.execute(commands); + } + + public final void executeWrite(Collection commands) { + ioWriteThread.execute(commands); + } + + public final boolean inCurrReadThread() { + return ioReadThread.inCurrThread(); + } + + public final boolean inCurrWriteThread() { + return ioWriteThread.inCurrThread(); + } + + public final AsyncIOThread getReadIOThread() { + return ioReadThread; + } + + public final AsyncIOThread getWriteIOThread() { + return ioWriteThread; } @Override @@ -196,10 +227,10 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl } public final void startReadInIOThread(CompletionHandler handler) { - if (inCurrThread()) { + if (inCurrReadThread()) { startRead(handler); } else { - execute(() -> startRead(handler)); + executeRead(() -> startRead(handler)); } } @@ -212,10 +243,10 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl } public final void readInIOThread(CompletionHandler handler) { - if (inCurrThread()) { + if (inCurrReadThread()) { read(handler); } else { - execute(() -> read(handler)); + executeRead(() -> read(handler)); } } @@ -294,13 +325,13 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl CompletionHandler newhandler = new CompletionHandler() { @Override public void completed(Integer result, Void attachment) { - offerBuffer(buffer); + offerWriteBuffer(buffer); handler.completed(result, attachment); } @Override public void failed(Throwable exc, Void attachment) { - offerBuffer(buffer); + offerWriteBuffer(buffer); handler.failed(exc, attachment); } }; @@ -318,13 +349,13 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl CompletionHandler newhandler = new CompletionHandler() { @Override public void completed(Integer result, Void attachment) { - offerBuffer(buffers); + offerWriteBuffer(buffers); handler.completed(result, attachment); } @Override public void failed(Throwable exc, Void attachment) { - offerBuffer(buffers); + offerWriteBuffer(buffers); handler.failed(exc, attachment); } }; @@ -358,13 +389,13 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl CompletionHandler newhandler = new CompletionHandler() { @Override public void completed(Integer result, A attachment) { - offerBuffer(srcs); + offerWriteBuffer(srcs); handler.completed(result, attachment); } @Override public void failed(Throwable exc, A attachment) { - offerBuffer(srcs); + offerWriteBuffer(srcs); handler.failed(exc, attachment); } }; @@ -386,7 +417,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl synchronized (this) { ByteBufferWriter writer = this.pipelineWriter; if (writer == null) { - writer = ByteBufferWriter.create(getBufferSupplier()); + writer = ByteBufferWriter.create(getWriteBufferSupplier()); this.pipelineWriter = writer; } if (this.pipelineDataNode == null && pipelineIndex == writer.getWriteBytesCounter() + 1) { @@ -424,7 +455,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl synchronized (this) { ByteBufferWriter writer = this.pipelineWriter; if (writer == null) { - writer = ByteBufferWriter.create(getBufferSupplier()); + writer = ByteBufferWriter.create(getWriteBufferSupplier()); this.pipelineWriter = writer; } if (this.pipelineDataNode == null && pipelineIndex == writer.getWriteBytesCounter() + 1) { @@ -564,14 +595,31 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl return bufferSupplier.get(); } - public void offerBuffer(ByteBuffer buffer) { + public void offerReadBuffer(ByteBuffer buffer) { if (buffer == null) { return; } bufferConsumer.accept(buffer); } - public void offerBuffer(ByteBuffer... buffers) { + public void offerReadBuffer(ByteBuffer... buffers) { + if (buffers == null) { + return; + } + Consumer consumer = this.bufferConsumer; + for (ByteBuffer buffer : buffers) { + consumer.accept(buffer); + } + } + + public void offerWriteBuffer(ByteBuffer buffer) { + if (buffer == null) { + return; + } + bufferConsumer.accept(buffer); + } + + public void offerWriteBuffer(ByteBuffer... buffers) { if (buffers == null) { return; } @@ -712,8 +760,8 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl SSLEngineResult engineResult = engine.unwrap(netBuffer, appBuffer); if (engineResult.getStatus() == SSLEngineResult.Status.CLOSED && (engineResult.getHandshakeStatus() == NOT_HANDSHAKING || engineResult.getHandshakeStatus() == FINISHED)) { - offerBuffer(netBuffer); - offerBuffer(appBuffer); + offerReadBuffer(netBuffer); + offerReadBuffer(appBuffer); return null; } hss = engineResult.getHandshakeStatus(); @@ -750,7 +798,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl return; //CLOSED,netBuffer已被回收 } if (AsyncConnection.this.readSSLHalfBuffer != netBuffer) { - offerBuffer(netBuffer); + offerReadBuffer(netBuffer); } if (AsyncConnection.this.readBuffer != null) { ByteBuffer rsBuffer = AsyncConnection.this.readBuffer; @@ -758,7 +806,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl appBuffer.flip(); if (rsBuffer.remaining() >= appBuffer.remaining()) { rsBuffer.put(appBuffer); - offerBuffer(appBuffer); + offerReadBuffer(appBuffer); appBuffer = rsBuffer; } else { while (rsBuffer.hasRemaining()) rsBuffer.put(appBuffer.get()); @@ -860,13 +908,13 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl writeImpl(netBuffers[0], null, new CompletionHandler() { @Override public void completed(Integer count, Void attachment) { - offerBuffer(netBuffers[0]); + offerWriteBuffer(netBuffers[0]); callback.accept(null); } @Override public void failed(Throwable t, Void attachment) { - offerBuffer(netBuffers[0]); + offerWriteBuffer(netBuffers[0]); callback.accept(t); } }); @@ -874,20 +922,20 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl writeImpl(netBuffers, 0, netBuffers.length, null, new CompletionHandler() { @Override public void completed(Integer count, Void attachment) { - offerBuffer(netBuffers); + offerWriteBuffer(netBuffers); callback.accept(null); } @Override public void failed(Throwable t, Void attachment) { - offerBuffer(netBuffers); + offerWriteBuffer(netBuffers); callback.accept(t); } }); } return true; } else { - offerBuffer(netBuffers); + offerWriteBuffer(netBuffers); return false; } } @@ -899,13 +947,13 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl writeImpl(netBuffers[0], null, new CompletionHandler() { @Override public void completed(Integer count, Void attachment) { - offerBuffer(netBuffers[0]); + offerWriteBuffer(netBuffers[0]); callback.accept(null); } @Override public void failed(Throwable t, Void attachment) { - offerBuffer(netBuffers[0]); + offerWriteBuffer(netBuffers[0]); callback.accept(t); } }); @@ -913,20 +961,20 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl writeImpl(netBuffers, 0, netBuffers.length, null, new CompletionHandler() { @Override public void completed(Integer count, Void attachment) { - offerBuffer(netBuffers); + offerWriteBuffer(netBuffers); callback.accept(null); } @Override public void failed(Throwable t, Void attachment) { - offerBuffer(netBuffers); + offerWriteBuffer(netBuffers); callback.accept(t); } }); } return true; } else { - offerBuffer(netBuffers); + offerWriteBuffer(netBuffers); return false; } } @@ -974,7 +1022,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl if (count < 1) { callback.accept(new IOException("read data error")); } else { - offerBuffer(attachment); + offerReadBuffer(attachment); doHandshake(callback); } } diff --git a/src/main/java/org/redkale/net/AsyncIOThread.java b/src/main/java/org/redkale/net/AsyncIOThread.java index 1f893d0b8..ce3fc9083 100644 --- a/src/main/java/org/redkale/net/AsyncIOThread.java +++ b/src/main/java/org/redkale/net/AsyncIOThread.java @@ -30,6 +30,7 @@ public class AsyncIOThread extends WorkThread { final Selector selector; + //如果有read/write两IOThread,只记readThread final AtomicInteger connCounter = new AtomicInteger(); private final Supplier bufferSupplier; diff --git a/src/main/java/org/redkale/net/AsyncNioCompletionHandler.java b/src/main/java/org/redkale/net/AsyncNioCompletionHandler.java index d96cc0fea..182496f31 100644 --- a/src/main/java/org/redkale/net/AsyncNioCompletionHandler.java +++ b/src/main/java/org/redkale/net/AsyncNioCompletionHandler.java @@ -22,6 +22,8 @@ class AsyncNioCompletionHandler implements CompletionHandler, Run private final AsyncNioConnection conn; + private final boolean readMode; + private CompletionHandler handler; private A attachment; @@ -32,8 +34,9 @@ class AsyncNioCompletionHandler implements CompletionHandler, Run private ByteBuffer buffer; - public AsyncNioCompletionHandler(AsyncNioConnection conn) { + public AsyncNioCompletionHandler(boolean readFlag, AsyncNioConnection conn) { this.conn = conn; + this.readMode = readFlag; } public void handler(CompletionHandler handler, A attachment) { @@ -70,9 +73,17 @@ class AsyncNioCompletionHandler implements CompletionHandler, Run } if (conn != null) { if (buffers != null) { - conn.offerBuffer(buffers); + if (readMode) { + conn.offerReadBuffer(buffers); + } else { + conn.offerWriteBuffer(buffers); + } } else if (buffer != null) { - conn.offerBuffer(buffer); + if (readMode) { + conn.offerReadBuffer(buffer); + } else { + conn.offerWriteBuffer(buffer); + } } } CompletionHandler handler0 = handler; @@ -90,9 +101,17 @@ class AsyncNioCompletionHandler implements CompletionHandler, Run } if (conn != null) { if (buffers != null) { - conn.offerBuffer(buffers); + if (readMode) { + conn.offerReadBuffer(buffers); + } else { + conn.offerWriteBuffer(buffers); + } } else if (buffer != null) { - conn.offerBuffer(buffer); + if (readMode) { + conn.offerReadBuffer(buffer); + } else { + conn.offerWriteBuffer(buffer); + } } } CompletionHandler handler0 = handler; @@ -105,9 +124,17 @@ class AsyncNioCompletionHandler implements CompletionHandler, Run public void run() { if (conn != null) { if (buffers != null) { - conn.offerBuffer(buffers); + if (readMode) { + conn.offerReadBuffer(buffers); + } else { + conn.offerWriteBuffer(buffers); + } } else if (buffer != null) { - conn.offerBuffer(buffer); + if (readMode) { + conn.offerReadBuffer(buffer); + } else { + conn.offerWriteBuffer(buffer); + } } } CompletionHandler handler0 = handler; diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index e080315c5..e00641ef1 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -43,7 +43,7 @@ abstract class AsyncNioConnection extends AsyncConnection { protected SelectionKey connectKey; //-------------------------------- 读操作 -------------------------------------- - protected final AsyncNioCompletionHandler readTimeoutCompletionHandler = new AsyncNioCompletionHandler<>(this); + protected final AsyncNioCompletionHandler readTimeoutCompletionHandler = new AsyncNioCompletionHandler<>(true, this); protected int readTimeoutSeconds; @@ -58,7 +58,7 @@ abstract class AsyncNioConnection extends AsyncConnection { protected SelectionKey readKey; //-------------------------------- 写操作 -------------------------------------- - protected final AsyncNioCompletionHandler writeTimeoutCompletionHandler = new AsyncNioCompletionHandler<>(this); + protected final AsyncNioCompletionHandler writeTimeoutCompletionHandler = new AsyncNioCompletionHandler<>(false, this); protected int writeTimeoutSeconds; @@ -138,7 +138,7 @@ abstract class AsyncNioConnection extends AsyncConnection { @Override protected void startHandshake(final Consumer callback) { - ((AsyncIOThread) ioThread).register(t -> super.startHandshake(callback)); + ioReadThread.register(t -> super.startHandshake(callback)); } @Override @@ -168,9 +168,9 @@ abstract class AsyncNioConnection extends AsyncConnection { this.readCompletionHandler = handler; } if (client) { - doRead(this.ioThread.inCurrThread()); + doRead(this.ioReadThread.inCurrThread()); } else { - doRead(currReadInvoker < MAX_INVOKER_ONSTACK || this.ioThread.inCurrThread()); //同一线程中Selector.wakeup无效 + doRead(currReadInvoker < MAX_INVOKER_ONSTACK || this.ioReadThread.inCurrThread()); //同一线程中Selector.wakeup无效 } } @@ -285,7 +285,7 @@ abstract class AsyncNioConnection extends AsyncConnection { if (readCount != 0) { handleRead(readCount, null); } else if (readKey == null) { - ((AsyncIOThread) ioThread).register(selector -> { + ioReadThread.register(selector -> { try { readKey = implRegister(selector, SelectionKey.OP_READ); readKey.attach(this); @@ -294,7 +294,7 @@ abstract class AsyncNioConnection extends AsyncConnection { } }); } else { - ((AsyncIOGroup) ioGroup).interestOpsOr((AsyncIOThread) ioThread, readKey, SelectionKey.OP_READ); + ((AsyncIOGroup) ioGroup).interestOpsOr(ioReadThread, readKey, SelectionKey.OP_READ); } } catch (Exception e) { handleRead(0, e); @@ -333,7 +333,7 @@ abstract class AsyncNioConnection extends AsyncConnection { writeByteTuple2Callback = null; writeByteTuple2Attachment = null; } else { - ByteBufferWriter writer = ByteBufferWriter.create(getBufferSupplier(), buffer); + ByteBufferWriter writer = ByteBufferWriter.create(getWriteBufferSupplier(), buffer); writer.put(writeByteTuple1Array, writeByteTuple1Offset, writeByteTuple1Length); if (writeByteTuple2Length > 0) { writer.put(writeByteTuple2Array, writeByteTuple2Offset, writeByteTuple2Length); @@ -399,7 +399,7 @@ abstract class AsyncNioConnection extends AsyncConnection { if (writeOver && (totalCount != 0 || !hasRemain)) { handleWrite(writeTotal + totalCount, null); } else if (writeKey == null) { - ((AsyncIOThread) ioThread).register(selector -> { + ioWriteThread.register(selector -> { try { writeKey = implRegister(selector, SelectionKey.OP_WRITE); writeKey.attach(this); @@ -408,7 +408,7 @@ abstract class AsyncNioConnection extends AsyncConnection { } }); } else { - ((AsyncIOGroup) ioGroup).interestOpsOr((AsyncIOThread) ioThread, writeKey, SelectionKey.OP_WRITE); + ((AsyncIOGroup) ioGroup).interestOpsOr(ioWriteThread, writeKey, SelectionKey.OP_WRITE); } } catch (IOException e) { handleWrite(0, e); @@ -428,14 +428,14 @@ abstract class AsyncNioConnection extends AsyncConnection { this.connectPending = false;//必须放最后 if (handler != null) { - if (!client || inCurrThread()) { //client模式下必须保证read、write在ioThread内运行 + if (!client || inCurrWriteThread()) { //client模式下必须保证read、write在ioThread内运行 if (t == null) { handler.completed(null, attach); } else { handler.failed(t, attach); } } else { - ioThread.execute(() -> { + ioWriteThread.execute(() -> { if (t == null) { handler.completed(null, attach); } else { @@ -537,7 +537,7 @@ abstract class AsyncNioConnection extends AsyncConnection { @Override public void close() throws IOException { if (bb != null) { - offerBuffer(bb); + offerReadBuffer(bb); bb = null; } reader.close(); diff --git a/src/main/java/org/redkale/net/AsyncNioTcpConnection.java b/src/main/java/org/redkale/net/AsyncNioTcpConnection.java index 46d6ec573..33b6b5acf 100644 --- a/src/main/java/org/redkale/net/AsyncNioTcpConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioTcpConnection.java @@ -11,7 +11,6 @@ import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Set; import java.util.concurrent.atomic.LongAdder; -import java.util.function.*; import javax.net.ssl.SSLContext; import org.redkale.util.ByteBufferReader; @@ -44,21 +43,6 @@ class AsyncNioTcpConnection extends AsyncNioConnection { ioThread.connCounter.incrementAndGet(); } - public AsyncNioTcpConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioThread, AsyncIOThread connectThread, Supplier bufferSupplier, Consumer bufferConsumer, - SocketChannel ch, SSLBuilder sslBuilder, SSLContext sslContext, final SocketAddress addr0, LongAdder livingCounter, LongAdder closedCounter) { - super(client, ioGroup, ioThread, connectThread, ioGroup.bufferCapacity, bufferSupplier, bufferConsumer, sslBuilder, sslContext, livingCounter, closedCounter); - this.channel = ch; - SocketAddress addr = addr0; - if (addr == null) { - try { - addr = ch.getRemoteAddress(); - } catch (Exception e) { - //do nothing - } - } - this.remoteAddress = addr; - } - @Override public boolean isOpen() { return this.channel.isOpen(); @@ -113,6 +97,7 @@ class AsyncNioTcpConnection extends AsyncNioConnection { } } + @Override public ReadableByteChannel readableByteChannel() { if (this.sslEngine == null) { return this.channel; @@ -131,7 +116,7 @@ class AsyncNioTcpConnection extends AsyncNioConnection { bb.put(halfBuffer.get()); } if (!halfBuffer.hasRemaining()) { - offerBuffer(halfBuffer); + offerReadBuffer(halfBuffer); halfBuffer = null; } return bb.position() - pos; @@ -150,11 +135,11 @@ class AsyncNioTcpConnection extends AsyncNioConnection { if (appBuffer.hasRemaining()) { halfBuffer = appBuffer; } else { - offerBuffer(appBuffer); + offerReadBuffer(appBuffer); } return bb.position() - pos; } else { - offerBuffer(netBuffer); + offerReadBuffer(netBuffer); return 0; } } @@ -167,7 +152,7 @@ class AsyncNioTcpConnection extends AsyncNioConnection { @Override public void close() throws IOException { if (halfBuffer != null) { - offerBuffer(halfBuffer); + offerReadBuffer(halfBuffer); halfBuffer = null; } self.close(); @@ -199,7 +184,7 @@ class AsyncNioTcpConnection extends AsyncNioConnection { channel.write(netBuffers); } } - offerBuffer(netBuffers); + offerWriteBuffer(netBuffers); return len; } @@ -285,7 +270,7 @@ class AsyncNioTcpConnection extends AsyncNioConnection { @Override public final void close() throws IOException { super.close(); - ((AsyncIOThread) ioThread).connCounter.decrementAndGet(); + ioReadThread.connCounter.decrementAndGet(); channel.shutdownInput(); channel.shutdownOutput(); channel.close(); diff --git a/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java b/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java index e5deec360..e2275331a 100644 --- a/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java +++ b/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java @@ -107,7 +107,7 @@ class AsyncNioTcpProtocolServer extends ProtocolServer { return pool == null ? safeResponsePool.get() : pool.get(); }; this.responseConsumer = (v) -> { - WorkThread thread = v.channel != null ? v.channel.getAsyncIOThread() : v.thread; + WorkThread thread = v.channel != null ? v.channel.getWriteIOThread() : v.thread; if (thread != null && !thread.inCurrThread()) { thread.execute(() -> { ObjectPool pool = localResponsePool.get(); diff --git a/src/main/java/org/redkale/net/AsyncNioUdpConnection.java b/src/main/java/org/redkale/net/AsyncNioUdpConnection.java index 88c1f077d..ed8320749 100644 --- a/src/main/java/org/redkale/net/AsyncNioUdpConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioUdpConnection.java @@ -11,7 +11,6 @@ import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Set; import java.util.concurrent.atomic.LongAdder; -import java.util.function.*; import javax.net.ssl.SSLContext; /** @@ -40,23 +39,6 @@ class AsyncNioUdpConnection extends AsyncNioConnection { this.remoteAddress = addr; } - public AsyncNioUdpConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioThread, AsyncIOThread connectThread, - Supplier bufferSupplier, Consumer bufferConsumer, - DatagramChannel ch, SSLBuilder sslBuilder, SSLContext sslContext, final SocketAddress addr0, - LongAdder livingCounter, LongAdder closedCounter) { - super(client, ioGroup, ioThread, connectThread, ioGroup.bufferCapacity, bufferSupplier, bufferConsumer, sslBuilder, sslContext, livingCounter, closedCounter); - this.channel = ch; - SocketAddress addr = addr0; - if (addr == null) { - try { - addr = ch.getRemoteAddress(); - } catch (Exception e) { - //do nothing - } - } - this.remoteAddress = addr; - } - @Override public boolean isOpen() { return this.channel.isOpen(); diff --git a/src/main/java/org/redkale/net/ProtocolCodec.java b/src/main/java/org/redkale/net/ProtocolCodec.java index 1a3442dd3..6d1494fb7 100644 --- a/src/main/java/org/redkale/net/ProtocolCodec.java +++ b/src/main/java/org/redkale/net/ProtocolCodec.java @@ -56,7 +56,7 @@ class ProtocolCodec implements CompletionHandler { @Override public void completed(Integer count, ByteBuffer buffer) { if (count < 1) { - channel.offerBuffer(buffer); + channel.offerReadBuffer(buffer); channel.dispose(); // response.init(channel); 在调用之前异常 return; } @@ -78,7 +78,7 @@ class ProtocolCodec implements CompletionHandler { @Override public void failed(Throwable exc, ByteBuffer buffer) { - channel.offerBuffer(buffer); + channel.offerReadBuffer(buffer); channel.dispose();// response.init(channel); 在调用之前异常 if (exc != null && context.logger.isLoggable(Level.FINEST) && !(exc instanceof SocketException && "Connection reset".equals(exc.getMessage()))) { @@ -135,7 +135,7 @@ class ProtocolCodec implements CompletionHandler { if (rs < 0) { //表示数据格式不正确 final DispatcherServlet preparer = context.prepare; preparer.incrExecuteCounter(); - channel.offerBuffer(buffer); + channel.offerReadBuffer(buffer); if (rs != Integer.MIN_VALUE) { preparer.incrIllRequestCounter(); } @@ -178,7 +178,7 @@ class ProtocolCodec implements CompletionHandler { @Override public void completed(Integer count, ByteBuffer attachment) { if (count < 1) { - channel.offerBuffer(attachment); + channel.offerReadBuffer(attachment); channel.dispose(); return; } @@ -189,7 +189,7 @@ class ProtocolCodec implements CompletionHandler { @Override public void failed(Throwable exc, ByteBuffer attachment) { context.prepare.incrIllRequestCounter(); - channel.offerBuffer(attachment); + channel.offerReadBuffer(attachment); response.finish(true); if (exc != null) { request.context.logger.log(Level.FINER, "Servlet read channel erroneous, force to close channel ", exc); diff --git a/src/main/java/org/redkale/net/Response.java b/src/main/java/org/redkale/net/Response.java index dac8ae6e2..188be57f9 100644 --- a/src/main/java/org/redkale/net/Response.java +++ b/src/main/java/org/redkale/net/Response.java @@ -68,7 +68,7 @@ public abstract class Response> { @Override public void completed(Integer result, ByteBuffer attachment) { if (attachment != writeBuffer) { - channel.offerBuffer(attachment); + channel.offerWriteBuffer(attachment); } else { attachment.clear(); } @@ -78,7 +78,7 @@ public abstract class Response> { @Override public void failed(Throwable exc, ByteBuffer attachment) { if (attachment != writeBuffer) { - channel.offerBuffer(attachment); + channel.offerWriteBuffer(attachment); } else { attachment.clear(); } @@ -93,7 +93,7 @@ public abstract class Response> { public void completed(final Integer result, final ByteBuffer[] attachments) { if (attachments != null) { for (ByteBuffer attachment : attachments) { - channel.offerBuffer(attachment); + channel.offerWriteBuffer(attachment); } } finish(); @@ -103,7 +103,7 @@ public abstract class Response> { public void failed(Throwable exc, final ByteBuffer[] attachments) { if (attachments != null) { for (ByteBuffer attachment : attachments) { - channel.offerBuffer(attachment); + channel.offerWriteBuffer(attachment); } } finish(true); @@ -402,7 +402,7 @@ public abstract class Response> { @Override public void completed(Integer result, A attachment) { if (buffer != writeBuffer) { - channel.offerBuffer(buffer); + channel.offerWriteBuffer(buffer); } else { buffer.clear(); } @@ -414,7 +414,7 @@ public abstract class Response> { @Override public void failed(Throwable exc, A attachment) { if (buffer != writeBuffer) { - channel.offerBuffer(buffer); + channel.offerWriteBuffer(buffer); } else { buffer.clear(); } @@ -431,7 +431,7 @@ public abstract class Response> { @Override public void completed(Integer result, A attachment) { - channel.offerBuffer(buffers); + channel.offerWriteBuffer(buffers); if (handler != null) { handler.completed(result, attachment); } @@ -440,7 +440,7 @@ public abstract class Response> { @Override public void failed(Throwable exc, A attachment) { for (ByteBuffer buffer : buffers) { - channel.offerBuffer(buffer); + channel.offerWriteBuffer(buffer); } if (handler != null) { handler.failed(exc, attachment); diff --git a/src/main/java/org/redkale/net/Transport.java b/src/main/java/org/redkale/net/Transport.java index 1e92dbe30..b3b8fbc66 100644 --- a/src/main/java/org/redkale/net/Transport.java +++ b/src/main/java/org/redkale/net/Transport.java @@ -365,13 +365,13 @@ public final class Transport { if (handler != null) { handler.completed(result, att); } - conn.offerBuffer(attachment); + conn.offerWriteBuffer(attachment); offerConnection(false, conn); } @Override public void failed(Throwable exc, ByteBuffer attachment) { - conn.offerBuffer(attachment); + conn.offerWriteBuffer(attachment); offerConnection(true, conn); } }); @@ -380,7 +380,7 @@ public final class Transport { @Override public void failed(Throwable exc, ByteBuffer attachment) { - conn.offerBuffer(attachment); + conn.offerWriteBuffer(attachment); offerConnection(true, conn); } }); diff --git a/src/main/java/org/redkale/net/TransportFactory.java b/src/main/java/org/redkale/net/TransportFactory.java index 6485bfaa4..3939e7ed4 100644 --- a/src/main/java/org/redkale/net/TransportFactory.java +++ b/src/main/java/org/redkale/net/TransportFactory.java @@ -347,7 +347,7 @@ public class TransportFactory { @Override public void completed(Integer result, ByteBuffer pongBuffer) { if (counter > 3) { - localconn.offerBuffer(pongBuffer); + localconn.offerWriteBuffer(pongBuffer); localconn.dispose(); return; } @@ -357,13 +357,13 @@ public class TransportFactory { localconn.read(this); return; } - localconn.offerBuffer(pongBuffer); + localconn.offerWriteBuffer(pongBuffer); localqueue.offer(localconn); } @Override public void failed(Throwable exc, ByteBuffer pongBuffer) { - localconn.offerBuffer(pongBuffer); + localconn.offerWriteBuffer(pongBuffer); localconn.dispose(); } }); diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index c0777cae6..2570aa505 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -63,7 +63,7 @@ public abstract class ClientConnection implements Co public void completed(Integer result, Void attachment) { if (writeLastRequest != null && writeLastRequest == client.closeRequest) { if (closeFuture != null) { - channel.getAsyncIOThread().runWork(() -> { + channel.getWriteIOThread().runWork(() -> { closeFuture.complete(null); }); } @@ -251,7 +251,7 @@ public abstract class ClientConnection implements Co // } // } if (workThread == null || workThread.getWorkExecutor() == null) { - workThread = channel.getAsyncIOThread(); + workThread = channel.getReadIOThread(); } if (rs.exc != null) { workThread.runWork(() -> { @@ -362,10 +362,10 @@ public abstract class ClientConnection implements Co } } respWaitingCounter.increment(); //放在writeChannelInThread计数会延迟,导致不准确 - if (channel.inCurrThread()) { + if (channel.inCurrWriteThread()) { writeChannelInThread(request, respFuture); } else { - channel.execute(() -> writeChannelInThread(request, respFuture)); + channel.executeWrite(() -> writeChannelInThread(request, respFuture)); } return respFuture; } @@ -427,7 +427,7 @@ public abstract class ClientConnection implements Co Throwable e = exc == null ? new ClosedChannelException() : exc; CompletableFuture f; respWaitingCounter.reset(); - WorkThread thread = channel.getAsyncIOThread(); + WorkThread thread = channel.getReadIOThread(); if (!responseQueue.isEmpty()) { while ((f = responseQueue.poll()) != null) { CompletableFuture future = f; diff --git a/src/main/java/org/redkale/net/client/ClientFuture.java b/src/main/java/org/redkale/net/client/ClientFuture.java index 45958a3db..fb144e2d5 100644 --- a/src/main/java/org/redkale/net/client/ClientFuture.java +++ b/src/main/java/org/redkale/net/client/ClientFuture.java @@ -64,10 +64,10 @@ public class ClientFuture extends CompletableFuture implements Runnable { return; } AsyncConnection channel = conn.getChannel(); - if (channel.inCurrThread()) { + if (channel.inCurrReadThread()) { this.runTimeout(); } else { - channel.execute(this::runTimeout); + channel.executeRead(this::runTimeout); } } @@ -93,7 +93,7 @@ public class ClientFuture extends CompletableFuture implements Runnable { // workThread.execute(() -> completeExceptionally(ex)); // } if (workThread == null || workThread.getWorkExecutor() == null) { - workThread = conn.getChannel().getAsyncIOThread(); + workThread = conn.getChannel().getReadIOThread(); } workThread.runWork(() -> completeExceptionally(ex)); } diff --git a/src/main/java/org/redkale/net/http/HttpResponse.java b/src/main/java/org/redkale/net/http/HttpResponse.java index 6b00c48b8..c4f5f2e0b 100644 --- a/src/main/java/org/redkale/net/http/HttpResponse.java +++ b/src/main/java/org/redkale/net/http/HttpResponse.java @@ -1331,7 +1331,7 @@ public class HttpResponse extends Response { try { if (fileChannel != null && sends >= limit) { if (buffer != null) { - channel.offerBuffer(buffer); + channel.offerWriteBuffer(buffer); } try { fileChannel.close(); @@ -1377,7 +1377,7 @@ public class HttpResponse extends Response { @Override public void failed(Throwable exc, Void attachment) { if (buffer != null) { - channel.offerBuffer(buffer); + channel.offerWriteBuffer(buffer); } if (logger.isLoggable(Level.FINER)) { logger.log(Level.FINER, "finishFile error", exc); diff --git a/src/main/java/org/redkale/net/http/HttpSimpleClient.java b/src/main/java/org/redkale/net/http/HttpSimpleClient.java index 35c455cd6..2d2382b71 100644 --- a/src/main/java/org/redkale/net/http/HttpSimpleClient.java +++ b/src/main/java/org/redkale/net/http/HttpSimpleClient.java @@ -120,7 +120,9 @@ public class HttpSimpleClient { }); } array.put((byte) '\r', (byte) '\n'); - if (body != null) array.put(body); + if (body != null) { + array.put(body); + } final CompletableFuture> future = new CompletableFuture(); conn.write(array, new CompletionHandler() { @Override @@ -213,14 +215,16 @@ public class HttpSimpleClient { return; } } - if (buffer.hasRemaining()) array.put(buffer, buffer.remaining()); + if (buffer.hasRemaining()) { + array.put(buffer, buffer.remaining()); + } this.readState = READ_STATE_END; } if (responseResult.getStatus() <= 200) { this.responseResult.setResult(array.getBytes()); } this.future.complete(this.responseResult); - conn.offerBuffer(buffer); + conn.offerReadBuffer(buffer); conn.dispose(); } @@ -240,7 +244,9 @@ public class HttpSimpleClient { buffer.put((byte) '\r'); return 1; } - if (buffer.get() != '\n') return -1; + if (buffer.get() != '\n') { + return -1; + } break; } bytes.put(b); @@ -273,7 +279,9 @@ public class HttpSimpleClient { remain--; byte b1 = buffer.get(); byte b2 = buffer.get(); - if (b1 == '\r' && b2 == '\n') return 0; + if (b1 == '\r' && b2 == '\n') { + return 0; + } bytes.put(b1, b2); for (;;) { // name if (remain-- < 1) { @@ -282,7 +290,9 @@ public class HttpSimpleClient { return 1; } byte b = buffer.get(); - if (b == ':') break; + if (b == ':') { + break; + } bytes.put(b); } String name = parseHeaderName(bytes, null); @@ -317,7 +327,9 @@ public class HttpSimpleClient { buffer.put((byte) '\r'); return 1; } - if (buffer.get() != '\n') return -1; + if (buffer.get() != '\n') { + return -1; + } break; } if (first) { @@ -346,7 +358,7 @@ public class HttpSimpleClient { @Override public void failed(Throwable exc, ByteBuffer attachment) { - conn.offerBuffer(attachment); + conn.offerReadBuffer(attachment); conn.dispose(); future.completeExceptionally(exc); } diff --git a/src/main/java/org/redkale/net/http/WebSocket.java b/src/main/java/org/redkale/net/http/WebSocket.java index fbc4377dc..632041902 100644 --- a/src/main/java/org/redkale/net/http/WebSocket.java +++ b/src/main/java/org/redkale/net/http/WebSocket.java @@ -139,7 +139,9 @@ public abstract class WebSocket { } public final CompletableFuture sendPing(byte[] data) { - if (data == null) return sendPing(); + if (data == null) { + return sendPing(); + } this.lastPingTime = System.currentTimeMillis(); return sendPacket(new WebSocketPacket(FrameType.PING, data)); } @@ -237,7 +239,9 @@ public abstract class WebSocket { */ CompletableFuture sendPacket(WebSocketPacket packet) { if (this._writeHandler == null) { - if (delayPackets == null) delayPackets = new ArrayList<>(); + if (delayPackets == null) { + delayPackets = new ArrayList<>(); + } delayPackets.add(packet); return CompletableFuture.completedFuture(RETCODE_DEAYSEND); } @@ -355,12 +359,16 @@ public abstract class WebSocket { * @return 为0表示成功, 其他值表示异常 */ public final CompletableFuture sendMessage(final Convert convert, Object message, boolean last, Serializable... userids) { - if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL); + if (_engine.node == null) { + return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL); + } if (message instanceof CompletableFuture) { return ((CompletableFuture) message).thenCompose((json) -> _engine.node.sendMessage(convert, json, last, userids)); } CompletableFuture rs = _engine.node.sendMessage(convert, message, last, userids); - if (_engine.logger.isLoggable(Level.FINER)) _engine.logger.finer("userids:" + Arrays.toString(userids) + " send websocket message(" + message + ")"); + if (_engine.logger.isLoggable(Level.FINER)) { + _engine.logger.finer("userids:" + Arrays.toString(userids) + " send websocket message(" + message + ")"); + } return rs; } @@ -461,12 +469,16 @@ public abstract class WebSocket { * @return 为0表示成功, 其他值表示部分发送异常 */ public final CompletableFuture broadcastMessage(final WebSocketRange wsrange, final Convert convert, final Object message, final boolean last) { - if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL); + if (_engine.node == null) { + return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL); + } if (message instanceof CompletableFuture) { return ((CompletableFuture) message).thenCompose((json) -> _engine.node.broadcastMessage(wsrange, convert, json, last)); } CompletableFuture rs = _engine.node.broadcastMessage(wsrange, convert, message, last); - if (_engine.logger.isLoggable(Level.FINER)) _engine.logger.finer("broadcast send websocket message(" + message + ")"); + if (_engine.logger.isLoggable(Level.FINER)) { + _engine.logger.finer("broadcast send websocket message(" + message + ")"); + } return rs; } @@ -479,9 +491,13 @@ public abstract class WebSocket { * @return 为0表示成功, 其他值表示异常 */ public final CompletableFuture sendAction(final WebSocketAction action, Serializable... userids) { - if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL); + if (_engine.node == null) { + return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL); + } CompletableFuture rs = _engine.node.sendAction(action, userids); - if (_engine.logger.isLoggable(Level.FINER)) _engine.logger.finer("userids:" + Arrays.toString(userids) + " send websocket action(" + action + ")"); + if (_engine.logger.isLoggable(Level.FINER)) { + _engine.logger.finer("userids:" + Arrays.toString(userids) + " send websocket action(" + action + ")"); + } return rs; } @@ -493,9 +509,13 @@ public abstract class WebSocket { * @return 为0表示成功, 其他值表示部分发送异常 */ public final CompletableFuture broadcastAction(final WebSocketAction action) { - if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL); + if (_engine.node == null) { + return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL); + } CompletableFuture rs = _engine.node.broadcastAction(action); - if (_engine.logger.isLoggable(Level.FINER)) _engine.logger.finer("broadcast send websocket action(" + action + ")"); + if (_engine.logger.isLoggable(Level.FINER)) { + _engine.logger.finer("broadcast send websocket action(" + action + ")"); + } return rs; } @@ -508,7 +528,9 @@ public abstract class WebSocket { * @return 地址列表 */ public CompletableFuture> getRpcNodeAddresses(final Serializable userid) { - if (_engine.node == null) return CompletableFuture.completedFuture(null); + if (_engine.node == null) { + return CompletableFuture.completedFuture(null); + } return _engine.node.getRpcNodeAddresses(userid); } @@ -522,7 +544,9 @@ public abstract class WebSocket { * @return 地址集合 */ public CompletableFuture>> getRpcNodeWebSocketAddresses(final Serializable userid) { - if (_engine.node == null) return CompletableFuture.completedFuture(null); + if (_engine.node == null) { + return CompletableFuture.completedFuture(null); + } return _engine.node.getRpcNodeWebSocketAddresses(userid); } @@ -534,7 +558,9 @@ public abstract class WebSocket { * @return CompletableFuture */ public CompletableFuture changeUserid(final G newuserid) { - if (newuserid == null) throw new NullPointerException("newuserid is null"); + if (newuserid == null) { + throw new NullPointerException("newuserid is null"); + } return _engine.changeLocalUserid(this, newuserid); } @@ -593,7 +619,9 @@ public abstract class WebSocket { * @param value 属性值 */ public final void setAttribute(String name, Object value) { - if (attributes == null) attributes = new HashMap<>(); + if (attributes == null) { + attributes = new HashMap<>(); + } attributes.put(name, value); } @@ -696,8 +724,8 @@ public abstract class WebSocket { * * @return Supplier */ - protected Supplier getBufferSupplier() { - return this._channel.getBufferSupplier(); + protected Supplier getReadBufferSupplier() { + return this._channel.getReadBufferSupplier(); } /** @@ -705,8 +733,26 @@ public abstract class WebSocket { * * @return Consumer */ - protected Consumer getBufferConsumer() { - return this._channel.getBufferConsumer(); + protected Consumer getReadBufferConsumer() { + return this._channel.getReadBufferConsumer(); + } + + /** + * 获取ByteBuffer生成器 + * + * @return Supplier + */ + protected Supplier getWriteBufferSupplier() { + return this._channel.getWriteBufferSupplier(); + } + + /** + * 获取ByteBuffer回收器 + * + * @return Consumer + */ + protected Consumer getWriteBufferConsumer() { + return this._channel.getWriteBufferConsumer(); } //------------------------------------------------------------------- @@ -896,23 +942,37 @@ public abstract class WebSocket { * 显式地关闭WebSocket */ public final void close() { - if (this.deflater != null) this.deflater.end(); - if (this.inflater != null) this.inflater.end(); + if (this.deflater != null) { + this.deflater.end(); + } + if (this.inflater != null) { + this.inflater.end(); + } CompletableFuture future = kill(CLOSECODE_SERVERCLOSE, "user close"); - if (future != null) future.join(); + if (future != null) { + future.join(); + } } //closeRunner CompletableFuture kill(int code, String reason) { - if (closed) return null; + if (closed) { + return null; + } synchronized (this) { - if (closed) return null; + if (closed) { + return null; + } closed = true; - if (_channel == null) return null; + if (_channel == null) { + return null; + } CompletableFuture future = _engine.removeLocalThenDisconnect(this); _channel.dispose(); CompletableFuture closeFuture = onClose(code, reason); - if (closeFuture == null) return future; + if (closeFuture == null) { + return future; + } return CompletableFuture.allOf(future, closeFuture); } } diff --git a/src/main/java/org/redkale/net/sncp/SncpClient.java b/src/main/java/org/redkale/net/sncp/SncpClient.java index 3afdf3148..6fbc33502 100644 --- a/src/main/java/org/redkale/net/sncp/SncpClient.java +++ b/src/main/java/org/redkale/net/sncp/SncpClient.java @@ -381,7 +381,7 @@ public final class SncpClient { try { if (count < 1 && buffer.remaining() == buffer.limit()) { //没有数据可读 future.completeExceptionally(new RpcRemoteException(action.method + " sncp[" + conn.getRemoteAddress() + "] remote no response data, params=" + JsonConvert.root().convertTo(params))); - conn.offerBuffer(buffer); + conn.offerReadBuffer(buffer); transport.offerConnection(true, conn); return; } @@ -400,7 +400,7 @@ public final class SncpClient { conn.setReadBuffer(buffer); conn.read(this); } else { - conn.offerBuffer(buffer); + conn.offerReadBuffer(buffer); success(); } return; @@ -424,7 +424,7 @@ public final class SncpClient { } else { this.body = new byte[respBodyLength]; buffer.get(body, 0, respBodyLength); - conn.offerBuffer(buffer); + conn.offerReadBuffer(buffer); success(); } } catch (Throwable e) { @@ -465,7 +465,7 @@ public final class SncpClient { @Override public void failed(Throwable exc, ByteBuffer attachment2) { future.completeExceptionally(new RpcRemoteException(action.method + " sncp remote exec failed, params=" + JsonConvert.root().convertTo(params))); - conn.offerBuffer(attachment2); + conn.offerReadBuffer(attachment2); transport.offerConnection(true, conn); if (handler != null) { final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null;