diff --git a/src/main/java/org/redkale/net/client/Client.java b/src/main/java/org/redkale/net/client/Client.java index 92909aa0a..5e0c42395 100644 --- a/src/main/java/org/redkale/net/client/Client.java +++ b/src/main/java/org/redkale/net/client/Client.java @@ -303,7 +303,7 @@ public abstract class Client, R extends ClientR } public final CompletableFuture newConnection() { - return connect(true); + return connect(false); } private CompletableFuture connect(final boolean pool) { diff --git a/src/main/java/org/redkale/net/client/ClientCodec.java b/src/main/java/org/redkale/net/client/ClientCodec.java index 15dbd06ff..2fc68e45f 100644 --- a/src/main/java/org/redkale/net/client/ClientCodec.java +++ b/src/main/java/org/redkale/net/client/ClientCodec.java @@ -27,14 +27,16 @@ import org.redkale.util.*; */ public abstract class ClientCodec implements CompletionHandler { - protected final ClientConnection connection; - private final List> respResults = new ArrayList<>(); private final ByteArray readArray = new ByteArray(); private final ObjectPool> respPool = ObjectPool.createUnsafePool(256, t -> new ClientResponse(), ClientResponse::prepare, ClientResponse::recycle); + protected final ClientConnection connection; + + protected ClientMessageListener messageListener; + public ClientCodec(ClientConnection connection) { Objects.requireNonNull(connection); this.connection = connection; @@ -42,6 +44,11 @@ public abstract class ClientCodec implements Complet public abstract void decodeMessages(ByteBuffer buffer, ByteArray array); + public ClientCodec withMessageListener(ClientMessageListener listener) { + this.messageListener = listener; + return this; + } + @Override public final void completed(Integer count, ByteBuffer attachment) { AsyncConnection channel = connection.channel; @@ -69,8 +76,11 @@ public abstract class ClientCodec implements Complet for (ClientResponse cr : respResults) { connection.doneResponseCounter.increment(); if (cr.isError()) { - connection.dispose(cr.exc); + connection.dispose(cr.cause); return; + } else if (messageListener != null) { + messageListener.onMessage(connection, cr); + respPool.accept(cr); } else { ClientFuture respFuture = connection.pollRespFuture(cr.getRequestid()); if (respFuture != null) { @@ -78,7 +88,7 @@ public abstract class ClientCodec implements Complet connection.dispose(new RedkaleException("request pipeline error")); return; } - responseComplete(false, respFuture, cr.message, cr.exc); + responseComplete(false, respFuture, cr.message, cr.cause); } respPool.accept(cr); } @@ -101,101 +111,99 @@ public abstract class ClientCodec implements Complet } void responseComplete(boolean halfCompleted, ClientFuture respFuture, P message, Throwable exc) { - if (respFuture != null) { - R request = respFuture.request; - AsyncIOThread readThread = connection.channel.getReadIOThread(); - final WorkThread workThread = request.workThread; - try { - if (!halfCompleted && !request.isCompleted()) { - if (exc == null) { - connection.sendHalfWriteInReadThread(request, exc); - //request没有发送完,respFuture需要再次接收 - return; - } else { - connection.sendHalfWriteInReadThread(request, exc); - //异常了需要清掉半包 - } - } - connection.respWaitingCounter.decrement(); - if (connection.isAuthenticated()) { - connection.client.incrRespDoneCounter(); - } - respFuture.cancelTimeout(); -// if (connection.client.debug) { -// connection.client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + connection + ", 回调处理, req=" + request + ", message=" + message, exc); -// } - connection.preComplete(message, (R) request, exc); - + R request = respFuture.request; + AsyncIOThread readThread = connection.channel.getReadIOThread(); + final WorkThread workThread = request.workThread; + try { + if (!halfCompleted && !request.isCompleted()) { if (exc == null) { - final P rs = request.respTransfer == null ? message : (P) request.respTransfer.apply(message); - if (workThread == null) { - readThread.runWork(() -> { - Traces.computeIfAbsent(request.traceid); - respFuture.complete(rs); - }); - } else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE - if (workThread.inIO()) { - Traces.computeIfAbsent(request.traceid); - respFuture.complete(rs); - } else { - workThread.execute(() -> { - Traces.computeIfAbsent(request.traceid); - respFuture.complete(rs); - }); - } - } else { - workThread.runWork(() -> { - Traces.computeIfAbsent(request.traceid); - respFuture.complete(rs); - }); - } - } else { //异常 - if (workThread == null) { - readThread.runWork(() -> { - Traces.computeIfAbsent(request.traceid); - respFuture.completeExceptionally(exc); - }); - } else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE - if (workThread.inIO()) { - Traces.computeIfAbsent(request.traceid); - respFuture.completeExceptionally(exc); - } else { - workThread.execute(() -> { - Traces.computeIfAbsent(request.traceid); - respFuture.completeExceptionally(exc); - }); - } - } else { - workThread.runWork(() -> { - Traces.computeIfAbsent(request.traceid); - respFuture.completeExceptionally(exc); - }); - } + connection.sendHalfWriteInReadThread(request, exc); + //request没有发送完,respFuture需要再次接收 + return; + } else { + connection.sendHalfWriteInReadThread(request, exc); + //异常了需要清掉半包 } - } catch (Throwable t) { + } + connection.respWaitingCounter.decrement(); + if (connection.isAuthenticated()) { + connection.client.incrRespDoneCounter(); + } + respFuture.cancelTimeout(); +// if (connection.client.debug) { +// connection.client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + connection + ", 回调处理, req=" + request + ", message=" + message, cause); +// } + connection.preComplete(message, (R) request, exc); + + if (exc == null) { + final P rs = request.respTransfer == null ? message : (P) request.respTransfer.apply(message); if (workThread == null) { readThread.runWork(() -> { Traces.computeIfAbsent(request.traceid); - respFuture.completeExceptionally(t); + respFuture.complete(rs); }); } else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE if (workThread.inIO()) { Traces.computeIfAbsent(request.traceid); - respFuture.completeExceptionally(t); + respFuture.complete(rs); } else { workThread.execute(() -> { Traces.computeIfAbsent(request.traceid); - respFuture.completeExceptionally(t); + respFuture.complete(rs); }); } } else { workThread.runWork(() -> { + Traces.computeIfAbsent(request.traceid); + respFuture.complete(rs); + }); + } + } else { //异常 + if (workThread == null) { + readThread.runWork(() -> { + Traces.computeIfAbsent(request.traceid); + respFuture.completeExceptionally(exc); + }); + } else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE + if (workThread.inIO()) { + Traces.computeIfAbsent(request.traceid); + respFuture.completeExceptionally(exc); + } else { + workThread.execute(() -> { + Traces.computeIfAbsent(request.traceid); + respFuture.completeExceptionally(exc); + }); + } + } else { + workThread.runWork(() -> { + Traces.computeIfAbsent(request.traceid); + respFuture.completeExceptionally(exc); + }); + } + } + } catch (Throwable t) { + if (workThread == null) { + readThread.runWork(() -> { + Traces.computeIfAbsent(request.traceid); + respFuture.completeExceptionally(t); + }); + } else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE + if (workThread.inIO()) { + Traces.computeIfAbsent(request.traceid); + respFuture.completeExceptionally(t); + } else { + workThread.execute(() -> { Traces.computeIfAbsent(request.traceid); respFuture.completeExceptionally(t); }); } - connection.client.logger.log(Level.INFO, "Complete result error, request: " + respFuture.request, t); + } else { + workThread.runWork(() -> { + Traces.computeIfAbsent(request.traceid); + respFuture.completeExceptionally(t); + }); } + connection.client.logger.log(Level.INFO, "Complete result error, request: " + respFuture.request, t); } } @@ -204,6 +212,10 @@ public abstract class ClientCodec implements Complet connection.dispose(t); } + public ClientMessageListener getMessageListener() { + return messageListener; + } + protected R nextRequest() { return connection.findRequest(null); } diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index 115e4b34c..8123998cc 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -278,6 +278,10 @@ public abstract class ClientConnection implements Co } else if (connEntry != null) { //index=-1 connEntry.connOpenState.set(false); } + ClientMessageListener listener = getCodec().getMessageListener(); + if (listener != null) { + listener.onClose(this); + } } public void dispose(Throwable exc) { @@ -330,15 +334,6 @@ public abstract class ClientConnection implements Co } } - //只会被ClientCodec在ReadIOThread中调用 - ClientFuture pollRespFuture(Serializable requestid) { - if (requestid == null) { - return respFutureQueue.poll(); - } else { - return respFutureMap.remove(requestid); - } - } - //只会被ClientCodec在ReadIOThread中调用 R findRequest(Serializable requestid) { if (requestid == null) { @@ -353,6 +348,15 @@ public abstract class ClientConnection implements Co } } + //只会被ClientCodec在ReadIOThread中调用 + protected ClientFuture pollRespFuture(Serializable requestid) { + if (requestid == null) { + return respFutureQueue.poll(); + } else { + return respFutureMap.remove(requestid); + } + } + public boolean isAuthenticated() { return authenticated; } diff --git a/src/main/java/org/redkale/net/client/ClientMessageListener.java b/src/main/java/org/redkale/net/client/ClientMessageListener.java new file mode 100644 index 000000000..3f9083757 --- /dev/null +++ b/src/main/java/org/redkale/net/client/ClientMessageListener.java @@ -0,0 +1,22 @@ +/* + * + */ +package org.redkale.net.client; + +/** + * 接收消息事件 + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.8.0 + */ +public abstract class ClientMessageListener { + + public abstract void onMessage(ClientConnection conn, ClientResponse resp); + + public void onClose(ClientConnection conn) { + } +} diff --git a/src/main/java/org/redkale/net/client/ClientResponse.java b/src/main/java/org/redkale/net/client/ClientResponse.java index 53cceeb08..efab94e98 100644 --- a/src/main/java/org/redkale/net/client/ClientResponse.java +++ b/src/main/java/org/redkale/net/client/ClientResponse.java @@ -25,7 +25,7 @@ public class ClientResponse { protected P message; - protected Throwable exc; + protected Throwable cause; public ClientResponse() { } @@ -37,7 +37,7 @@ public class ClientResponse { public ClientResponse(R request, Throwable exc) { this.request = request; - this.exc = exc; + this.cause = exc; } public Serializable getRequestid() { @@ -52,20 +52,20 @@ public class ClientResponse { public ClientResponse set(R request, Throwable exc) { this.request = request; - this.exc = exc; + this.cause = exc; return this; } protected void prepare() { this.request = null; this.message = null; - this.exc = null; + this.cause = null; } protected boolean recycle() { this.request = null; this.message = null; - this.exc = null; + this.cause = null; return true; } @@ -85,18 +85,18 @@ public class ClientResponse { this.message = message; } - public Throwable getExc() { - return exc; + public Throwable getCause() { + return cause; } - public void setExc(Throwable exc) { - this.exc = exc; + public void setCause(Throwable cause) { + this.cause = cause; } @Override public String toString() { - if (exc != null) { - return "{\"exc\":" + exc + "}"; + if (cause != null) { + return "{\"exc\":" + cause + "}"; } return "{\"message\":" + message + "}"; }