From 851efa50977c217face954ee4ae94039c8d6cb42 Mon Sep 17 00:00:00 2001 From: Redkale Date: Thu, 12 Jan 2023 10:57:47 +0800 Subject: [PATCH] =?UTF-8?q?Client=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/net/AsyncConnection.java | 24 ++- .../java/org/redkale/net/client/Client.java | 4 + .../org/redkale/net/client/ClientCodec.java | 141 ++++++++++++++++-- .../redkale/net/client/ClientConnection.java | 19 ++- .../net/client/ClientWriteIOThread.java | 31 +++- 5 files changed, 189 insertions(+), 30 deletions(-) diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index 895e855ee..2aef4516f 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -301,6 +301,10 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl write(array.content(), array.offset(), array.length(), null, 0, 0, null, null, handler); } + public final void write(ByteTuple array, A attachment, CompletionHandler handler) { + write(array.content(), array.offset(), array.length(), null, 0, 0, null, null, attachment, handler); + } + public final void write(byte[] bytes, int offset, int length, CompletionHandler handler) { write(bytes, offset, length, null, 0, 0, null, null, handler); } @@ -310,6 +314,10 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl } public void write(byte[] headerContent, int headerOffset, int headerLength, byte[] bodyContent, int bodyOffset, int bodyLength, Consumer bodyCallback, Object bodyAttachment, CompletionHandler handler) { + write(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength, bodyCallback, bodyAttachment, null, handler); + } + + public void write(byte[] headerContent, int headerOffset, int headerLength, byte[] bodyContent, int bodyOffset, int bodyLength, Consumer bodyCallback, Object bodyAttachment, Object handlerAttachment, CompletionHandler handler) { final ByteBuffer buffer = sslEngine == null ? pollWriteBuffer() : pollWriteSSLBuffer(); if (buffer.remaining() >= headerLength + bodyLength) { buffer.put(headerContent, headerOffset, headerLength); @@ -320,20 +328,20 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl } } buffer.flip(); - CompletionHandler newHandler = new CompletionHandler() { + CompletionHandler newHandler = new CompletionHandler() { @Override - public void completed(Integer result, Void attachment) { + public void completed(Integer result, Object attachment) { offerWriteBuffer(buffer); handler.completed(result, attachment); } @Override - public void failed(Throwable exc, Void attachment) { + public void failed(Throwable exc, Object attachment) { offerWriteBuffer(buffer); handler.failed(exc, attachment); } }; - write(buffer, null, newHandler); + write(buffer, handlerAttachment, newHandler); } else { ByteBufferWriter writer = ByteBufferWriter.create(sslEngine == null ? writeBufferSupplier : () -> pollWriteSSLBuffer(), buffer); writer.put(headerContent, headerOffset, headerLength); @@ -344,20 +352,20 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl } } final ByteBuffer[] buffers = writer.toBuffers(); - CompletionHandler newHandler = new CompletionHandler() { + CompletionHandler newHandler = new CompletionHandler() { @Override - public void completed(Integer result, Void attachment) { + public void completed(Integer result, Object attachment) { offerWriteBuffer(buffers); handler.completed(result, attachment); } @Override - public void failed(Throwable exc, Void attachment) { + public void failed(Throwable exc, Object attachment) { offerWriteBuffer(buffers); handler.failed(exc, attachment); } }; - write(buffers, null, newHandler); + write(buffers, handlerAttachment, newHandler); } } diff --git a/src/main/java/org/redkale/net/client/Client.java b/src/main/java/org/redkale/net/client/Client.java index 268772f3e..cab432a16 100644 --- a/src/main/java/org/redkale/net/client/Client.java +++ b/src/main/java/org/redkale/net/client/Client.java @@ -315,6 +315,10 @@ public abstract class Client implements Resourcable reqWritedCounter.increment(); } + protected void incrRespDoneCounter() { + respDoneCounter.increment(); + } + @Override public String resourceName() { return name; diff --git a/src/main/java/org/redkale/net/client/ClientCodec.java b/src/main/java/org/redkale/net/client/ClientCodec.java index afdd52c1c..6ea2a18fe 100644 --- a/src/main/java/org/redkale/net/client/ClientCodec.java +++ b/src/main/java/org/redkale/net/client/ClientCodec.java @@ -5,10 +5,14 @@ */ package org.redkale.net.client; +import java.io.Serializable; import java.nio.ByteBuffer; +import java.nio.channels.*; import java.util.*; +import java.util.logging.Level; import org.redkale.convert.json.JsonConvert; -import org.redkale.util.ByteArray; +import org.redkale.net.*; +import org.redkale.util.*; /** * 每个ClientConnection绑定一个独立的ClientCodec实例 @@ -21,26 +25,141 @@ import org.redkale.util.ByteArray; * @param ClientRequest * @param

响应对象 */ -public abstract class ClientCodec { +public abstract class ClientCodec implements CompletionHandler { - protected final List> results = new ArrayList<>(); + private final List> repsResults = new ArrayList<>(); - protected final ClientConnection connection; + private final ClientConnection connection; + + private final ByteArray readArray = new ByteArray(); public ClientCodec(ClientConnection connection) { this.connection = connection; } //返回true: array会clear, 返回false: buffer会clear - public abstract boolean decodeMessages(ByteBuffer buffer, ByteArray array); + public abstract boolean decodeMessages(ClientConnection connection, ByteBuffer buffer, ByteArray array); + + @Override + public final void completed(Integer count, ByteBuffer attachment) { + AsyncConnection channel = connection.channel; + if (count < 1) { + channel.setReadBuffer(attachment); + connection.dispose(new NonReadableChannelException()); + return; + } + try { + attachment.flip(); + decodeResponse(attachment); + } catch (Throwable e) { + channel.setReadBuffer(attachment); + connection.dispose(e); + } + } + + private void decodeResponse(ByteBuffer buffer) { + AsyncConnection channel = connection.channel; + Deque responseQueue = connection.responseQueue; + Map responseMap = connection.responseMap; + if (decodeMessages(connection, buffer, readArray)) { //成功了 + readArray.clear(); + List> results = pollMessages(); + if (results != null) { + for (ClientResponse

rs : results) { + Serializable reqid = rs.getRequestid(); + ClientFuture respFuture = reqid == null ? responseQueue.poll() : responseMap.remove(reqid); + if (respFuture != null) { + int mergeCount = respFuture.getMergeCount(); + completeResponse(rs, respFuture); + if (mergeCount > 0) { + for (int i = 0; i < mergeCount; i++) { + respFuture = reqid == null ? responseQueue.poll() : responseMap.remove(reqid); + if (respFuture != null) { + completeResponse(rs, respFuture); + } + } + } + } + } + } + + if (buffer.hasRemaining()) { + decodeResponse(buffer); + } else { //队列都已处理完了 + buffer.clear(); + channel.setReadBuffer(buffer); + channel.read(this); + } + } else { //数据不全, 继续读 + buffer.clear(); + channel.setReadBuffer(buffer); + channel.read(this); + } + } + + private void completeResponse(ClientResponse

rs, ClientFuture respFuture) { + if (respFuture != null) { + ClientRequest request = respFuture.request; + if (!request.isCompleted()) { + if (rs.exc == null) { + //request没有发送完,respFuture需要再次接收 + Serializable reqid = request.getRequestid(); + if (reqid == null) { + connection.responseQueue.offerFirst(respFuture); + } else { + connection.responseMap.put(reqid, respFuture); + } + connection.pauseWriting.set(false); + connection.wakeupWrite(); + return; + } else { //异常了需要清掉半包 + connection.lastHalfEntry = null; + connection.pauseWriting.set(false); + connection.wakeupWrite(); + } + } + connection.respWaitingCounter.decrement(); + if (connection.isAuthenticated()) { + connection.client.incrRespDoneCounter(); + } + try { + respFuture.cancelTimeout(); + //if (client.finest) client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + ClientConnection.this + ", 回调处理, req=" + request + ", message=" + rs.message); + connection.preComplete(rs.message, (R) request, rs.exc); + WorkThread workThread = request.workThread; + request.workThread = null; + if (workThread == null || workThread.getWorkExecutor() == null) { + workThread = connection.channel.getReadIOThread(); + } + if (rs.exc != null) { + workThread.runWork(() -> { + Traces.currTraceid(request.traceid); + respFuture.completeExceptionally(rs.exc); + }); + } else { + workThread.runWork(() -> { + Traces.currTraceid(request.traceid); + respFuture.complete(rs.message); + }); + } + } catch (Throwable t) { + connection.client.logger.log(Level.INFO, "Complete result error, request: " + respFuture.request, t); + } + } + } + + @Override + public final void failed(Throwable t, ByteBuffer attachment) { + connection.dispose(t); + } protected Iterator responseIterator() { return connection.responseQueue.iterator(); } public List> pollMessages() { - List> rs = new ArrayList<>(results); - this.results.clear(); + List> rs = new ArrayList<>(repsResults); + this.repsResults.clear(); return rs; } @@ -49,15 +168,11 @@ public abstract class ClientCodec { } public void addMessage(P result) { - this.results.add(new ClientResponse<>(result)); + this.repsResults.add(new ClientResponse<>(result)); } public void addMessage(Throwable exc) { - this.results.add(new ClientResponse<>(exc)); - } - - public void reset() { - this.results.clear(); + this.repsResults.add(new ClientResponse<>(exc)); } @Override diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index 6eecf6dbc..2d4ba1c9b 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -54,10 +54,12 @@ public abstract class ClientConnection implements Co protected final Queue>> requestQueue = new ArrayDeque<>(); //responseQueue、responseMap二选一 - final ArrayDeque responseQueue = new ArrayDeque<>(); + final Deque responseQueue = new LinkedBlockingDeque<>(); //responseQueue、responseMap二选一, key: requestid - final HashMap responseMap = new LinkedHashMap<>(); + final Map responseMap = new ConcurrentHashMap<>(); + + SimpleEntry> lastHalfEntry; private int maxPipelines; //最大并行处理数 @@ -65,8 +67,6 @@ public abstract class ClientConnection implements Co private boolean authenticated; - private SimpleEntry> lastHalfEntry; - protected final CompletionHandler readHandler = new CompletionHandler() { @Override @@ -86,7 +86,7 @@ public abstract class ClientConnection implements Co } private void decodeResponse(ByteBuffer buffer) { - if (codec.decodeMessages(buffer, readArray)) { //成功了 + if (codec.decodeMessages(ClientConnection.this, buffer, readArray)) { //成功了 readArray.clear(); List> results = codec.pollMessages(); if (results != null) { @@ -279,7 +279,7 @@ public abstract class ClientConnection implements Co } //返回写入数据request的数量,返回0表示没有可写的request - private int sendWrite(boolean must) { + int sendWrite(boolean must) { ClientConnection conn = this; ByteArray rw = conn.writeArray; rw.clear(); @@ -386,6 +386,13 @@ public abstract class ClientConnection implements Co } } + public void wakeupWrite() { + AsyncIOThread thread = channel.getWriteIOThread(); + if (thread instanceof ClientWriteIOThread) { + ((ClientWriteIOThread) thread).wakeupWrite(); + } + } + public boolean isAuthenticated() { return authenticated; } diff --git a/src/main/java/org/redkale/net/client/ClientWriteIOThread.java b/src/main/java/org/redkale/net/client/ClientWriteIOThread.java index c07a7643f..e26613308 100644 --- a/src/main/java/org/redkale/net/client/ClientWriteIOThread.java +++ b/src/main/java/org/redkale/net/client/ClientWriteIOThread.java @@ -5,8 +5,9 @@ package org.redkale.net.client; import java.io.Serializable; import java.nio.ByteBuffer; -import java.nio.channels.Selector; +import java.nio.channels.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import org.redkale.util.*; /** @@ -26,6 +27,12 @@ public class ClientWriteIOThread extends ClientIOThread { requestQueue.offer(new ClientEntity(conn, request, respFuture)); } + public void wakeupWrite() { + synchronized (writeHandler) { + writeHandler.notify(); + } + } + @Override public void run() { final ByteBuffer buffer = getBufferSupplier().get(); @@ -37,6 +44,7 @@ public class ClientWriteIOThread extends ClientIOThread { ClientConnection conn = entity.conn; ClientRequest request = entity.request; ClientFuture respFuture = entity.respFuture; + AtomicBoolean pw = conn.pauseWriting; Serializable reqid = request.getRequestid(); if (reqid == null) { conn.responseQueue.offer(respFuture); @@ -50,9 +58,14 @@ public class ClientWriteIOThread extends ClientIOThread { buffer.clear(); buffer.put(rw.content(), 0, rw.length()); buffer.flip(); - conn.channel.write(buffer, null, conn.writeHandler); + conn.channel.write(buffer, conn, writeHandler); } else { - conn.channel.write(rw, conn.writeHandler); + conn.channel.write(rw, conn, writeHandler); + } + if (pw.get()) { + synchronized (writeHandler) { + writeHandler.wait(30_000); + } } } } catch (InterruptedException e) { @@ -60,6 +73,18 @@ public class ClientWriteIOThread extends ClientIOThread { } } + protected final CompletionHandler writeHandler = new CompletionHandler() { + + @Override + public void completed(Integer result, ClientConnection attachment) { + } + + @Override + public void failed(Throwable exc, ClientConnection attachment) { + attachment.dispose(exc); + } + }; + protected static class ClientEntity { ClientConnection conn;