This commit is contained in:
Redkale
2020-06-03 17:53:55 +08:00
parent e4df07a00c
commit 982fc8440c
5 changed files with 50 additions and 17 deletions

View File

@@ -784,8 +784,8 @@ public final class Application {
for (MessageAgent agent : this.messageAgents) {
agent.start(sb).join();
}
if (sb.length() > 0) logger.info(sb.toString());
logger.info(this.getClass().getSimpleName() + " MessageAgent init in " + (System.currentTimeMillis() - s) + " ms\r\n");
if (sb.length() > 0) logger.info(sb.toString().trim());
logger.info(this.getClass().getSimpleName() + " MessageAgent init in " + (System.currentTimeMillis() - s) + " ms");
}
//if (!singletonrun) signalHandle();
//if (!singletonrun) clearPersistData();

View File

@@ -68,7 +68,7 @@ public abstract class NodeServer {
private InetSocketAddress sncpAddress;
//加载Service时的处理函数
protected Consumer<Service> consumer;
protected BiConsumer<MessageAgent, Service> consumer;
//server节点的配置
protected AnyValue serverConf;
@@ -90,6 +90,9 @@ public abstract class NodeServer {
//MessageAgent对象集合
protected final Map<String, MessageAgent> messageAgents = new HashMap<>();
//需要远程模式Service的MessageAgent对象集合
protected final Map<String, MessageAgent> sncpRemoteAgents = new HashMap<>();
private volatile int maxClassNameLength = 0;
private volatile int maxNameLength = 0;
@@ -375,7 +378,7 @@ public abstract class NodeServer {
rf.inject(nodeService); //动态加载的Service也存在按需加载的注入资源
localServices.add(nodeService);
interceptorServices.add(nodeService);
if (consumer != null) consumer.accept(nodeService);
if (consumer != null) consumer.accept(null, nodeService);
}
} catch (Exception e) {
logger.log(Level.SEVERE, "WebSocketNode inject error", e);
@@ -441,13 +444,19 @@ public abstract class NodeServer {
} else if (isSNCP() && !entry.isAutoload()) {
throw new RuntimeException(restype.getSimpleName() + "(class:" + serviceImplClass.getName() + ", name:" + resourceName + ", group:" + groups + ") is repeat.");
}
MessageAgent agent = null;
if (entry.getProperty() != null && entry.getProperty().getValue("mq") != null) {
agent = application.getMessageAgent(entry.getProperty().getValue("mq"));
if (agent != null) messageAgents.put(agent.getName(), agent);
}
if (Sncp.isRemote(service)) {
remoteServices.add(service);
if (agent != null) sncpRemoteAgents.put(agent.getName(), agent);
} else {
if (field != null) rf.inject(service); //动态加载的Service也存在按需加载的注入资源
localServices.add(service);
interceptorServices.add(service);
if (consumer != null) consumer.accept(service);
if (consumer != null) consumer.accept(agent, service);
}
} catch (RuntimeException ex) {
throw ex;
@@ -483,6 +492,12 @@ public abstract class NodeServer {
sb.append(localThreadName).append(Sncp.toSimpleString(y, maxNameLength, maxClassNameLength)).append(" load and inject").append(LINE_SEPARATOR);
});
}
if (isSNCP() && !sncpRemoteAgents.isEmpty()) {
sncpRemoteAgents.values().forEach(agent -> {
agent.putSncpResp((NodeSncpServer) this);
agent.startSncpRespConsumer();
});
}
//----------------- init -----------------
List<Service> swlist = new ArrayList<>(localServices);
swlist.sort((o1, o2) -> {
@@ -587,14 +602,16 @@ public abstract class NodeServer {
for (AnyValue list : proplist) {
AnyValue.DefaultAnyValue prop = null;
String sc = list.getValue("groups");
String mq = list.getValue("mq");
if (sc != null) {
sc = sc.trim();
if (sc.endsWith(";")) sc = sc.substring(0, sc.length() - 1);
}
if (sc == null) sc = localGroup;
if (sc != null) {
if (sc != null || mq != null) {
prop = new AnyValue.DefaultAnyValue();
prop.addValue("groups", sc);
if (sc != null) prop.addValue("groups", sc);
if (mq != null) prop.addValue("mq", mq);
}
ClassFilter filter = new ClassFilter(this.serverClassLoader, ref, inter, excludeSuperClasses, prop);
for (AnyValue av : list.getAnyValues(property)) { // <service>、<filter>、<servlet> 节点

View File

@@ -10,6 +10,7 @@ import java.net.*;
import java.util.*;
import java.util.logging.Level;
import org.redkale.boot.ClassFilter.FilterEntry;
import org.redkale.mq.MessageAgent;
import org.redkale.net.*;
import org.redkale.net.sncp.*;
import org.redkale.service.Service;
@@ -32,7 +33,10 @@ public class NodeSncpServer extends NodeServer {
private NodeSncpServer(Application application, AnyValue serconf) {
super(application, createServer(application, serconf));
this.sncpServer = (SncpServer) this.server;
this.consumer = sncpServer == null || application.singletonrun ? null : x -> sncpServer.addSncpServlet(x); //singleton模式下不生成SncpServlet
this.consumer = sncpServer == null || application.singletonrun ? null : (agent, x) -> {
SncpDynServlet servlet = sncpServer.addSncpServlet(x); //singleton模式下不生成SncpServlet
if (agent != null) agent.putService(this, x, servlet);
};
}
public static NodeServer createNodeServer(Application application, AnyValue serconf) {
@@ -48,8 +52,8 @@ public class NodeSncpServer extends NodeServer {
return sncpServer == null ? null : sncpServer.getSocketAddress();
}
public void consumerAccept(Service service) {
if (this.consumer != null) this.consumer.accept(service);
public void consumerAccept(MessageAgent agent, Service service) {
if (this.consumer != null) this.consumer.accept(agent, service);
}
@Override

View File

@@ -45,22 +45,32 @@ public abstract class MessageAgent {
protected SncpMessageProcessor sncpRespProcessor;
//sncpRespConsumer启动耗时 小于0表示未启动
protected long sncpRespStartms = -1;
//本地Service消息接收处理器 key:topic
protected ConcurrentHashMap<String, MessageNode> messageNodes = new ConcurrentHashMap<>();
protected HashMap<String, MessageNode> messageNodes = new LinkedHashMap<>();
public void init(AnyValue config) {
}
public final synchronized void startSncpRespConsumer() {
if (this.sncpRespStartms >= 0) return;
long s = System.currentTimeMillis();
if (this.sncpRespConsumer != null) {
this.sncpRespConsumer.start();
this.sncpRespConsumer.waitFor();
}
this.sncpRespStartms = System.currentTimeMillis() - s;
}
public CompletableFuture<Void> start(final StringBuffer sb) {
AtomicInteger maxlen = new AtomicInteger(sncpRespConsumer == null ? 0 : sncpRespConsumer.topic.length());
this.messageNodes.values().forEach(node -> {
if (node.consumer.topic.length() > maxlen.get()) maxlen.set(node.consumer.topic.length());
});
if (this.sncpRespConsumer != null) {
long s = System.currentTimeMillis();
this.sncpRespConsumer.start();
this.sncpRespConsumer.waitFor();
sb.append("MessageConsumer(topic=").append(fillString(this.sncpRespConsumer.topic, maxlen.get())).append(") init and start in ").append(System.currentTimeMillis() - s).append(" ms\r\n");
if (this.sncpRespStartms >= 0) {
sb.append("MessageConsumer(topic=").append(fillString(this.sncpRespConsumer.topic, maxlen.get())).append(") init and start in ").append(this.sncpRespStartms).append(" ms\r\n");
}
this.messageNodes.values().forEach(node -> {
long s = System.currentTimeMillis();
@@ -152,6 +162,7 @@ public abstract class MessageAgent {
//格式: sncp.req.user
protected String generateSncpReqTopic(Service service) {
String resname = Sncp.getResourceName(service);
if (service instanceof WebSocketNode) return "sncp.req.wsn" + (resname.isEmpty() ? "" : ("-" + resname));
return "sncp.req." + Sncp.getResourceType(service).getSimpleName().replaceAll("Service.*$", "").toLowerCase() + (resname.isEmpty() ? "" : ("-" + resname));
}

View File

@@ -97,10 +97,11 @@ public class SncpServer extends Server<DLong, SncpContext, SncpRequest, SncpResp
return ((SncpPrepareServlet) this.prepare).removeSncpServlet(sncpService);
}
public void addSncpServlet(Service sncpService) {
public SncpDynServlet addSncpServlet(Service sncpService) {
SncpDynServlet sds = new SncpDynServlet(BsonFactory.root().getConvert(), Sncp.getResourceName(sncpService),
Sncp.getResourceType(sncpService), sncpService, maxClassNameLength, maxNameLength);
this.prepare.addServlet(sds, null, Sncp.getConf(sncpService));
return sds;
}
@Override