diff --git a/src/org/redkale/cluster/ClusterAgent.java b/src/org/redkale/cluster/ClusterAgent.java index e95a31f14..60438f68d 100644 --- a/src/org/redkale/cluster/ClusterAgent.java +++ b/src/org/redkale/cluster/ClusterAgent.java @@ -14,6 +14,7 @@ import javax.annotation.Resource; import org.redkale.boot.*; import static org.redkale.boot.Application.*; import org.redkale.convert.json.JsonConvert; +import org.redkale.mq.MessageMultiConsumer; import org.redkale.net.*; import org.redkale.net.http.*; import org.redkale.net.sncp.*; @@ -109,6 +110,14 @@ public abstract class ClusterAgent { register(ns, protocol, service); ClusterEntry entry = new ClusterEntry(ns, protocol, service); localEntrys.put(entry.serviceid, entry); + if (protocol.toLowerCase().startsWith("http")) { + MessageMultiConsumer mmc = service.getClass().getAnnotation(MessageMultiConsumer.class); + if (mmc != null) { + register(ns, "mqtp", service); + ClusterEntry mqentry = new ClusterEntry(ns, "mqtp", service); + localEntrys.put(entry.serviceid, mqentry); + } + } } //远程模式加载IP列表, 只支持SNCP协议 if (ns.isSNCP()) { @@ -157,6 +166,9 @@ public abstract class ClusterAgent { return 10; } + //获取MQTP的HTTP远程服务的可用ip列表, key = servicename + public abstract CompletableFuture>> queryMqtpAddress(String protocol, String module, String resname); + //获取HTTP远程服务的可用ip列表 public abstract CompletableFuture> queryHttpAddress(String protocol, String module, String resname); @@ -173,7 +185,7 @@ public abstract class ClusterAgent { protected void updateSncpTransport(ClusterEntry entry) { Service service = entry.serviceref.get(); if (service == null) return; - Collection addrs = queryAddress(entry).join(); + Collection addrs = ClusterAgent.this.queryAddress(entry).join(); Sncp.updateTransport(service, transportFactory, Sncp.getResourceType(service).getName() + "-" + Sncp.getResourceName(service), entry.netprotocol, entry.address, null, addrs); } @@ -205,6 +217,12 @@ public abstract class ClusterAgent { String module = Rest.getRestModule(service).toLowerCase(); return protocol.toLowerCase() + ":" + module + (resname.isEmpty() ? "" : ("-" + resname)); } + if ("mqtp".equalsIgnoreCase(protocol)) { + MessageMultiConsumer mmc = service.getClass().getAnnotation(MessageMultiConsumer.class); + String resname = Sncp.getResourceName(service); + String selfmodule = Rest.getRestModule(service).toLowerCase(); + return protocol.toLowerCase() + ":" + mmc.module() + ":" + selfmodule + (resname.isEmpty() ? "" : ("-" + resname)); + } if (!Sncp.isSncpDyn(service)) return protocol.toLowerCase() + ":" + service.getClass().getName(); String resname = Sncp.getResourceName(service); return protocol.toLowerCase() + ":" + Sncp.getResourceType(service).getName() + (resname.isEmpty() ? "" : ("-" + resname)); diff --git a/src/org/redkale/mq/HttpMessageClient.java b/src/org/redkale/mq/HttpMessageClient.java index a203e8868..86d67241f 100644 --- a/src/org/redkale/mq/HttpMessageClient.java +++ b/src/org/redkale/mq/HttpMessageClient.java @@ -100,6 +100,54 @@ public class HttpMessageClient extends MessageClient { produceMessage(topic, convertType, userid, groupid, request, null); } + public final void broadcastMessage(HttpSimpleRequest request) { + broadcastMessage(generateHttpReqTopic(request, null), ConvertType.JSON, 0, null, request, null); + } + + public final void broadcastMessage(HttpSimpleRequest request, AtomicLong counter) { + broadcastMessage(generateHttpReqTopic(request, null), ConvertType.JSON, 0, null, request, counter); + } + + public final void broadcastMessage(int userid, HttpSimpleRequest request) { + broadcastMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, null, request, null); + } + + public final void broadcastMessage(int userid, String groupid, HttpSimpleRequest request) { + broadcastMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, groupid, request, null); + } + + public final void broadcastMessage(int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { + broadcastMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, groupid, request, counter); + } + + public final void broadcastMessage(String topic, HttpSimpleRequest request) { + broadcastMessage(topic, ConvertType.JSON, 0, null, request, null); + } + + public final void broadcastMessage(String topic, HttpSimpleRequest request, AtomicLong counter) { + broadcastMessage(topic, ConvertType.JSON, 0, null, request, counter); + } + + public final void broadcastMessage(String topic, ConvertType convertType, HttpSimpleRequest request) { + broadcastMessage(topic, convertType, 0, null, request, null); + } + + public final void broadcastMessage(String topic, ConvertType convertType, HttpSimpleRequest request, AtomicLong counter) { + broadcastMessage(topic, convertType, 0, null, request, counter); + } + + public final void broadcastMessage(String topic, int userid, String groupid, HttpSimpleRequest request) { + broadcastMessage(topic, ConvertType.JSON, userid, groupid, request, null); + } + + public final void broadcastMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { + broadcastMessage(topic, ConvertType.JSON, userid, groupid, request, counter); + } + + public final void broadcastMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request) { + broadcastMessage(topic, convertType, userid, groupid, request, null); + } + public final CompletableFuture sendMessage(HttpSimpleRequest request, Type type) { return sendMessage(generateHttpReqTopic(request, null), ConvertType.JSON, 0, null, request, null).thenApply((HttpResult httbs) -> { if (httbs == null || httbs.getResult() == null) return null; @@ -168,6 +216,12 @@ public class HttpMessageClient extends MessageClient { return sendMessage(message, true, counter).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance())); } + public void broadcastMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { + MessageRecord message = new MessageRecord(convertType, topic, null, HttpSimpleRequestCoder.getInstance().encode(request)); + message.userid(userid).groupid(groupid); + sendMessage(message, false, counter); + } + public void produceMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { MessageRecord message = new MessageRecord(convertType, topic, null, HttpSimpleRequestCoder.getInstance().encode(request)); message.userid(userid).groupid(groupid); diff --git a/src/org/redkale/mq/HttpMessageClusterClient.java b/src/org/redkale/mq/HttpMessageClusterClient.java index cad29cced..02033828b 100644 --- a/src/org/redkale/mq/HttpMessageClusterClient.java +++ b/src/org/redkale/mq/HttpMessageClusterClient.java @@ -44,21 +44,26 @@ public class HttpMessageClusterClient extends HttpMessageClient { @Override public CompletableFuture> sendMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { - return httpAsync(userid, request); + return httpAsync("http", userid, request); } @Override public void produceMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { - httpAsync(userid, request); + httpAsync("http", userid, request); } - protected CompletableFuture> httpAsync(int userid, HttpSimpleRequest req) { + @Override + public void broadcastMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { + httpAsync("mqtp", userid, request); + } + + private CompletableFuture> httpAsync(String protocol, int userid, HttpSimpleRequest req) { String module = req.getRequestURI(); module = module.substring(1); //去掉/ module = module.substring(0, module.indexOf('/')); Map headers = req.getHeaders(); String resname = headers == null ? "" : headers.getOrDefault(Rest.REST_HEADER_RESOURCE_NAME, ""); - return clusterAgent.queryHttpAddress("http", module, resname).thenCompose(addrs -> { + return clusterAgent.queryHttpAddress(protocol, module, resname).thenCompose(addrs -> { if (addrs == null || addrs.isEmpty()) return new HttpResult().status(404).toAnyFuture(); java.net.http.HttpRequest.Builder builder = java.net.http.HttpRequest.newBuilder().timeout(Duration.ofMillis(6000)); if (req.isRpc()) builder.header(Rest.REST_HEADER_RPC_NAME, "true");