增加ClientMessageListener功能

This commit is contained in:
redkale
2023-09-08 13:09:02 +08:00
parent ddd3d33edc
commit 4c3f919238
5 changed files with 137 additions and 99 deletions

View File

@@ -303,7 +303,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
} }
public final CompletableFuture<C> newConnection() { public final CompletableFuture<C> newConnection() {
return connect(true); return connect(false);
} }
private CompletableFuture<C> connect(final boolean pool) { private CompletableFuture<C> connect(final boolean pool) {

View File

@@ -27,14 +27,16 @@ import org.redkale.util.*;
*/ */
public abstract class ClientCodec<R extends ClientRequest, P> implements CompletionHandler<Integer, ByteBuffer> { public abstract class ClientCodec<R extends ClientRequest, P> implements CompletionHandler<Integer, ByteBuffer> {
protected final ClientConnection<R, P> connection;
private final List<ClientResponse<R, P>> respResults = new ArrayList<>(); private final List<ClientResponse<R, P>> respResults = new ArrayList<>();
private final ByteArray readArray = new ByteArray(); private final ByteArray readArray = new ByteArray();
private final ObjectPool<ClientResponse<R, P>> respPool = ObjectPool.createUnsafePool(256, t -> new ClientResponse(), ClientResponse::prepare, ClientResponse::recycle); private final ObjectPool<ClientResponse<R, P>> respPool = ObjectPool.createUnsafePool(256, t -> new ClientResponse(), ClientResponse::prepare, ClientResponse::recycle);
protected final ClientConnection<R, P> connection;
protected ClientMessageListener messageListener;
public ClientCodec(ClientConnection<R, P> connection) { public ClientCodec(ClientConnection<R, P> connection) {
Objects.requireNonNull(connection); Objects.requireNonNull(connection);
this.connection = connection; this.connection = connection;
@@ -42,6 +44,11 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
public abstract void decodeMessages(ByteBuffer buffer, ByteArray array); public abstract void decodeMessages(ByteBuffer buffer, ByteArray array);
public ClientCodec<R, P> withMessageListener(ClientMessageListener listener) {
this.messageListener = listener;
return this;
}
@Override @Override
public final void completed(Integer count, ByteBuffer attachment) { public final void completed(Integer count, ByteBuffer attachment) {
AsyncConnection channel = connection.channel; AsyncConnection channel = connection.channel;
@@ -69,8 +76,11 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
for (ClientResponse<R, P> cr : respResults) { for (ClientResponse<R, P> cr : respResults) {
connection.doneResponseCounter.increment(); connection.doneResponseCounter.increment();
if (cr.isError()) { if (cr.isError()) {
connection.dispose(cr.exc); connection.dispose(cr.cause);
return; return;
} else if (messageListener != null) {
messageListener.onMessage(connection, cr);
respPool.accept(cr);
} else { } else {
ClientFuture<R, P> respFuture = connection.pollRespFuture(cr.getRequestid()); ClientFuture<R, P> respFuture = connection.pollRespFuture(cr.getRequestid());
if (respFuture != null) { if (respFuture != null) {
@@ -78,7 +88,7 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
connection.dispose(new RedkaleException("request pipeline error")); connection.dispose(new RedkaleException("request pipeline error"));
return; return;
} }
responseComplete(false, respFuture, cr.message, cr.exc); responseComplete(false, respFuture, cr.message, cr.cause);
} }
respPool.accept(cr); respPool.accept(cr);
} }
@@ -101,101 +111,99 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
} }
void responseComplete(boolean halfCompleted, ClientFuture<R, P> respFuture, P message, Throwable exc) { void responseComplete(boolean halfCompleted, ClientFuture<R, P> respFuture, P message, Throwable exc) {
if (respFuture != null) { R request = respFuture.request;
R request = respFuture.request; AsyncIOThread readThread = connection.channel.getReadIOThread();
AsyncIOThread readThread = connection.channel.getReadIOThread(); final WorkThread workThread = request.workThread;
final WorkThread workThread = request.workThread; try {
try { if (!halfCompleted && !request.isCompleted()) {
if (!halfCompleted && !request.isCompleted()) {
if (exc == null) {
connection.sendHalfWriteInReadThread(request, exc);
//request没有发送完respFuture需要再次接收
return;
} else {
connection.sendHalfWriteInReadThread(request, exc);
//异常了需要清掉半包
}
}
connection.respWaitingCounter.decrement();
if (connection.isAuthenticated()) {
connection.client.incrRespDoneCounter();
}
respFuture.cancelTimeout();
// if (connection.client.debug) {
// connection.client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + connection + ", 回调处理, req=" + request + ", message=" + message, exc);
// }
connection.preComplete(message, (R) request, exc);
if (exc == null) { if (exc == null) {
final P rs = request.respTransfer == null ? message : (P) request.respTransfer.apply(message); connection.sendHalfWriteInReadThread(request, exc);
if (workThread == null) { //request没有发送完respFuture需要再次接收
readThread.runWork(() -> { return;
Traces.computeIfAbsent(request.traceid); } else {
respFuture.complete(rs); connection.sendHalfWriteInReadThread(request, exc);
}); //异常了需要清掉半包
} else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
if (workThread.inIO()) {
Traces.computeIfAbsent(request.traceid);
respFuture.complete(rs);
} else {
workThread.execute(() -> {
Traces.computeIfAbsent(request.traceid);
respFuture.complete(rs);
});
}
} else {
workThread.runWork(() -> {
Traces.computeIfAbsent(request.traceid);
respFuture.complete(rs);
});
}
} else { //异常
if (workThread == null) {
readThread.runWork(() -> {
Traces.computeIfAbsent(request.traceid);
respFuture.completeExceptionally(exc);
});
} else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
if (workThread.inIO()) {
Traces.computeIfAbsent(request.traceid);
respFuture.completeExceptionally(exc);
} else {
workThread.execute(() -> {
Traces.computeIfAbsent(request.traceid);
respFuture.completeExceptionally(exc);
});
}
} else {
workThread.runWork(() -> {
Traces.computeIfAbsent(request.traceid);
respFuture.completeExceptionally(exc);
});
}
} }
} catch (Throwable t) { }
connection.respWaitingCounter.decrement();
if (connection.isAuthenticated()) {
connection.client.incrRespDoneCounter();
}
respFuture.cancelTimeout();
// if (connection.client.debug) {
// connection.client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + connection + ", 回调处理, req=" + request + ", message=" + message, cause);
// }
connection.preComplete(message, (R) request, exc);
if (exc == null) {
final P rs = request.respTransfer == null ? message : (P) request.respTransfer.apply(message);
if (workThread == null) { if (workThread == null) {
readThread.runWork(() -> { readThread.runWork(() -> {
Traces.computeIfAbsent(request.traceid); Traces.computeIfAbsent(request.traceid);
respFuture.completeExceptionally(t); respFuture.complete(rs);
}); });
} else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE } else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
if (workThread.inIO()) { if (workThread.inIO()) {
Traces.computeIfAbsent(request.traceid); Traces.computeIfAbsent(request.traceid);
respFuture.completeExceptionally(t); respFuture.complete(rs);
} else { } else {
workThread.execute(() -> { workThread.execute(() -> {
Traces.computeIfAbsent(request.traceid); Traces.computeIfAbsent(request.traceid);
respFuture.completeExceptionally(t); respFuture.complete(rs);
}); });
} }
} else { } else {
workThread.runWork(() -> { workThread.runWork(() -> {
Traces.computeIfAbsent(request.traceid);
respFuture.complete(rs);
});
}
} else { //异常
if (workThread == null) {
readThread.runWork(() -> {
Traces.computeIfAbsent(request.traceid);
respFuture.completeExceptionally(exc);
});
} else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
if (workThread.inIO()) {
Traces.computeIfAbsent(request.traceid);
respFuture.completeExceptionally(exc);
} else {
workThread.execute(() -> {
Traces.computeIfAbsent(request.traceid);
respFuture.completeExceptionally(exc);
});
}
} else {
workThread.runWork(() -> {
Traces.computeIfAbsent(request.traceid);
respFuture.completeExceptionally(exc);
});
}
}
} catch (Throwable t) {
if (workThread == null) {
readThread.runWork(() -> {
Traces.computeIfAbsent(request.traceid);
respFuture.completeExceptionally(t);
});
} else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
if (workThread.inIO()) {
Traces.computeIfAbsent(request.traceid);
respFuture.completeExceptionally(t);
} else {
workThread.execute(() -> {
Traces.computeIfAbsent(request.traceid); Traces.computeIfAbsent(request.traceid);
respFuture.completeExceptionally(t); respFuture.completeExceptionally(t);
}); });
} }
connection.client.logger.log(Level.INFO, "Complete result error, request: " + respFuture.request, t); } else {
workThread.runWork(() -> {
Traces.computeIfAbsent(request.traceid);
respFuture.completeExceptionally(t);
});
} }
connection.client.logger.log(Level.INFO, "Complete result error, request: " + respFuture.request, t);
} }
} }
@@ -204,6 +212,10 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
connection.dispose(t); connection.dispose(t);
} }
public ClientMessageListener getMessageListener() {
return messageListener;
}
protected R nextRequest() { protected R nextRequest() {
return connection.findRequest(null); return connection.findRequest(null);
} }

