From 2ef5eb6cd080dafd63041e6c83af12a5906b204e Mon Sep 17 00:00:00 2001 From: redkale Date: Thu, 2 Feb 2023 15:02:43 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96client?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/redkale/net/client/ClientCodec.java | 40 ++++-- .../redkale/net/client/ClientResponse.java | 17 ++- .../org/redkale/net/sncp/SncpClientCodec.java | 54 ++++++-- .../redkale/net/sncp/SncpClientResult.java | 129 ++++-------------- .../java/org/redkale/net/sncp/SncpHeader.java | 28 ++-- 5 files changed, 124 insertions(+), 144 deletions(-) diff --git a/src/main/java/org/redkale/net/client/ClientCodec.java b/src/main/java/org/redkale/net/client/ClientCodec.java index 26b2f48db..1bccf8ade 100644 --- a/src/main/java/org/redkale/net/client/ClientCodec.java +++ b/src/main/java/org/redkale/net/client/ClientCodec.java @@ -40,8 +40,7 @@ public abstract class ClientCodec implements Complet this.connection = connection; } - //返回true: array会clear, 返回false: buffer会clear - public abstract boolean decodeMessages(ByteBuffer buffer, ByteArray array); + public abstract void decodeMessages(ByteBuffer buffer, ByteArray array); @Override public final void completed(Integer count, ByteBuffer attachment) { @@ -63,15 +62,20 @@ public abstract class ClientCodec implements Complet private void decodeResponse(ByteBuffer buffer) { AsyncConnection channel = connection.channel; connection.currRespIterator = null; - if (decodeMessages(buffer, readArray)) { //成功了 + decodeMessages(buffer, readArray); + if (!respResults.isEmpty()) { //存在解析结果 connection.currRespIterator = null; readArray.clear(); for (ClientResponse cr : respResults) { - ClientFuture respFuture = connection.pollRespFuture(cr.getRequestid()); - if (respFuture != null) { - responseComplete(respFuture, cr.message, cr.exc); + if (cr.isError()) { + connection.dispose(null); + } else { + ClientFuture respFuture = connection.pollRespFuture(cr.getRequestid()); + if (respFuture != null) { + responseComplete(respFuture, cr.message, cr.exc); + } + respPool.accept(cr); } - respPool.accept(cr); } respResults.clear(); @@ -95,7 +99,7 @@ public abstract class ClientCodec implements Complet R request = respFuture.request; WorkThread workThread = null; try { - if (!request.isCompleted()) { + if (request != null && !request.isCompleted()) { if (exc == null) { connection.sendHalfWrite(exc); //request没有发送完,respFuture需要再次接收 @@ -104,8 +108,10 @@ public abstract class ClientCodec implements Complet connection.sendHalfWrite(exc); } } - workThread = request.workThread; - request.workThread = null; + if (request != null) { + workThread = request.workThread; + request.workThread = null; + } connection.respWaitingCounter.decrement(); if (connection.isAuthenticated()) { connection.client.incrRespDoneCounter(); @@ -119,13 +125,17 @@ public abstract class ClientCodec implements Complet } if (exc != null) { workThread.runWork(() -> { - Traces.currTraceid(request.traceid); + if (request != null) { + Traces.currTraceid(request.traceid); + } respFuture.completeExceptionally(exc); }); } else { - final Object rs = request.respTransfer == null ? message : request.respTransfer.apply(message); + final Object rs = request == null || request.respTransfer == null ? message : request.respTransfer.apply(message); workThread.runWork(() -> { - Traces.currTraceid(request.traceid); + if (request != null) { + Traces.currTraceid(request.traceid); + } ((ClientFuture) respFuture).complete(rs); }); } @@ -169,6 +179,10 @@ public abstract class ClientCodec implements Complet this.respResults.add(respPool.get().set(request, exc)); } + public void occurError(R request, Throwable exc) { + this.respResults.add(new ClientResponse.ClientErrorResponse<>(request, exc)); + } + @Override public String toString() { return JsonConvert.root().convertTo(this); diff --git a/src/main/java/org/redkale/net/client/ClientResponse.java b/src/main/java/org/redkale/net/client/ClientResponse.java index 829ef7a69..53cceeb08 100644 --- a/src/main/java/org/redkale/net/client/ClientResponse.java +++ b/src/main/java/org/redkale/net/client/ClientResponse.java @@ -21,7 +21,7 @@ import java.io.Serializable; */ public class ClientResponse { - protected R request; + protected R request; //服务端返回一个不存在的requestid,可能为null protected P message; @@ -101,4 +101,19 @@ public class ClientResponse { return "{\"message\":" + message + "}"; } + boolean isError() { + return false; + } + + static class ClientErrorResponse extends ClientResponse { + + public ClientErrorResponse(R request, Throwable exc) { + super(request, exc); + } + + @Override + boolean isError() { + return true; + } + } } diff --git a/src/main/java/org/redkale/net/sncp/SncpClientCodec.java b/src/main/java/org/redkale/net/sncp/SncpClientCodec.java index 941e55fb4..63fef3087 100644 --- a/src/main/java/org/redkale/net/sncp/SncpClientCodec.java +++ b/src/main/java/org/redkale/net/sncp/SncpClientCodec.java @@ -38,21 +38,21 @@ public class SncpClientCodec extends ClientCodec