produceMessage加返回值

This commit is contained in:
redkale
2023-10-23 15:27:21 +08:00
parent 073568d6ac
commit 2304e115da
6 changed files with 34 additions and 23 deletions

View File

@@ -31,6 +31,6 @@ public interface ClusterRpcClient<R, P> {
*
* @param message 消息体
*/
public void produceMessage(R message);
public CompletableFuture<Void> produceMessage(R message);
}

View File

@@ -68,11 +68,11 @@ public class HttpClusterRpcClient extends HttpRpcClient {
}
@Override
public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) {
public CompletableFuture<Void> produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) {
if (topicServletMap.computeIfAbsent(topic, t -> localClient.findHttpServlet(t) != null)) {
localClient.produceMessage(topic, userid, groupid, request);
return localClient.produceMessage(topic, userid, groupid, request);
} else {
httpAsync(true, userid, request);
return httpAsync(true, userid, request).thenApply(v -> null);
}
}
@@ -160,7 +160,11 @@ public class HttpClusterRpcClient extends HttpRpcClient {
InetSocketAddress addr = it.next();
String url = "http://" + addr.getHostString() + ":" + addr.getPort() + requesturi;
if (finest) {
logger.log(Level.FINEST, "forEachCollectionFuture: url=" + url + ", headers=" + clientHeaders);
if (clientBody != null) {
logger.log(Level.FINEST, "forEachCollectionFuture: url=" + url + ", body=" + new String(clientBody, StandardCharsets.UTF_8) + ", headers=" + clientHeaders);
} else {
logger.log(Level.FINEST, "forEachCollectionFuture: url=" + url + ", headers=" + clientHeaders);
}
}
if (httpSimpleClient != null) {
return httpSimpleClient.postAsync(url, clientHeaders, clientBody);

View File

@@ -104,9 +104,9 @@ public class HttpLocalRpcClient extends HttpRpcClient {
if (isEmpty(request.getTraceid())) {
request.setTraceid(Traces.currentTraceid());
}
CompletableFuture future = new CompletableFuture();
String topic = generateHttpReqTopic(request, request.getPath());
HttpServlet servlet = findHttpServlet(topic);
CompletableFuture future = new CompletableFuture();
if (servlet == null) {
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "sendMessage: request=" + request + ", not found servlet");
@@ -129,15 +129,16 @@ public class HttpLocalRpcClient extends HttpRpcClient {
if (isEmpty(request.getTraceid())) {
request.setTraceid(Traces.currentTraceid());
}
CompletableFuture future = new CompletableFuture();
HttpServlet servlet = findHttpServlet(topic);
if (servlet == null) {
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "sendMessage: request=" + request + ", not found servlet");
}
return CompletableFuture.completedFuture(new HttpResult().status(404));
future.complete(new HttpResult().status(404));
return future;
}
HttpRequest req = new HttpMessageLocalRequest(context(), request, userid);
CompletableFuture future = new CompletableFuture();
HttpResponse resp = new HttpMessageLocalResponse(req, future);
try {
servlet.execute(req, resp);
@@ -161,14 +162,16 @@ public class HttpLocalRpcClient extends HttpRpcClient {
}
@Override
public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) {
public CompletableFuture<Void> produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) {
CompletableFuture future = new CompletableFuture();
HttpDispatcherServlet ps = dispatcherServlet();
HttpServlet servlet = ps.findServletByTopic(topic);
if (servlet == null) {
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "produceMessage: request=" + request + ", not found servlet");
}
return;
future.completeExceptionally(new RuntimeException("404 Not Found " + topic));
return future;
}
HttpRequest req = new HttpMessageLocalRequest(context(), request, userid);
HttpResponse resp = new HttpMessageLocalResponse(req, null);
@@ -177,6 +180,10 @@ public class HttpLocalRpcClient extends HttpRpcClient {
} catch (Exception e) {
throw new RedkaleException(e);
}
return future.thenApply(rs -> {
Traces.currentTraceid(request.getTraceid());
return null;
});
}
public static class HttpMessageLocalRequest extends HttpRequest {

View File

@@ -26,20 +26,20 @@ import org.redkale.util.RedkaleException;
public abstract class HttpRpcClient implements ClusterRpcClient<HttpSimpleRequest, HttpResult<byte[]>> {
@Override
public final void produceMessage(HttpSimpleRequest request) {
produceMessage(generateHttpReqTopic(request, null), 0, null, request);
public final CompletableFuture<Void> produceMessage(HttpSimpleRequest request) {
return produceMessage(generateHttpReqTopic(request, null), 0, null, request);
}
public final void produceMessage(Serializable userid, HttpSimpleRequest request) {
produceMessage(generateHttpReqTopic(request, null), userid, null, request);
public final CompletableFuture<Void> produceMessage(Serializable userid, HttpSimpleRequest request) {
return produceMessage(generateHttpReqTopic(request, null), userid, null, request);
}
public final void produceMessage(Serializable userid, String groupid, HttpSimpleRequest request) {
produceMessage(generateHttpReqTopic(request, null), userid, groupid, request);
public final CompletableFuture<Void> produceMessage(Serializable userid, String groupid, HttpSimpleRequest request) {
return produceMessage(generateHttpReqTopic(request, null), userid, groupid, request);
}
public final void produceMessage(String topic, HttpSimpleRequest request) {
produceMessage(topic, 0, null, request);
public final CompletableFuture<Void> produceMessage(String topic, HttpSimpleRequest request) {
return produceMessage(topic, 0, null, request);
}
@Override
@@ -119,7 +119,7 @@ public abstract class HttpRpcClient implements ClusterRpcClient<HttpSimpleReques
public abstract CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request);
public abstract void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request);
public abstract CompletableFuture<Void> produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request);
protected abstract int getNodeid();

View File

@@ -35,10 +35,10 @@ final class HttpRpcMessageClient extends HttpRpcClient {
}
@Override
public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) {
public CompletableFuture<Void> produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) {
MessageRecord message = messageClient.createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), requestCoder.encode(request));
message.userid(userid).groupid(groupid);
messageClient.produceMessage(message);
return messageClient.produceMessage(message);
}
@Override

View File

@@ -79,8 +79,8 @@ public class MessageClient implements ClusterRpcClient<MessageRecord, MessageRec
}
@Override
public void produceMessage(MessageRecord message) {
messageAgent.getMessageClientProducer().apply(message);
public CompletableFuture<Void> produceMessage(MessageRecord message) {
return messageAgent.getMessageClientProducer().apply(message);
}
@Override