diff --git a/src/org/redkale/boot/NodeHttpServer.java b/src/org/redkale/boot/NodeHttpServer.java index 16940d250..003738758 100644 --- a/src/org/redkale/boot/NodeHttpServer.java +++ b/src/org/redkale/boot/NodeHttpServer.java @@ -225,7 +225,7 @@ public class NodeHttpServer extends NodeServer { if (!prefix0.isEmpty() && prefix0.charAt(0) != '/') prefix0 = '/' + prefix0; final String prefix = prefix0; - final String threadName = "[" + Thread.currentThread().getName() + "] "; + final String localThreadName = "[" + Thread.currentThread().getName() + "] "; final List> ss = sb == null ? null : new ArrayList<>(); String mqname = restConf.getValue("mq"); MessageAgent agent0 = null; @@ -276,8 +276,8 @@ public class NodeHttpServer extends NodeServer { WebServlet ws = servlet.getClass().getAnnotation(WebServlet.class); if (ws != null && !ws.repair()) prefix2 = ""; resourceFactory.inject(servlet, NodeHttpServer.this); - if (agent != null) agent.putHttpService(this, service, servlet); - //if (finest) logger.finest(threadName + " Create RestServlet(resource.name='" + name + "') = " + servlet); + if (agent != null) agent.putService(this, service, servlet); + //if (finest) logger.finest(localThreadName + " Create RestServlet(resource.name='" + name + "') = " + servlet); if (ss != null) { String[] mappings = servlet.getClass().getAnnotation(WebServlet.class).value(); for (int i = 0; i < mappings.length; i++) { @@ -336,7 +336,7 @@ public class NodeHttpServer extends NodeServer { WebServlet ws = servlet.getClass().getAnnotation(WebServlet.class); if (ws != null && !ws.repair()) prefix2 = ""; resourceFactory.inject(servlet, NodeHttpServer.this); - if (finest) logger.finest(threadName + " " + stype.getName() + " create a RestWebSocketServlet"); + if (finest) logger.finest(localThreadName + " " + stype.getName() + " create a RestWebSocketServlet"); if (ss != null) { String[] mappings = servlet.getClass().getAnnotation(WebServlet.class).value(); for (int i = 0; i < mappings.length; i++) { @@ -355,13 +355,13 @@ public class NodeHttpServer extends NodeServer { if (as.getKey().length() > max) max = as.getKey().length(); } for (AbstractMap.SimpleEntry as : ss) { - sb.append(threadName).append(" Load ").append(as.getKey()); + sb.append(localThreadName).append(" Load ").append(as.getKey()); for (int i = 0; i < max - as.getKey().length(); i++) { sb.append(' '); } sb.append(" mapping to ").append(Arrays.toString(as.getValue())).append(LINE_SEPARATOR); } - sb.append(threadName).append(" All HttpServlets load cost ").append(System.currentTimeMillis() - starts).append(" ms").append(LINE_SEPARATOR); + sb.append(localThreadName).append(" All HttpServlets load cost ").append(System.currentTimeMillis() - starts).append(" ms").append(LINE_SEPARATOR); } } } diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index 4dafd3243..60d983dd9 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -12,8 +12,9 @@ import java.util.logging.Logger; import javax.annotation.Resource; import org.redkale.boot.*; import static org.redkale.boot.Application.RESNAME_APP_NODEID; +import org.redkale.net.Servlet; import org.redkale.net.http.*; -import org.redkale.net.sncp.Sncp; +import org.redkale.net.sncp.*; import org.redkale.service.Service; import org.redkale.util.*; @@ -45,14 +46,14 @@ public abstract class MessageAgent { protected SncpMessageProcessor sncpRespProcessor; //本地Service消息接收处理器, key:topic - protected ConcurrentHashMap httpNodes = new ConcurrentHashMap<>(); + protected ConcurrentHashMap messageNodes = new ConcurrentHashMap<>(); public void init(AnyValue config) { } public CompletableFuture start(final StringBuffer sb) { AtomicInteger maxlen = new AtomicInteger(sncpRespConsumer == null ? 0 : sncpRespConsumer.topic.length()); - this.httpNodes.values().forEach(node -> { + this.messageNodes.values().forEach(node -> { if (node.consumer.topic.length() > maxlen.get()) maxlen.set(node.consumer.topic.length()); }); if (this.sncpRespConsumer != null) { @@ -61,7 +62,7 @@ public abstract class MessageAgent { this.sncpRespConsumer.waitFor(); sb.append("MessageConsumer(topic=").append(fillString(this.sncpRespConsumer.topic, maxlen.get())).append(") init and start in ").append(System.currentTimeMillis() - s).append(" ms\r\n"); } - this.httpNodes.values().forEach(node -> { + this.messageNodes.values().forEach(node -> { long s = System.currentTimeMillis(); node.consumer.start(); node.consumer.waitFor(); @@ -71,7 +72,7 @@ public abstract class MessageAgent { } public CompletableFuture stop() { - this.httpNodes.values().forEach(node -> { + this.messageNodes.values().forEach(node -> { node.consumer.close(); }); return CompletableFuture.completedFuture(null); @@ -134,11 +135,18 @@ public abstract class MessageAgent { this.sncpRespConsumer = createConsumer(generateSncpRespTopic(), sncpRespProcessor); } - public final synchronized void putHttpService(NodeHttpServer ns, Service service, HttpServlet servlet) { + public final synchronized void putService(NodeHttpServer ns, Service service, HttpServlet servlet) { String topic = generateHttpReqTopic(service); - if (httpNodes.containsKey(topic)) throw new RuntimeException("topic(" + topic + ") is repeat"); + if (messageNodes.containsKey(topic)) throw new RuntimeException("topic(" + topic + ") is repeat"); HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, this.producer, ns, service, servlet); - this.httpNodes.put(topic, new HttpMessageNode(ns, service, servlet, processor, createConsumer(topic, processor))); + this.messageNodes.put(topic, new MessageNode(ns, service, servlet, processor, createConsumer(topic, processor))); + } + + public final synchronized void putService(NodeSncpServer ns, Service service, SncpServlet servlet) { + String topic = generateSncpReqTopic(service); + if (messageNodes.containsKey(topic)) throw new RuntimeException("topic(" + topic + ") is repeat"); + SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, this); + this.messageNodes.put(topic, new MessageNode(ns, service, servlet, processor, createConsumer(topic, processor))); } //格式: sncp.req.user @@ -187,19 +195,19 @@ public abstract class MessageAgent { return sb.toString(); } - protected static class HttpMessageNode { + protected static class MessageNode { - public final NodeHttpServer server; + public final NodeServer server; public final Service service; - public final HttpServlet servlet; + public final Servlet servlet; - public final HttpMessageProcessor processor; + public final MessageProcessor processor; public final MessageConsumer consumer; - public HttpMessageNode(NodeHttpServer server, Service service, HttpServlet servlet, HttpMessageProcessor processor, MessageConsumer consumer) { + public MessageNode(NodeServer server, Service service, Servlet servlet, MessageProcessor processor, MessageConsumer consumer) { this.server = server; this.service = service; this.servlet = servlet;