diff --git a/src/org/redkale/cluster/ClusterAgent.java b/src/org/redkale/cluster/ClusterAgent.java index b0e31923b..afdd8ffc5 100644 --- a/src/org/redkale/cluster/ClusterAgent.java +++ b/src/org/redkale/cluster/ClusterAgent.java @@ -219,9 +219,8 @@ public abstract class ClusterAgent { } if ("mqtp".equalsIgnoreCase(protocol)) { MessageMultiConsumer mmc = service.getClass().getAnnotation(MessageMultiConsumer.class); - String resname = Sncp.getResourceName(service); String selfmodule = Rest.getRestModule(service).toLowerCase(); - return protocol.toLowerCase() + ":" + mmc.module() + ":" + selfmodule + (resname.isEmpty() ? "" : ("-" + resname)); + return protocol.toLowerCase() + ":" + mmc.module() + ":" + selfmodule; } if (!Sncp.isSncpDyn(service)) return protocol.toLowerCase() + ":" + service.getClass().getName(); String resname = Sncp.getResourceName(service); diff --git a/src/org/redkale/mq/HttpMessageClusterClient.java b/src/org/redkale/mq/HttpMessageClusterClient.java index 02033828b..60569efd7 100644 --- a/src/org/redkale/mq/HttpMessageClusterClient.java +++ b/src/org/redkale/mq/HttpMessageClusterClient.java @@ -44,26 +44,58 @@ public class HttpMessageClusterClient extends HttpMessageClient { @Override public CompletableFuture> sendMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { - return httpAsync("http", userid, request); + return httpAsync(userid, request); } @Override public void produceMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { - httpAsync("http", userid, request); + httpAsync(userid, request); } @Override public void broadcastMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { - httpAsync("mqtp", userid, request); + mqtpAsync(userid, request); } - private CompletableFuture> httpAsync(String protocol, int userid, HttpSimpleRequest req) { + private CompletableFuture> mqtpAsync(int userid, HttpSimpleRequest req) { String module = req.getRequestURI(); module = module.substring(1); //去掉/ module = module.substring(0, module.indexOf('/')); Map headers = req.getHeaders(); String resname = headers == null ? "" : headers.getOrDefault(Rest.REST_HEADER_RESOURCE_NAME, ""); - return clusterAgent.queryHttpAddress(protocol, module, resname).thenCompose(addrs -> { + return clusterAgent.queryMqtpAddress("mqtp", module, resname).thenCompose(addrmap -> { + if (addrmap == null || addrmap.isEmpty()) return new HttpResult().status(404).toAnyFuture(); + java.net.http.HttpRequest.Builder builder = java.net.http.HttpRequest.newBuilder().timeout(Duration.ofMillis(6000)); + if (req.isRpc()) builder.header(Rest.REST_HEADER_RPC_NAME, "true"); + if (userid != 0) builder.header(Rest.REST_HEADER_CURRUSERID_NAME, "" + userid); + if (headers != null) headers.forEach((n, v) -> { + if (!DISALLOWED_HEADERS_SET.contains(n.toLowerCase())) builder.header(n, v); + }); + builder.header("Content-Type", "x-www-form-urlencoded"); + String paramstr = req.getParametersToString(); + if (paramstr != null) builder.POST(java.net.http.HttpRequest.BodyPublishers.ofString(paramstr)); + List futures = new ArrayList<>(); + for (Map.Entry> en : addrmap.entrySet()) { + String realmodule = en.getKey(); + Collection addrs = en.getValue(); + if (addrs == null || addrs.isEmpty()) continue; + String suburi = req.getRequestURI(); + suburi = suburi.substring(1); //跳过 / + suburi = "/" + realmodule + suburi.substring(suburi.indexOf('/')); + futures.add(forEachCollectionFuture(userid, req, (req.getPath() != null && !req.getPath().isEmpty() ? req.getPath() : "") + suburi, builder, addrs.iterator())); + } + if (futures.isEmpty()) return CompletableFuture.completedFuture(null); + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).thenApply(v -> null); + }); + } + + private CompletableFuture> httpAsync(int userid, HttpSimpleRequest req) { + String module = req.getRequestURI(); + module = module.substring(1); //去掉/ + module = module.substring(0, module.indexOf('/')); + Map headers = req.getHeaders(); + String resname = headers == null ? "" : headers.getOrDefault(Rest.REST_HEADER_RESOURCE_NAME, ""); + return clusterAgent.queryHttpAddress("http", module, resname).thenCompose(addrs -> { if (addrs == null || addrs.isEmpty()) return new HttpResult().status(404).toAnyFuture(); java.net.http.HttpRequest.Builder builder = java.net.http.HttpRequest.newBuilder().timeout(Duration.ofMillis(6000)); if (req.isRpc()) builder.header(Rest.REST_HEADER_RPC_NAME, "true"); @@ -74,17 +106,16 @@ public class HttpMessageClusterClient extends HttpMessageClient { builder.header("Content-Type", "x-www-form-urlencoded"); String paramstr = req.getParametersToString(); if (paramstr != null) builder.POST(java.net.http.HttpRequest.BodyPublishers.ofString(paramstr)); - return forEachCollectionFuture(userid, req, builder, addrs.iterator()); + return forEachCollectionFuture(userid, req, (req.getPath() != null && !req.getPath().isEmpty() ? req.getPath() : "") + req.getRequestURI(), builder, addrs.iterator()); }); } - private CompletableFuture> forEachCollectionFuture(int userid, HttpSimpleRequest req, java.net.http.HttpRequest.Builder builder, Iterator it) { + private CompletableFuture> forEachCollectionFuture(int userid, HttpSimpleRequest req, String requesturi, java.net.http.HttpRequest.Builder builder, Iterator it) { if (!it.hasNext()) return CompletableFuture.completedFuture(null); InetSocketAddress addr = it.next(); - String url = "http://" + addr.getHostString() + ":" + addr.getPort() + (req.getPath() != null && !req.getPath().isEmpty() ? req.getPath() : "") + req.getRequestURI(); - builder.uri(URI.create(url)); - return httpClient.sendAsync(builder.build(), java.net.http.HttpResponse.BodyHandlers.ofByteArray()).thenCompose(resp -> { - if (resp.statusCode() != 200) return forEachCollectionFuture(userid, req, builder, it); + String url = "http://" + addr.getHostString() + ":" + addr.getPort() + requesturi; + return httpClient.sendAsync(builder.copy().uri(URI.create(url)).build(), java.net.http.HttpResponse.BodyHandlers.ofByteArray()).thenCompose(resp -> { + if (resp.statusCode() != 200) return forEachCollectionFuture(userid, req, requesturi, builder, it); HttpResult rs = new HttpResult(); java.net.http.HttpHeaders hs = resp.headers(); if (hs != null) {