From 2304e115da470132a3be54345d3425529d67ea5a Mon Sep 17 00:00:00 2001 From: redkale Date: Mon, 23 Oct 2023 15:27:21 +0800 Subject: [PATCH] =?UTF-8?q?produceMessage=E5=8A=A0=E8=BF=94=E5=9B=9E?= =?UTF-8?q?=E5=80=BC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/redkale/cluster/ClusterRpcClient.java | 2 +- .../redkale/cluster/HttpClusterRpcClient.java | 12 ++++++++---- .../redkale/cluster/HttpLocalRpcClient.java | 17 ++++++++++++----- .../org/redkale/cluster/HttpRpcClient.java | 18 +++++++++--------- .../org/redkale/mq/HttpRpcMessageClient.java | 4 ++-- .../java/org/redkale/mq/MessageClient.java | 4 ++-- 6 files changed, 34 insertions(+), 23 deletions(-) diff --git a/src/main/java/org/redkale/cluster/ClusterRpcClient.java b/src/main/java/org/redkale/cluster/ClusterRpcClient.java index f12413d09..e71d12170 100644 --- a/src/main/java/org/redkale/cluster/ClusterRpcClient.java +++ b/src/main/java/org/redkale/cluster/ClusterRpcClient.java @@ -31,6 +31,6 @@ public interface ClusterRpcClient { * * @param message 消息体 */ - public void produceMessage(R message); + public CompletableFuture produceMessage(R message); } diff --git a/src/main/java/org/redkale/cluster/HttpClusterRpcClient.java b/src/main/java/org/redkale/cluster/HttpClusterRpcClient.java index 535e9042f..3e6de7980 100644 --- a/src/main/java/org/redkale/cluster/HttpClusterRpcClient.java +++ b/src/main/java/org/redkale/cluster/HttpClusterRpcClient.java @@ -68,11 +68,11 @@ public class HttpClusterRpcClient extends HttpRpcClient { } @Override - public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) { + public CompletableFuture produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) { if (topicServletMap.computeIfAbsent(topic, t -> localClient.findHttpServlet(t) != null)) { - localClient.produceMessage(topic, userid, groupid, request); + return localClient.produceMessage(topic, userid, groupid, request); } else { - httpAsync(true, userid, request); + return httpAsync(true, userid, request).thenApply(v -> null); } } @@ -160,7 +160,11 @@ public class HttpClusterRpcClient extends HttpRpcClient { InetSocketAddress addr = it.next(); String url = "http://" + addr.getHostString() + ":" + addr.getPort() + requesturi; if (finest) { - logger.log(Level.FINEST, "forEachCollectionFuture: url=" + url + ", headers=" + clientHeaders); + if (clientBody != null) { + logger.log(Level.FINEST, "forEachCollectionFuture: url=" + url + ", body=" + new String(clientBody, StandardCharsets.UTF_8) + ", headers=" + clientHeaders); + } else { + logger.log(Level.FINEST, "forEachCollectionFuture: url=" + url + ", headers=" + clientHeaders); + } } if (httpSimpleClient != null) { return httpSimpleClient.postAsync(url, clientHeaders, clientBody); diff --git a/src/main/java/org/redkale/cluster/HttpLocalRpcClient.java b/src/main/java/org/redkale/cluster/HttpLocalRpcClient.java index fd6c99d9c..6442518b5 100644 --- a/src/main/java/org/redkale/cluster/HttpLocalRpcClient.java +++ b/src/main/java/org/redkale/cluster/HttpLocalRpcClient.java @@ -104,9 +104,9 @@ public class HttpLocalRpcClient extends HttpRpcClient { if (isEmpty(request.getTraceid())) { request.setTraceid(Traces.currentTraceid()); } + CompletableFuture future = new CompletableFuture(); String topic = generateHttpReqTopic(request, request.getPath()); HttpServlet servlet = findHttpServlet(topic); - CompletableFuture future = new CompletableFuture(); if (servlet == null) { if (logger.isLoggable(Level.FINE)) { logger.log(Level.FINE, "sendMessage: request=" + request + ", not found servlet"); @@ -129,15 +129,16 @@ public class HttpLocalRpcClient extends HttpRpcClient { if (isEmpty(request.getTraceid())) { request.setTraceid(Traces.currentTraceid()); } + CompletableFuture future = new CompletableFuture(); HttpServlet servlet = findHttpServlet(topic); if (servlet == null) { if (logger.isLoggable(Level.FINE)) { logger.log(Level.FINE, "sendMessage: request=" + request + ", not found servlet"); } - return CompletableFuture.completedFuture(new HttpResult().status(404)); + future.complete(new HttpResult().status(404)); + return future; } HttpRequest req = new HttpMessageLocalRequest(context(), request, userid); - CompletableFuture future = new CompletableFuture(); HttpResponse resp = new HttpMessageLocalResponse(req, future); try { servlet.execute(req, resp); @@ -161,14 +162,16 @@ public class HttpLocalRpcClient extends HttpRpcClient { } @Override - public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) { + public CompletableFuture produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) { + CompletableFuture future = new CompletableFuture(); HttpDispatcherServlet ps = dispatcherServlet(); HttpServlet servlet = ps.findServletByTopic(topic); if (servlet == null) { if (logger.isLoggable(Level.FINE)) { logger.log(Level.FINE, "produceMessage: request=" + request + ", not found servlet"); } - return; + future.completeExceptionally(new RuntimeException("404 Not Found " + topic)); + return future; } HttpRequest req = new HttpMessageLocalRequest(context(), request, userid); HttpResponse resp = new HttpMessageLocalResponse(req, null); @@ -177,6 +180,10 @@ public class HttpLocalRpcClient extends HttpRpcClient { } catch (Exception e) { throw new RedkaleException(e); } + return future.thenApply(rs -> { + Traces.currentTraceid(request.getTraceid()); + return null; + }); } public static class HttpMessageLocalRequest extends HttpRequest { diff --git a/src/main/java/org/redkale/cluster/HttpRpcClient.java b/src/main/java/org/redkale/cluster/HttpRpcClient.java index 017ce7038..ae851c031 100644 --- a/src/main/java/org/redkale/cluster/HttpRpcClient.java +++ b/src/main/java/org/redkale/cluster/HttpRpcClient.java @@ -26,20 +26,20 @@ import org.redkale.util.RedkaleException; public abstract class HttpRpcClient implements ClusterRpcClient> { @Override - public final void produceMessage(HttpSimpleRequest request) { - produceMessage(generateHttpReqTopic(request, null), 0, null, request); + public final CompletableFuture produceMessage(HttpSimpleRequest request) { + return produceMessage(generateHttpReqTopic(request, null), 0, null, request); } - public final void produceMessage(Serializable userid, HttpSimpleRequest request) { - produceMessage(generateHttpReqTopic(request, null), userid, null, request); + public final CompletableFuture produceMessage(Serializable userid, HttpSimpleRequest request) { + return produceMessage(generateHttpReqTopic(request, null), userid, null, request); } - public final void produceMessage(Serializable userid, String groupid, HttpSimpleRequest request) { - produceMessage(generateHttpReqTopic(request, null), userid, groupid, request); + public final CompletableFuture produceMessage(Serializable userid, String groupid, HttpSimpleRequest request) { + return produceMessage(generateHttpReqTopic(request, null), userid, groupid, request); } - public final void produceMessage(String topic, HttpSimpleRequest request) { - produceMessage(topic, 0, null, request); + public final CompletableFuture produceMessage(String topic, HttpSimpleRequest request) { + return produceMessage(topic, 0, null, request); } @Override @@ -119,7 +119,7 @@ public abstract class HttpRpcClient implements ClusterRpcClient> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request); - public abstract void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request); + public abstract CompletableFuture produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request); protected abstract int getNodeid(); diff --git a/src/main/java/org/redkale/mq/HttpRpcMessageClient.java b/src/main/java/org/redkale/mq/HttpRpcMessageClient.java index 483969132..589a3f631 100644 --- a/src/main/java/org/redkale/mq/HttpRpcMessageClient.java +++ b/src/main/java/org/redkale/mq/HttpRpcMessageClient.java @@ -35,10 +35,10 @@ final class HttpRpcMessageClient extends HttpRpcClient { } @Override - public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) { + public CompletableFuture produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) { MessageRecord message = messageClient.createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), requestCoder.encode(request)); message.userid(userid).groupid(groupid); - messageClient.produceMessage(message); + return messageClient.produceMessage(message); } @Override diff --git a/src/main/java/org/redkale/mq/MessageClient.java b/src/main/java/org/redkale/mq/MessageClient.java index ff6ea2510..eb057e266 100644 --- a/src/main/java/org/redkale/mq/MessageClient.java +++ b/src/main/java/org/redkale/mq/MessageClient.java @@ -79,8 +79,8 @@ public class MessageClient implements ClusterRpcClient produceMessage(MessageRecord message) { + return messageAgent.getMessageClientProducer().apply(message); } @Override