This commit is contained in:
@@ -225,7 +225,7 @@ public class NodeHttpServer extends NodeServer {
|
||||
if (!prefix0.isEmpty() && prefix0.charAt(0) != '/') prefix0 = '/' + prefix0;
|
||||
final String prefix = prefix0;
|
||||
|
||||
final String threadName = "[" + Thread.currentThread().getName() + "] ";
|
||||
final String localThreadName = "[" + Thread.currentThread().getName() + "] ";
|
||||
final List<AbstractMap.SimpleEntry<String, String[]>> ss = sb == null ? null : new ArrayList<>();
|
||||
String mqname = restConf.getValue("mq");
|
||||
MessageAgent agent0 = null;
|
||||
@@ -276,8 +276,8 @@ public class NodeHttpServer extends NodeServer {
|
||||
WebServlet ws = servlet.getClass().getAnnotation(WebServlet.class);
|
||||
if (ws != null && !ws.repair()) prefix2 = "";
|
||||
resourceFactory.inject(servlet, NodeHttpServer.this);
|
||||
if (agent != null) agent.putHttpService(this, service, servlet);
|
||||
//if (finest) logger.finest(threadName + " Create RestServlet(resource.name='" + name + "') = " + servlet);
|
||||
if (agent != null) agent.putService(this, service, servlet);
|
||||
//if (finest) logger.finest(localThreadName + " Create RestServlet(resource.name='" + name + "') = " + servlet);
|
||||
if (ss != null) {
|
||||
String[] mappings = servlet.getClass().getAnnotation(WebServlet.class).value();
|
||||
for (int i = 0; i < mappings.length; i++) {
|
||||
@@ -336,7 +336,7 @@ public class NodeHttpServer extends NodeServer {
|
||||
WebServlet ws = servlet.getClass().getAnnotation(WebServlet.class);
|
||||
if (ws != null && !ws.repair()) prefix2 = "";
|
||||
resourceFactory.inject(servlet, NodeHttpServer.this);
|
||||
if (finest) logger.finest(threadName + " " + stype.getName() + " create a RestWebSocketServlet");
|
||||
if (finest) logger.finest(localThreadName + " " + stype.getName() + " create a RestWebSocketServlet");
|
||||
if (ss != null) {
|
||||
String[] mappings = servlet.getClass().getAnnotation(WebServlet.class).value();
|
||||
for (int i = 0; i < mappings.length; i++) {
|
||||
@@ -355,13 +355,13 @@ public class NodeHttpServer extends NodeServer {
|
||||
if (as.getKey().length() > max) max = as.getKey().length();
|
||||
}
|
||||
for (AbstractMap.SimpleEntry<String, String[]> as : ss) {
|
||||
sb.append(threadName).append(" Load ").append(as.getKey());
|
||||
sb.append(localThreadName).append(" Load ").append(as.getKey());
|
||||
for (int i = 0; i < max - as.getKey().length(); i++) {
|
||||
sb.append(' ');
|
||||
}
|
||||
sb.append(" mapping to ").append(Arrays.toString(as.getValue())).append(LINE_SEPARATOR);
|
||||
}
|
||||
sb.append(threadName).append(" All HttpServlets load cost ").append(System.currentTimeMillis() - starts).append(" ms").append(LINE_SEPARATOR);
|
||||
sb.append(localThreadName).append(" All HttpServlets load cost ").append(System.currentTimeMillis() - starts).append(" ms").append(LINE_SEPARATOR);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,8 +12,9 @@ import java.util.logging.Logger;
|
||||
import javax.annotation.Resource;
|
||||
import org.redkale.boot.*;
|
||||
import static org.redkale.boot.Application.RESNAME_APP_NODEID;
|
||||
import org.redkale.net.Servlet;
|
||||
import org.redkale.net.http.*;
|
||||
import org.redkale.net.sncp.Sncp;
|
||||
import org.redkale.net.sncp.*;
|
||||
import org.redkale.service.Service;
|
||||
import org.redkale.util.*;
|
||||
|
||||
@@ -45,14 +46,14 @@ public abstract class MessageAgent {
|
||||
protected SncpMessageProcessor sncpRespProcessor;
|
||||
|
||||
//本地Service消息接收处理器, key:topic
|
||||
protected ConcurrentHashMap<String, HttpMessageNode> httpNodes = new ConcurrentHashMap<>();
|
||||
protected ConcurrentHashMap<String, MessageNode> messageNodes = new ConcurrentHashMap<>();
|
||||
|
||||
public void init(AnyValue config) {
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> start(final StringBuffer sb) {
|
||||
AtomicInteger maxlen = new AtomicInteger(sncpRespConsumer == null ? 0 : sncpRespConsumer.topic.length());
|
||||
this.httpNodes.values().forEach(node -> {
|
||||
this.messageNodes.values().forEach(node -> {
|
||||
if (node.consumer.topic.length() > maxlen.get()) maxlen.set(node.consumer.topic.length());
|
||||
});
|
||||
if (this.sncpRespConsumer != null) {
|
||||
@@ -61,7 +62,7 @@ public abstract class MessageAgent {
|
||||
this.sncpRespConsumer.waitFor();
|
||||
sb.append("MessageConsumer(topic=").append(fillString(this.sncpRespConsumer.topic, maxlen.get())).append(") init and start in ").append(System.currentTimeMillis() - s).append(" ms\r\n");
|
||||
}
|
||||
this.httpNodes.values().forEach(node -> {
|
||||
this.messageNodes.values().forEach(node -> {
|
||||
long s = System.currentTimeMillis();
|
||||
node.consumer.start();
|
||||
node.consumer.waitFor();
|
||||
@@ -71,7 +72,7 @@ public abstract class MessageAgent {
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> stop() {
|
||||
this.httpNodes.values().forEach(node -> {
|
||||
this.messageNodes.values().forEach(node -> {
|
||||
node.consumer.close();
|
||||
});
|
||||
return CompletableFuture.completedFuture(null);
|
||||
@@ -134,11 +135,18 @@ public abstract class MessageAgent {
|
||||
this.sncpRespConsumer = createConsumer(generateSncpRespTopic(), sncpRespProcessor);
|
||||
}
|
||||
|
||||
public final synchronized void putHttpService(NodeHttpServer ns, Service service, HttpServlet servlet) {
|
||||
public final synchronized void putService(NodeHttpServer ns, Service service, HttpServlet servlet) {
|
||||
String topic = generateHttpReqTopic(service);
|
||||
if (httpNodes.containsKey(topic)) throw new RuntimeException("topic(" + topic + ") is repeat");
|
||||
if (messageNodes.containsKey(topic)) throw new RuntimeException("topic(" + topic + ") is repeat");
|
||||
HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, this.producer, ns, service, servlet);
|
||||
this.httpNodes.put(topic, new HttpMessageNode(ns, service, servlet, processor, createConsumer(topic, processor)));
|
||||
this.messageNodes.put(topic, new MessageNode(ns, service, servlet, processor, createConsumer(topic, processor)));
|
||||
}
|
||||
|
||||
public final synchronized void putService(NodeSncpServer ns, Service service, SncpServlet servlet) {
|
||||
String topic = generateSncpReqTopic(service);
|
||||
if (messageNodes.containsKey(topic)) throw new RuntimeException("topic(" + topic + ") is repeat");
|
||||
SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, this);
|
||||
this.messageNodes.put(topic, new MessageNode(ns, service, servlet, processor, createConsumer(topic, processor)));
|
||||
}
|
||||
|
||||
//格式: sncp.req.user
|
||||
@@ -187,19 +195,19 @@ public abstract class MessageAgent {
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
protected static class HttpMessageNode {
|
||||
protected static class MessageNode {
|
||||
|
||||
public final NodeHttpServer server;
|
||||
public final NodeServer server;
|
||||
|
||||
public final Service service;
|
||||
|
||||
public final HttpServlet servlet;
|
||||
public final Servlet servlet;
|
||||
|
||||
public final HttpMessageProcessor processor;
|
||||
public final MessageProcessor processor;
|
||||
|
||||
public final MessageConsumer consumer;
|
||||
|
||||
public HttpMessageNode(NodeHttpServer server, Service service, HttpServlet servlet, HttpMessageProcessor processor, MessageConsumer consumer) {
|
||||
public MessageNode(NodeServer server, Service service, Servlet servlet, MessageProcessor processor, MessageConsumer consumer) {
|
||||
this.server = server;
|
||||
this.service = service;
|
||||
this.servlet = servlet;
|
||||
|
||||
Reference in New Issue
Block a user