From 14808cb01ce423fa9647429f0ee5ba58a3f3a731 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Wed, 3 Jun 2020 13:55:08 +0800 Subject: [PATCH] --- src/org/redkale/boot/NodeHttpServer.java | 1 + src/org/redkale/mq/HttpMessageProcessor.java | 27 +++++++++++++--- src/org/redkale/mq/MessageAgent.java | 33 +++++++++++++++++--- src/org/redkale/mq/MessageConsumer.java | 7 ++--- 4 files changed, 54 insertions(+), 14 deletions(-) diff --git a/src/org/redkale/boot/NodeHttpServer.java b/src/org/redkale/boot/NodeHttpServer.java index f529782f7..91740f735 100644 --- a/src/org/redkale/boot/NodeHttpServer.java +++ b/src/org/redkale/boot/NodeHttpServer.java @@ -276,6 +276,7 @@ public class NodeHttpServer extends NodeServer { WebServlet ws = servlet.getClass().getAnnotation(WebServlet.class); if (ws != null && !ws.repair()) prefix2 = ""; resourceFactory.inject(servlet, NodeHttpServer.this); + if (agent != null) agent.putHttpService(this, service, servlet); //if (finest) logger.finest(threadName + " Create RestServlet(resource.name='" + name + "') = " + servlet); if (ss != null) { String[] mappings = servlet.getClass().getAnnotation(WebServlet.class).value(); diff --git a/src/org/redkale/mq/HttpMessageProcessor.java b/src/org/redkale/mq/HttpMessageProcessor.java index acb1230f3..61a5309aa 100644 --- a/src/org/redkale/mq/HttpMessageProcessor.java +++ b/src/org/redkale/mq/HttpMessageProcessor.java @@ -21,19 +21,20 @@ import org.redkale.service.Service; */ public class HttpMessageProcessor implements MessageProcessor { - protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); + protected final Logger logger; protected final MessageProducer producer; - protected final NodeHttpServer ns; + protected final NodeHttpServer server; protected final Service service; protected final HttpServlet servlet; - public HttpMessageProcessor(MessageProducer producer, NodeHttpServer ns, Service service, HttpServlet servlet) { + public HttpMessageProcessor(Logger logger, MessageProducer producer, NodeHttpServer server, Service service, HttpServlet servlet) { + this.logger = logger; this.producer = producer; - this.ns = ns; + this.server = server; this.service = service; this.servlet = servlet; } @@ -41,7 +42,7 @@ public class HttpMessageProcessor implements MessageProcessor { @Override public void process(MessageRecord message) { try { - HttpContext context = ns.getHttpServer().getContext(); + HttpContext context = server.getHttpServer().getContext(); HttpMessageRequest request = new HttpMessageRequest(context, message); HttpMessageResponse response = new HttpMessageResponse(context, request, null, null, producer); servlet.execute(request, response); @@ -51,4 +52,20 @@ public class HttpMessageProcessor implements MessageProcessor { } } + public MessageProducer getProducer() { + return producer; + } + + public NodeHttpServer getServer() { + return server; + } + + public Service getService() { + return service; + } + + public HttpServlet getServlet() { + return servlet; + } + } diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index 4f14fd97c..567528fe1 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -7,7 +7,6 @@ package org.redkale.mq; import java.util.*; import java.util.concurrent.*; -import java.util.function.*; import java.util.logging.Logger; import javax.annotation.Resource; import org.redkale.boot.*; @@ -43,7 +42,7 @@ public abstract class MessageAgent { protected MessageConsumer sncpRespConsumer; //本地Service消息接收处理器, key:topic - protected ConcurrentHashMap localConsumers; + protected ConcurrentHashMap httpNodes = new ConcurrentHashMap<>(); public void init(AnyValue config) { } @@ -105,10 +104,13 @@ public abstract class MessageAgent { public abstract List queryTopic(); //创建指定topic的消费处理器 - public abstract MessageConsumer createConsumer(String topic, Consumer processor); - - public final void putHttpService(NodeHttpServer ns, Service service) { + public abstract MessageConsumer createConsumer(String topic, MessageProcessor processor); + public final synchronized void putHttpService(NodeHttpServer ns, Service service, HttpServlet servlet) { + String topic = generateHttpReqTopic(service); + if (httpNodes.containsKey(topic)) throw new RuntimeException("topic(" + topic + ") is repeat"); + HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, this.producer, ns, service, servlet); + this.httpNodes.put(topic, new HttpMessageNode(ns, service, servlet, processor, createConsumer(topic, processor))); } //格式: sncp:req:user @@ -148,4 +150,25 @@ public abstract class MessageAgent { return protocol + ":resp:node" + nodeid; } + protected static class HttpMessageNode { + + public final NodeHttpServer server; + + public final Service service; + + public final HttpServlet servlet; + + public final HttpMessageProcessor processor; + + public final MessageConsumer consumer; + + public HttpMessageNode(NodeHttpServer server, Service service, HttpServlet servlet, HttpMessageProcessor processor, MessageConsumer consumer) { + this.server = server; + this.service = service; + this.servlet = servlet; + this.processor = processor; + this.consumer = consumer; + } + + } } diff --git a/src/org/redkale/mq/MessageConsumer.java b/src/org/redkale/mq/MessageConsumer.java index b98ef2936..51637f873 100644 --- a/src/org/redkale/mq/MessageConsumer.java +++ b/src/org/redkale/mq/MessageConsumer.java @@ -6,7 +6,6 @@ package org.redkale.mq; import java.util.Objects; -import java.util.function.Consumer; import java.util.logging.Logger; /** @@ -23,20 +22,20 @@ public abstract class MessageConsumer extends Thread { protected final String topic; - protected final Consumer processor; + protected final MessageProcessor processor; protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); protected volatile boolean closed; - protected MessageConsumer(String topic, Consumer processor) { + protected MessageConsumer(String topic, MessageProcessor processor) { Objects.requireNonNull(topic); Objects.requireNonNull(processor); this.topic = topic; this.processor = processor; } - public Consumer getProcessor() { + public MessageProcessor getProcessor() { return processor; }