diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index 86d861a2f..a560d983a 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -224,6 +224,8 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { public abstract void setWriteTimeoutSeconds(int writeTimeoutSeconds); + public abstract void clientWrite(byte[] data, A attachment, CompletionHandler handler); + protected abstract void readRegisterImpl(CompletionHandler handler); protected abstract void readImpl(CompletionHandler handler); @@ -238,6 +240,10 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { read(handler); } + public final void clientWrite(byte[] data, CompletionHandler handler) { + clientWrite(data, null, handler); + } + public final void startReadInIOThread(CompletionHandler handler) { if (inCurrReadThread()) { startRead(handler); diff --git a/src/main/java/org/redkale/net/AsyncIOThread.java b/src/main/java/org/redkale/net/AsyncIOThread.java index 982eebcc7..f9c246551 100644 --- a/src/main/java/org/redkale/net/AsyncIOThread.java +++ b/src/main/java/org/redkale/net/AsyncIOThread.java @@ -60,9 +60,7 @@ public class AsyncIOThread extends WorkThread { } public void interestOpsOr(SelectionKey key, int opt) { - if (key == null) { - return; - } + Objects.requireNonNull(key); if (key.selector() != selector) { throw new RedkaleException("NioThread.selector not the same to SelectionKey.selector"); } diff --git a/src/main/java/org/redkale/net/AsyncNioCompletionHandler.java b/src/main/java/org/redkale/net/AsyncNioCompletionHandler.java index c1dc37aee..882ff5c1c 100644 --- a/src/main/java/org/redkale/net/AsyncNioCompletionHandler.java +++ b/src/main/java/org/redkale/net/AsyncNioCompletionHandler.java @@ -24,7 +24,7 @@ class AsyncNioCompletionHandler implements CompletionHandler, Run private final boolean readMode; - private CompletionHandler handler; + CompletionHandler handler; private A attachment; diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index 3dd03d243..392dd02fd 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -9,11 +9,11 @@ import java.io.*; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; -import java.util.Objects; -import java.util.concurrent.TimeUnit; +import java.util.*; +import java.util.concurrent.*; import java.util.function.Consumer; import javax.net.ssl.SSLContext; -import org.redkale.util.ByteBufferWriter; +import org.redkale.util.*; /** * @@ -80,6 +80,10 @@ abstract class AsyncNioConnection extends AsyncConnection { protected SelectionKey writeKey; + //-------------------------- 用于客户端的Socket -------------------------- + //用于客户端的Socket + protected final Queue clientModeWriteQueue = new ConcurrentLinkedQueue<>(); + public AsyncNioConnection(boolean clientMode, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread, final int bufferCapacity, SSLBuilder sslBuilder, SSLContext sslContext) { super(clientMode, ioGroup, ioReadThread, ioWriteThread, bufferCapacity, sslBuilder, sslContext); @@ -127,7 +131,7 @@ abstract class AsyncNioConnection extends AsyncConnection { handler.failed(new NotYetConnectedException(), null); return; } - if (handler != protocolCodec) { + if (handler != readCompletionHandler && handler != readTimeoutCompletionHandler.handler) { if (this.readPending) { handler.failed(new ReadPendingException(), null); return; @@ -142,19 +146,20 @@ abstract class AsyncNioConnection extends AsyncConnection { this.readCompletionHandler = handler; } } else { - this.readCompletionHandler = handler; this.readPending = true; } try { if (readKey == null) { ioReadThread.register(selector -> { - if (readKey == null) { - try { + try { + if (readKey == null) { readKey = implRegister(selector, SelectionKey.OP_READ); readKey.attach(this); - } catch (ClosedChannelException e) { - handleRead(0, e); + } else { + readKey.interestOps(readKey.interestOps() | SelectionKey.OP_READ); } + } catch (ClosedChannelException e) { + handleRead(0, e); } }); } else { @@ -281,6 +286,38 @@ abstract class AsyncNioConnection extends AsyncConnection { doWrite(); } + @Override + public void clientWrite(byte[] data, A attachment, CompletionHandler handler) { + if (!this.isConnected()) { + handler.failed(new NotYetConnectedException(), null); + return; + } + Objects.requireNonNull(data); + Objects.requireNonNull(handler); + this.writePending = true; + this.clientModeWriteQueue.offer(data); + this.writeCompletionHandler = (CompletionHandler) handler; + this.writeAttachment = attachment; + if (writeKey == null) { + ioWriteThread.register(selector -> { + try { + if (writeKey == null) { + writeKey = implRegister(selector, SelectionKey.OP_WRITE); + writeKey.attach(this); + } else { + writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE); + } + } catch (ClosedChannelException e) { + this.writeCompletionHandler = (CompletionHandler) handler; + this.writeAttachment = attachment; + handleWrite(0, e); + } + }); + } else { + ioWriteThread.interestOpsOr(writeKey, SelectionKey.OP_WRITE); + } + } + public void doRead(boolean direct) { try { this.readTime = System.currentTimeMillis(); @@ -300,8 +337,12 @@ abstract class AsyncNioConnection extends AsyncConnection { } else if (readKey == null) { ioReadThread.register(selector -> { try { - readKey = implRegister(selector, SelectionKey.OP_READ); - readKey.attach(this); + if (readKey == null) { + readKey = implRegister(selector, SelectionKey.OP_READ); + readKey.attach(this); + } else { + readKey.interestOps(readKey.interestOps() | SelectionKey.OP_READ); + } } catch (ClosedChannelException e) { handleRead(0, e); } @@ -321,6 +362,23 @@ abstract class AsyncNioConnection extends AsyncConnection { boolean hasRemain = true; boolean writeCompleted = true; + if (clientMode && writeByteTuple1Array == null && !clientModeWriteQueue.isEmpty()) { + byte[] bs = null; + byte[] item; + while ((item = clientModeWriteQueue.poll()) != null) { + bs = Utility.append(bs, item); + } + this.writePending = true; + this.writeByteTuple1Array = bs; + this.writeByteTuple1Offset = 0; + this.writeByteTuple1Length = bs == null ? 0 : bs.length; + this.writeByteTuple2Array = null; + this.writeByteTuple2Offset = 0; + this.writeByteTuple2Length = 0; + this.writeOffset = 0; + this.writeLength = this.writeByteTuple1Length; + } + int batchOffset = writeOffset; int batchLength = writeLength; while (hasRemain) { //必须要将buffer写完为止 @@ -386,11 +444,11 @@ abstract class AsyncNioConnection extends AsyncConnection { if (writeCount == 0) { if (hasRemain) { - writeCompleted = false; - writeTotal = totalCount; - //continue; //要全部输出完才返回 + //writeCompleted = false; + //writeTotal = totalCount; + continue; //要全部输出完才返回 } - break; + break; } else if (writeCount < 0) { if (totalCount == 0) { totalCount = writeCount; @@ -407,14 +465,27 @@ abstract class AsyncNioConnection extends AsyncConnection { if (writeCompleted && (totalCount != 0 || !hasRemain)) { handleWrite(writeTotal + totalCount, null); } else if (writeKey == null) { - ioWriteThread.register(selector -> { + if (inCurrWriteThread()) { try { - writeKey = implRegister(selector, SelectionKey.OP_WRITE); + writeKey = implRegister(ioWriteThread.selector, SelectionKey.OP_WRITE); writeKey.attach(this); } catch (ClosedChannelException e) { handleWrite(0, e); } - }); + } else { + ioWriteThread.register(selector -> { + try { + if (writeKey == null) { + writeKey = implRegister(selector, SelectionKey.OP_WRITE); + writeKey.attach(this); + } else { + writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE); + } + } catch (ClosedChannelException e) { + handleWrite(0, e); + } + }); + } } else { ioWriteThread.interestOpsOr(writeKey, SelectionKey.OP_WRITE); } diff --git a/src/main/java/org/redkale/net/client/Client.java b/src/main/java/org/redkale/net/client/Client.java index 90098d69a..9c2c9781e 100644 --- a/src/main/java/org/redkale/net/client/Client.java +++ b/src/main/java/org/redkale/net/client/Client.java @@ -281,16 +281,18 @@ public abstract class Client, R extends ClientR if (authenticate != null) { future = future.thenCompose(authenticate); } - return future.thenApply(c -> { - c.setAuthenticated(true); - this.connArray[connIndex] = c; - CompletableFuture f; - while ((f = waitQueue.poll()) != null) { - if (!f.isDone()) { - f.complete(c); + return future.thenCompose(c -> { + return CompletableFuture.supplyAsync(() -> { + c.setAuthenticated(true); + this.connArray[connIndex] = c; + CompletableFuture f; + while ((f = waitQueue.poll()) != null) { + if (!f.isDone()) { + f.complete(c); + } } - } - return c; + return c; + }, c.channel.getWriteIOThread()); }).whenComplete((r, t) -> { if (t != null) { this.connOpenStates[connIndex].set(false); @@ -324,16 +326,18 @@ public abstract class Client, R extends ClientR if (authenticate != null) { future = future.thenCompose(authenticate); } - return future.thenApply(c -> { - c.setAuthenticated(true); - entry.connection = c; - CompletableFuture f; - while ((f = waitQueue.poll()) != null) { - if (!f.isDone()) { - f.complete(c); + return future.thenCompose(c -> { + return CompletableFuture.supplyAsync(() -> { + c.setAuthenticated(true); + entry.connection = c; + CompletableFuture f; + while ((f = waitQueue.poll()) != null) { + if (!f.isDone()) { + f.complete(c); + } } - } - return c; + return c; + }, c.channel.getWriteIOThread()); }).whenComplete((r, t) -> { if (t != null) { entry.connOpenState.set(false); diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index 57fc44d24..556ffc301 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -51,47 +51,16 @@ public abstract class ClientConnection implements Co protected final ByteArray writeArray = new ByteArray(); + protected final ThreadLocal arrayThreadLocal = ThreadLocal.withInitial(() -> new ByteArray()); + protected final ByteBuffer writeBuffer; protected final CompletionHandler writeHandler = new CompletionHandler() { @Override public void completed(Integer result, ClientConnection attachment) { - if (pauseWriting.get()) { //等待sendHalfWriteInReadThread调用 - if (!writePending.compareAndSet(true, false)) { - completed(0, attachment); - } - return; - } - ByteArray array = writeArray; - array.clear(); - ClientFuture respFuture; - while ((respFuture = requestQueue.poll()) != null) { - if (!respFuture.isDone()) { - R request = respFuture.request; - request.writeTo(attachment, array); - if (request.isCompleted()) { - doneRequestCounter.increment(); - } else { //还剩半包没发送完 - pauseWriting.set(true); - currHalfWriteFuture = respFuture; - break; - } - } - } - if (array.length() > 0) { - if (writeBuffer.capacity() >= array.length()) { - writeBuffer.clear(); - writeBuffer.put(array.content(), 0, array.length()); - writeBuffer.flip(); - channel.write(writeBuffer, attachment, this); - } else { - channel.write(array, attachment, this); - } - } else { - if (!writePending.compareAndSet(true, false)) { - completed(0, attachment); - } + if (attachment == null) { //新方式 + channel.readRegister(getCodec()); } } @@ -116,8 +85,6 @@ public abstract class ClientConnection implements Co private final ClientCodec codec; - private final ConcurrentLinkedQueue> requestQueue = new ConcurrentLinkedQueue(); - //respFutureQueue、respFutureMap二选一, SPSC队列模式 private final ConcurrentLinkedDeque> respFutureQueue = new ConcurrentLinkedDeque<>(); @@ -156,7 +123,6 @@ public abstract class ClientConnection implements Co respFuture.setTimeout(client.timeoutScheduler.schedule(respFuture, rts, TimeUnit.SECONDS)); } respWaitingCounter.increment(); //放在writeChannelInWriteThread计数会延迟,导致不准确 - writeLock.lock(); try { offerRespFuture(respFuture); @@ -172,7 +138,18 @@ public abstract class ClientConnection implements Co } private void sendRequestInLocking(R request, ClientFuture respFuture) { - if (writePending.compareAndSet(false, true)) { + if (true) { //新方式 + ByteArray array = arrayThreadLocal.get(); + array.clear(); + request.writeTo(this, array); + if (request.isCompleted()) { + doneRequestCounter.increment(); + } else { //还剩半包没发送完 + pauseWriting.set(true); + currHalfWriteFuture = respFuture; + } + channel.clientWrite(array.getBytes(), writeHandler); + } else { //旧方式 //发送请求数据包 writeArray.clear(); request.writeTo(this, writeArray); @@ -191,11 +168,7 @@ public abstract class ClientConnection implements Co } else { channel.write(writeArray, this, writeHandler); } - } else { - writePending.compareAndSet(true, false); } - } else { - requestQueue.offer(respFuture); } } @@ -207,26 +180,14 @@ public abstract class ClientConnection implements Co ClientFuture respFuture = this.currHalfWriteFuture; if (respFuture != null) { this.currHalfWriteFuture = null; - if (!respFuture.isDone()) { - if (halfRequestExc == null) { - offerFirstRespFuture(respFuture); - ClientFuture future; - while ((future = pauseRequests.poll()) != null) { - requestQueue.add(future); - } - sendRequestInLocking(request, respFuture); - return; - } else { - codec.responseComplete(true, respFuture, null, halfRequestExc); - } + if (halfRequestExc == null) { + offerFirstRespFuture(respFuture); + sendRequestInLocking(request, respFuture); + } else { + codec.responseComplete(true, respFuture, null, halfRequestExc); } } - respFuture = pauseRequests.poll(); - if (respFuture != null) { - ClientFuture future; - while ((future = pauseRequests.poll()) != null) { - requestQueue.add(future); - } + while (!pauseWriting.get() && (respFuture = pauseRequests.poll()) != null) { sendRequestInLocking((R) respFuture.getRequest(), respFuture); } } finally {