From 391352e0776a11473c23462713c4cf4ba66d9c54 Mon Sep 17 00:00:00 2001 From: Redkale Date: Sun, 1 Jan 2023 13:02:54 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=91=BD=E5=90=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/org/redkale/net/Context.java | 2 +- .../java/org/redkale/net/ProtocolCodec.java | 2 +- .../org/redkale/net/client/ClientCodec.java | 19 +++++------ .../redkale/net/client/ClientConnection.java | 32 +++++++++---------- ...{ClientResult.java => ClientResponse.java} | 22 ++++++------- 5 files changed, 37 insertions(+), 40 deletions(-) rename src/main/java/org/redkale/net/client/{ClientResult.java => ClientResponse.java} (59%) diff --git a/src/main/java/org/redkale/net/Context.java b/src/main/java/org/redkale/net/Context.java index a97452604..d9ce64ca8 100644 --- a/src/main/java/org/redkale/net/Context.java +++ b/src/main/java/org/redkale/net/Context.java @@ -110,7 +110,7 @@ public class Context { } } - protected void executeDispatcher(Request request, Response response) { + protected void executeDispatch(Request request, Response response) { if (workHashExecutor != null) { workHashExecutor.execute(request.getHashid(), () -> prepare.prepare(request, response)); } else if (workExecutor != null) { diff --git a/src/main/java/org/redkale/net/ProtocolCodec.java b/src/main/java/org/redkale/net/ProtocolCodec.java index 7012fd42e..595c12c86 100644 --- a/src/main/java/org/redkale/net/ProtocolCodec.java +++ b/src/main/java/org/redkale/net/ProtocolCodec.java @@ -169,7 +169,7 @@ class ProtocolCodec implements CompletionHandler { request.pipeline(pindex, pindex); channel.setReadBuffer((ByteBuffer) buffer.clear()); } - context.executeDispatcher(request, response); + context.executeDispatch(request, response); if (pipeline) { final Response pipelineResponse = createResponse(); try { diff --git a/src/main/java/org/redkale/net/client/ClientCodec.java b/src/main/java/org/redkale/net/client/ClientCodec.java index 2bed6a7c6..b31bd1e79 100644 --- a/src/main/java/org/redkale/net/client/ClientCodec.java +++ b/src/main/java/org/redkale/net/client/ClientCodec.java @@ -23,7 +23,7 @@ import org.redkale.util.ByteArray; */ public abstract class ClientCodec { - protected final List> results = new ArrayList<>(); + protected final List> results = new ArrayList<>(); protected final ClientConnection connection; @@ -32,17 +32,14 @@ public abstract class ClientCodec { } //返回true: array会clear, 返回false: buffer会clear - public abstract boolean codecResult(ByteBuffer buffer, ByteArray array); + public abstract boolean decodeMessages(ByteBuffer buffer, ByteArray array); protected Queue responseQueue() { return connection.responseQueue; } - public List> removeResults() { - if (results.isEmpty()) { - return null; - } - List> rs = new ArrayList<>(results); + public List> pollMessages() { + List> rs = new ArrayList<>(results); this.results.clear(); return rs; } @@ -51,12 +48,12 @@ public abstract class ClientCodec { return connection; } - public void addResult(P result) { - this.results.add(new ClientResult<>(result)); + public void addMessage(P result) { + this.results.add(new ClientResponse<>(result)); } - public void addResult(Throwable exc) { - this.results.add(new ClientResult<>(exc)); + public void addMessage(Throwable exc) { + this.results.add(new ClientResponse<>(exc)); } public void reset() { diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index 22f907b5c..7a002104e 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -64,7 +64,7 @@ public abstract class ClientConnection implements Co closeFuture = null; return; } - if (continueWrite(false)) { + if (sendWrite(false)) { return; } writePending.compareAndSet(true, false); @@ -95,7 +95,7 @@ public abstract class ClientConnection implements Co this.pauseWriting.set(false); } - private boolean continueWrite(boolean must) { + private boolean sendWrite(boolean must) { ClientConnection conn = this; ByteArray rw = conn.writeArray; rw.clear(); @@ -168,14 +168,14 @@ public abstract class ClientConnection implements Co } try { attachment.flip(); - codecResponse(attachment); + decodeResponse(attachment); } catch (Throwable e) { channel.setReadBuffer(attachment); dispose(e); } } - protected void completeResponse(ClientResult

rs, ClientFuture respFuture) { + protected void completeResponse(ClientResponse

rs, ClientFuture respFuture) { if (respFuture != null) { if (!respFuture.request.isCompleted()) { if (rs.exc == null) { @@ -196,8 +196,8 @@ public abstract class ClientConnection implements Co respFuture.timeout.cancel(true); } ClientRequest request = respFuture.request; - //if (client.finest) client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + ClientConnection.this + ", 回调处理, req=" + request + ", result=" + rs.result); - preComplete(rs.result, (R) request, rs.exc); + //if (client.finest) client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + ClientConnection.this + ", 回调处理, req=" + request + ", message=" + rs.message); + preComplete(rs.message, (R) request, rs.exc); WorkThread workThread = null; if (request != null) { workThread = request.workThread; @@ -224,13 +224,13 @@ public abstract class ClientConnection implements Co // if (request != null) { // Traces.currTraceid(request.traceid); // } -// respFuture.complete(rs.result); +// respFuture.complete(rs.message); // } else { // workThread.execute(() -> { // if (request != null) { // Traces.currTraceid(request.traceid); // } -// respFuture.complete(rs.result); +// respFuture.complete(rs.message); // }); // } // } @@ -249,7 +249,7 @@ public abstract class ClientConnection implements Co if (request != null) { Traces.currTraceid(request.traceid); } - respFuture.complete(rs.result); + respFuture.complete(rs.message); }); } } catch (Throwable t) { @@ -258,12 +258,12 @@ public abstract class ClientConnection implements Co } } - public void codecResponse(ByteBuffer buffer) { - if (codec.codecResult(buffer, readArray)) { //成功了 + public void decodeResponse(ByteBuffer buffer) { + if (codec.decodeMessages(buffer, readArray)) { //成功了 readArray.clear(); - List> results = codec.removeResults(); + List> results = codec.pollMessages(); if (results != null) { - for (ClientResult

rs : results) { + for (ClientResponse

rs : results) { ClientFuture respFuture = responseQueue.poll(); if (respFuture != null) { int mergeCount = respFuture.mergeCount; @@ -281,7 +281,7 @@ public abstract class ClientConnection implements Co } if (buffer.hasRemaining()) { - codecResponse(buffer); + decodeResponse(buffer); } else if (responseQueue.isEmpty()) { //队列都已处理完了 buffer.clear(); channel.setReadBuffer(buffer); @@ -293,7 +293,7 @@ public abstract class ClientConnection implements Co } else { //还有消息需要读取 if ((!requestQueue.isEmpty() || lastHalfRequest != null) && writePending.compareAndSet(false, true)) { //先写后读取 - if (!continueWrite(true)) { + if (!sendWrite(true)) { writePending.compareAndSet(true, false); } } @@ -366,7 +366,7 @@ public abstract class ClientConnection implements Co client.reqWritedCounter.increment(); } if (writePending.compareAndSet(false, true)) { - continueWrite(true); + sendWrite(true); } } diff --git a/src/main/java/org/redkale/net/client/ClientResult.java b/src/main/java/org/redkale/net/client/ClientResponse.java similarity index 59% rename from src/main/java/org/redkale/net/client/ClientResult.java rename to src/main/java/org/redkale/net/client/ClientResponse.java index 97e52862d..372e8ef23 100644 --- a/src/main/java/org/redkale/net/client/ClientResult.java +++ b/src/main/java/org/redkale/net/client/ClientResponse.java @@ -8,28 +8,28 @@ package org.redkale.net.client; /** * * @author zhangjx - * @param

result + * @param

message */ -public class ClientResult

{ +public class ClientResponse

{ - protected P result; + protected P message; protected Throwable exc; - public ClientResult(P result) { - this.result = result; + public ClientResponse(P result) { + this.message = result; } - public ClientResult(Throwable exc) { + public ClientResponse(Throwable exc) { this.exc = exc; } - public P getResult() { - return result; + public P getMessage() { + return message; } - public void setResult(P result) { - this.result = result; + public void setMessage(P message) { + this.message = message; } public Throwable getExc() { @@ -45,6 +45,6 @@ public class ClientResult

{ if (exc != null) { return "{\"exc\":" + exc + "}"; } - return "{\"result\":" + result + "}"; + return "{\"message\":" + message + "}"; } }