diff --git a/src/org/redkale/boot/Application.java b/src/org/redkale/boot/Application.java index 65411727c..d292dab66 100644 --- a/src/org/redkale/boot/Application.java +++ b/src/org/redkale/boot/Application.java @@ -393,6 +393,13 @@ public final class Application { cluster.init(cluster.getConfig()); } this.clusterAgent = cluster; + if (mqs != null) { + for (MessageAgent agent : mqs) { + this.resourceFactory.inject(agent); + agent.init(agent.getConfig()); + this.resourceFactory.register(agent.getName(), MessageAgent.class, agent); + } + } this.messageAgents = mqs; Thread.currentThread().setContextClassLoader(this.classLoader); this.serverClassLoader = new RedkaleClassLoader(this.classLoader); diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index 299ea0fd6..ecb649d1c 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -6,10 +6,12 @@ package org.redkale.mq; import java.util.*; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.*; import java.util.function.*; import java.util.logging.Logger; +import javax.annotation.Resource; import org.redkale.boot.*; +import static org.redkale.boot.Application.RESNAME_APP_NODEID; import org.redkale.net.http.*; import org.redkale.net.sncp.Sncp; import org.redkale.service.Service; @@ -29,6 +31,9 @@ public abstract class MessageAgent { protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); + @Resource(name = RESNAME_APP_NODEID) + protected int nodeid; + protected String name; protected AnyValue config; @@ -36,7 +41,7 @@ public abstract class MessageAgent { protected MessageProducer producer; //本地Service消息接收处理器, key:topic - protected Map localConsumers; + protected ConcurrentHashMap localConsumers; public void init(AnyValue config) { } @@ -100,35 +105,40 @@ public abstract class MessageAgent { //创建指定topic的消费处理器 public abstract MessageConsumer createConsumer(String topic, Consumer processor); + public final void putHttpService(NodeHttpServer ns, Service service) { + + } + //格式: sncp:req:user - protected static String generateSncpReqTopic(Service service) { + protected String generateSncpReqTopic(Service service) { String resname = Sncp.getResourceName(service); return "sncp:req:" + Sncp.getResourceType(service).getSimpleName().replaceAll("Service.*$", "").toLowerCase() + (resname.isEmpty() ? "" : ("-" + resname)); } //格式: sncp:resp:node10 - protected static String generateSncpRespTopic(Application application) { - return "sncp:resp:node" + application.getNodeid(); + protected String generateSncpRespTopic() { + return "sncp:resp:node" + nodeid; } //格式: http:req:user - protected static String generateHttpReqTopic(Service service) { + protected String generateHttpReqTopic(Service service) { String resname = Sncp.getResourceName(service); return "http:req:" + Rest.getWebModuleName(service.getClass()).toLowerCase() + (resname.isEmpty() ? "" : ("-" + resname)); } //格式: http:resp:node10 - protected static String generateHttpRespTopic(Application application) { - return "http:resp:node" + application.getNodeid(); + protected String generateHttpRespTopic() { + return "http:resp:node" + nodeid; } - //格式: ws:resp:node10 - protected static String generateWebSocketRespTopic(Application application) { - return "ws:resp:node" + application.getNodeid(); + //格式: ws:resp:wsgame + protected String generateWebSocketRespTopic(WebSocketNode node) { + return "ws:resp:" + node.getName(); } //格式: xxxx:resp:node10 - protected static String generateRespTopic(String protocol, Application application) { - return protocol + ":resp:node" + application.getNodeid(); + protected String generateRespTopic(String protocol) { + return protocol + ":resp:node" + nodeid; } + } diff --git a/src/org/redkale/mq/MessageProducer.java b/src/org/redkale/mq/MessageProducer.java index dbfb780d1..0bedec17a 100644 --- a/src/org/redkale/mq/MessageProducer.java +++ b/src/org/redkale/mq/MessageProducer.java @@ -23,7 +23,7 @@ public abstract class MessageProducer extends Thread { protected volatile boolean closed; - public abstract CompletableFuture apply(MessageRecord message); + public abstract CompletableFuture apply(MessageRecord message); protected abstract void waitFor(); diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index 7d5ee607d..9578e370d 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -16,6 +16,7 @@ import javax.annotation.*; import org.redkale.boot.*; import org.redkale.convert.*; import org.redkale.convert.json.JsonConvert; +import org.redkale.mq.MessageAgent; import org.redkale.service.*; import org.redkale.source.*; import org.redkale.util.*; @@ -56,6 +57,9 @@ public abstract class WebSocketNode { @Resource(name = "$") protected CacheSource sncpNodeAddresses; + @Resource(name = "$") + protected MessageAgent messageAgent; + //当前节点的本地WebSocketEngine protected WebSocketEngine localEngine;