View File

@@ -278,6 +278,10 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
} else if (connEntry != null) { //index=-1 } else if (connEntry != null) { //index=-1
connEntry.connOpenState.set(false); connEntry.connOpenState.set(false);
} }
ClientMessageListener listener = getCodec().getMessageListener();
if (listener != null) {
listener.onClose(this);
}
} }
public void dispose(Throwable exc) { public void dispose(Throwable exc) {
@@ -330,15 +334,6 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
} }
} }
//只会被ClientCodec在ReadIOThread中调用
ClientFuture<R, P> pollRespFuture(Serializable requestid) {
if (requestid == null) {
return respFutureQueue.poll();
} else {
return respFutureMap.remove(requestid);
}
}
//只会被ClientCodec在ReadIOThread中调用 //只会被ClientCodec在ReadIOThread中调用
R findRequest(Serializable requestid) { R findRequest(Serializable requestid) {
if (requestid == null) { if (requestid == null) {
@@ -353,6 +348,15 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
} }
} }
//只会被ClientCodec在ReadIOThread中调用
protected ClientFuture<R, P> pollRespFuture(Serializable requestid) {
if (requestid == null) {
return respFutureQueue.poll();
} else {
return respFutureMap.remove(requestid);
}
}
public boolean isAuthenticated() { public boolean isAuthenticated() {
return authenticated; return authenticated;
} }

