From 19db4fdae9da4c60cff321488b8644e4ba663874 Mon Sep 17 00:00:00 2001 From: redkale Date: Thu, 21 Sep 2023 22:36:19 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96client?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/net/AsyncConnection.java | 6 +- .../org/redkale/net/AsyncNioConnection.java | 90 ++++++------- .../redkale/net/client/ClientConnection.java | 120 +++++++++++------- 3 files changed, 124 insertions(+), 92 deletions(-) diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index 156c5fe95..754563c15 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -224,9 +224,9 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { public abstract void setWriteTimeoutSeconds(int writeTimeoutSeconds); - public abstract AsyncConnection fastHandler(CompletionHandler handler); - - public abstract void fastWrite(byte[] data); +// public abstract AsyncConnection fastHandler(CompletionHandler handler); +// +// public abstract void fastWrite(byte[] data); protected abstract void readRegisterImpl(CompletionHandler handler); diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index b56b88629..993fe15b4 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -85,7 +85,7 @@ abstract class AsyncNioConnection extends AsyncConnection { protected SelectionKey writeKey; - protected CompletionHandler writeFastHandler; +// protected CompletionHandler writeFastHandler; public AsyncNioConnection(boolean clientMode, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread, final int bufferCapacity, SSLBuilder sslBuilder, SSLContext sslContext) { @@ -117,12 +117,12 @@ abstract class AsyncNioConnection extends AsyncConnection { return this.writeTimeoutSeconds; } - @Override - public AsyncConnection fastHandler(CompletionHandler handler) { - Objects.requireNonNull(handler); - this.writeFastHandler = (CompletionHandler) handler; - return this; - } +// @Override +// public AsyncConnection fastHandler(CompletionHandler handler) { +// Objects.requireNonNull(handler); +// this.writeFastHandler = (CompletionHandler) handler; +// return this; +// } @Override protected void startHandshake(final Consumer callback) { @@ -299,44 +299,44 @@ abstract class AsyncNioConnection extends AsyncConnection { doWrite(); } - @Override - public void fastWrite(byte[] data) { - CompletionHandler handler = this.writeFastHandler; - Objects.requireNonNull(data); - Objects.requireNonNull(handler, "fastHandler is null"); - if (!this.isConnected()) { - handler.failed(new NotYetConnectedException(), null); - return; - } - this.writePending = true; - this.fastWriteQueue.offer(data); - this.fastWriteCount.incrementAndGet(); - this.writeCompletionHandler = (CompletionHandler) handler; - this.writeAttachment = null; - try { - if (writeKey == null) { - ioWriteThread.register(selector -> { - try { - if (writeKey == null) { - writeKey = keyFor(selector); - } - if (writeKey == null) { - writeKey = implRegister(selector, SelectionKey.OP_WRITE); - writeKey.attach(this); - } else { - writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE); - } - } catch (ClosedChannelException e) { - handleWrite(0, e); - } - }); - } else { - ioWriteThread.interestOpsOr(writeKey, SelectionKey.OP_WRITE); - } - } catch (Exception e) { - handleWrite(0, e); - } - } +// @Override +// public void fastWrite(byte[] data) { +// CompletionHandler handler = this.writeFastHandler; +// Objects.requireNonNull(data); +// Objects.requireNonNull(handler, "fastHandler is null"); +// if (!this.isConnected()) { +// handler.failed(new NotYetConnectedException(), null); +// return; +// } +// this.writePending = true; +// this.fastWriteQueue.offer(data); +// this.fastWriteCount.incrementAndGet(); +// this.writeCompletionHandler = (CompletionHandler) handler; +// this.writeAttachment = null; +// try { +// if (writeKey == null) { +// ioWriteThread.register(selector -> { +// try { +// if (writeKey == null) { +// writeKey = keyFor(selector); +// } +// if (writeKey == null) { +// writeKey = implRegister(selector, SelectionKey.OP_WRITE); +// writeKey.attach(this); +// } else { +// writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE); +// } +// } catch (ClosedChannelException e) { +// handleWrite(0, e); +// } +// }); +// } else { +// ioWriteThread.interestOpsOr(writeKey, SelectionKey.OP_WRITE); +// } +// } catch (Exception e) { +// handleWrite(0, e); +// } +// } public void doRead(boolean direct) { try { diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index 8123998cc..ca251e37f 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -102,7 +102,7 @@ public abstract class ClientConnection implements Co this.index = index; this.connEntry = index >= 0 ? null : client.connAddrEntrys.get(channel.getRemoteAddress()); this.respWaitingCounter = index >= 0 ? client.connRespWaitings[index] : this.connEntry.connRespWaiting; - this.channel = channel.beforeCloseListener(this).fastHandler(writeHandler); + this.channel = channel.beforeCloseListener(this); //.fastHandler(writeHandler); this.writeBuffer = channel.pollWriteBuffer(); } @@ -139,6 +139,28 @@ public abstract class ClientConnection implements Co return respFuture; } + private void sendRequestInLocking(R request, ClientFuture respFuture) { + //发送请求数据包 + writeArray.clear(); + request.writeTo(this, writeArray); + if (request.isCompleted()) { + doneRequestCounter.increment(); + } else { //还剩半包没发送完 + pauseWriting.set(true); + currHalfWriteFuture = respFuture; + } + if (writeArray.length() > 0) { + if (writeBuffer.capacity() >= writeArray.length()) { + writeBuffer.clear(); + writeBuffer.put(writeArray.content(), 0, writeArray.length()); + writeBuffer.flip(); + channel.write(writeBuffer, this, writeHandler); + } else { + channel.write(writeArray, this, writeHandler); + } + } + } + //respTransfer只会在ClientCodec的读线程里调用 protected final CompletableFuture> writeChannel(R[] requests, Function respTransfer) { ClientFuture[] respFutures = new ClientFuture[requests.length]; @@ -156,63 +178,32 @@ public abstract class ClientConnection implements Co writeLock.lock(); try { - for (ClientFuture respFuture : respFutures) { - offerRespFuture(respFuture); - if (pauseWriting.get()) { + if (pauseWriting.get()) { + for (ClientFuture respFuture : respFutures) { + offerRespFuture(respFuture); pauseRequests.add(respFuture); } + } else { + for (ClientFuture respFuture : respFutures) { + offerRespFuture(respFuture); + } + sendRequestInLocking(respFutures); } - sendRequestInLocking(respFutures); } finally { writeLock.unlock(); } return Utility.allOfFutures(respFutures); } - private void sendRequestInLocking(R request, ClientFuture respFuture) { - if (false) { //新方式 - ByteArray array = arrayThreadLocal.get(); - array.clear(); - request.writeTo(this, array); - if (request.isCompleted()) { - doneRequestCounter.increment(); - } else { //还剩半包没发送完 - pauseWriting.set(true); - currHalfWriteFuture = respFuture; - } - channel.fastWrite(array.getBytes()); - } else { //旧方式 - //发送请求数据包 - writeArray.clear(); - request.writeTo(this, writeArray); - if (request.isCompleted()) { - doneRequestCounter.increment(); - } else { //还剩半包没发送完 - pauseWriting.set(true); - currHalfWriteFuture = respFuture; - } - if (writeArray.length() > 0) { - if (writeBuffer.capacity() >= writeArray.length()) { - writeBuffer.clear(); - writeBuffer.put(writeArray.content(), 0, writeArray.length()); - writeBuffer.flip(); - channel.write(writeBuffer, this, writeHandler); - } else { - channel.write(writeArray, this, writeHandler); - } - } - } - } - private void sendRequestInLocking(ClientFuture[] respFutures) { - ByteArray array = arrayThreadLocal.get(); - array.clear(); + //发送请求数据包 + writeArray.clear(); for (ClientFuture respFuture : respFutures) { if (pauseWriting.get()) { pauseRequests.add(respFuture); } else { ClientRequest request = respFuture.request; - request.writeTo(this, array); + request.writeTo(this, writeArray); if (request.isCompleted()) { doneRequestCounter.increment(); } else { //还剩半包没发送完 @@ -221,9 +212,50 @@ public abstract class ClientConnection implements Co } } } - channel.fastWrite(array.getBytes()); + if (writeArray.length() > 0) { + if (writeBuffer.capacity() >= writeArray.length()) { + writeBuffer.clear(); + writeBuffer.put(writeArray.content(), 0, writeArray.length()); + writeBuffer.flip(); + channel.write(writeBuffer, this, writeHandler); + } else { + channel.write(writeArray, this, writeHandler); + } + } } +// private void sendFastRequestInLocking(R request, ClientFuture respFuture) { +// ByteArray array = arrayThreadLocal.get(); +// array.clear(); +// request.writeTo(this, array); +// if (request.isCompleted()) { +// doneRequestCounter.increment(); +// } else { //还剩半包没发送完 +// pauseWriting.set(true); +// currHalfWriteFuture = respFuture; +// } +// channel.fastWrite(array.getBytes()); +// } +// +// private void sendFastRequestInLocking(ClientFuture[] respFutures) { +// ByteArray array = arrayThreadLocal.get(); +// array.clear(); +// for (ClientFuture respFuture : respFutures) { +// if (pauseWriting.get()) { +// pauseRequests.add(respFuture); +// } else { +// ClientRequest request = respFuture.request; +// request.writeTo(this, array); +// if (request.isCompleted()) { +// doneRequestCounter.increment(); +// } else { //还剩半包没发送完 +// pauseWriting.set(true); +// currHalfWriteFuture = respFuture; +// } +// } +// } +// channel.fastWrite(array.getBytes()); +// } //发送半包和积压的请求数据包 void sendHalfWriteInReadThread(R request, Throwable halfRequestExc) { writeLock.lock();