This commit is contained in:
Redkale
2020-06-02 09:56:01 +08:00
parent db8c94f433
commit 9cd9a8d3ea
4 changed files with 35 additions and 14 deletions

View File

@@ -393,6 +393,13 @@ public final class Application {
cluster.init(cluster.getConfig());
}
this.clusterAgent = cluster;
if (mqs != null) {
for (MessageAgent agent : mqs) {
this.resourceFactory.inject(agent);
agent.init(agent.getConfig());
this.resourceFactory.register(agent.getName(), MessageAgent.class, agent);
}
}
this.messageAgents = mqs;
Thread.currentThread().setContextClassLoader(this.classLoader);
this.serverClassLoader = new RedkaleClassLoader(this.classLoader);

View File

@@ -6,10 +6,12 @@
package org.redkale.mq;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.*;
import java.util.function.*;
import java.util.logging.Logger;
import javax.annotation.Resource;
import org.redkale.boot.*;
import static org.redkale.boot.Application.RESNAME_APP_NODEID;
import org.redkale.net.http.*;
import org.redkale.net.sncp.Sncp;
import org.redkale.service.Service;
@@ -29,6 +31,9 @@ public abstract class MessageAgent {
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
@Resource(name = RESNAME_APP_NODEID)
protected int nodeid;
protected String name;
protected AnyValue config;
@@ -36,7 +41,7 @@ public abstract class MessageAgent {
protected MessageProducer producer;
//本地Service消息接收处理器 key:topic
protected Map<String, Service> localConsumers;
protected ConcurrentHashMap<String, Service> localConsumers;
public void init(AnyValue config) {
}
@@ -100,35 +105,40 @@ public abstract class MessageAgent {
//创建指定topic的消费处理器
public abstract MessageConsumer createConsumer(String topic, Consumer<MessageRecord> processor);
public final void putHttpService(NodeHttpServer ns, Service service) {
}
//格式: sncp:req:user
protected static String generateSncpReqTopic(Service service) {
protected String generateSncpReqTopic(Service service) {
String resname = Sncp.getResourceName(service);
return "sncp:req:" + Sncp.getResourceType(service).getSimpleName().replaceAll("Service.*$", "").toLowerCase() + (resname.isEmpty() ? "" : ("-" + resname));
}
//格式: sncp:resp:node10
protected static String generateSncpRespTopic(Application application) {
return "sncp:resp:node" + application.getNodeid();
protected String generateSncpRespTopic() {
return "sncp:resp:node" + nodeid;
}
//格式: http:req:user
protected static String generateHttpReqTopic(Service service) {
protected String generateHttpReqTopic(Service service) {
String resname = Sncp.getResourceName(service);
return "http:req:" + Rest.getWebModuleName(service.getClass()).toLowerCase() + (resname.isEmpty() ? "" : ("-" + resname));
}
//格式: http:resp:node10
protected static String generateHttpRespTopic(Application application) {
return "http:resp:node" + application.getNodeid();
protected String generateHttpRespTopic() {
return "http:resp:node" + nodeid;
}
//格式: ws:resp:node10
protected static String generateWebSocketRespTopic(Application application) {
return "ws:resp:node" + application.getNodeid();
//格式: ws:resp:wsgame
protected String generateWebSocketRespTopic(WebSocketNode node) {
return "ws:resp:" + node.getName();
}
//格式: xxxx:resp:node10
protected static String generateRespTopic(String protocol, Application application) {
return protocol + ":resp:node" + application.getNodeid();
protected String generateRespTopic(String protocol) {
return protocol + ":resp:node" + nodeid;
}
}

View File

@@ -23,7 +23,7 @@ public abstract class MessageProducer extends Thread {
protected volatile boolean closed;
public abstract CompletableFuture apply(MessageRecord message);
public abstract CompletableFuture<MessageRecord> apply(MessageRecord message);
protected abstract void waitFor();

View File

@@ -16,6 +16,7 @@ import javax.annotation.*;
import org.redkale.boot.*;
import org.redkale.convert.*;
import org.redkale.convert.json.JsonConvert;
import org.redkale.mq.MessageAgent;
import org.redkale.service.*;
import org.redkale.source.*;
import org.redkale.util.*;
@@ -56,6 +57,9 @@ public abstract class WebSocketNode {
@Resource(name = "$")
protected CacheSource<InetSocketAddress> sncpNodeAddresses;
@Resource(name = "$")
protected MessageAgent messageAgent;
//当前节点的本地WebSocketEngine
protected WebSocketEngine localEngine;