View File

@@ -0,0 +1,22 @@
/*
*
*/
package org.redkale.net.client;
/**
* 接收消息事件
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.8.0
*/
public abstract class ClientMessageListener {
public abstract void onMessage(ClientConnection conn, ClientResponse resp);
public void onClose(ClientConnection conn) {
}
}

View File

@@ -25,7 +25,7 @@ public class ClientResponse<R extends ClientRequest, P> {
protected P message; protected P message;
protected Throwable exc; protected Throwable cause;
public ClientResponse() { public ClientResponse() {
} }
@@ -37,7 +37,7 @@ public class ClientResponse<R extends ClientRequest, P> {
public ClientResponse(R request, Throwable exc) { public ClientResponse(R request, Throwable exc) {
this.request = request; this.request = request;
this.exc = exc; this.cause = exc;
} }
public Serializable getRequestid() { public Serializable getRequestid() {
@@ -52,20 +52,20 @@ public class ClientResponse<R extends ClientRequest, P> {
public ClientResponse<R, P> set(R request, Throwable exc) { public ClientResponse<R, P> set(R request, Throwable exc) {
this.request = request; this.request = request;
this.exc = exc; this.cause = exc;
return this; return this;
} }
protected void prepare() { protected void prepare() {
this.request = null; this.request = null;
this.message = null; this.message = null;
this.exc = null; this.cause = null;
} }
protected boolean recycle() { protected boolean recycle() {
this.request = null; this.request = null;
this.message = null; this.message = null;
this.exc = null; this.cause = null;
return true; return true;
} }
@@ -85,18 +85,18 @@ public class ClientResponse<R extends ClientRequest, P> {
this.message = message; this.message = message;
} }
public Throwable getExc() { public Throwable getCause() {
return exc; return cause;
} }
public void setExc(Throwable exc) { public void setCause(Throwable cause) {
this.exc = exc; this.cause = cause;
} }
@Override @Override
public String toString() { public String toString() {
if (exc != null) { if (cause != null) {
return "{\"exc\":" + exc + "}"; return "{\"exc\":" + cause + "}";
} }
return "{\"message\":" + message + "}"; return "{\"message\":" + message + "}";
} }