This commit is contained in:
Redkale
2020-06-03 13:55:08 +08:00
parent 1edadbfee8
commit 14808cb01c
4 changed files with 54 additions and 14 deletions

View File

@@ -276,6 +276,7 @@ public class NodeHttpServer extends NodeServer {
WebServlet ws = servlet.getClass().getAnnotation(WebServlet.class); WebServlet ws = servlet.getClass().getAnnotation(WebServlet.class);
if (ws != null && !ws.repair()) prefix2 = ""; if (ws != null && !ws.repair()) prefix2 = "";
resourceFactory.inject(servlet, NodeHttpServer.this); resourceFactory.inject(servlet, NodeHttpServer.this);
if (agent != null) agent.putHttpService(this, service, servlet);
//if (finest) logger.finest(threadName + " Create RestServlet(resource.name='" + name + "') = " + servlet); //if (finest) logger.finest(threadName + " Create RestServlet(resource.name='" + name + "') = " + servlet);
if (ss != null) { if (ss != null) {
String[] mappings = servlet.getClass().getAnnotation(WebServlet.class).value(); String[] mappings = servlet.getClass().getAnnotation(WebServlet.class).value();

View File

@@ -21,19 +21,20 @@ import org.redkale.service.Service;
*/ */
public class HttpMessageProcessor implements MessageProcessor { public class HttpMessageProcessor implements MessageProcessor {
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); protected final Logger logger;
protected final MessageProducer producer; protected final MessageProducer producer;
protected final NodeHttpServer ns; protected final NodeHttpServer server;
protected final Service service; protected final Service service;
protected final HttpServlet servlet; protected final HttpServlet servlet;
public HttpMessageProcessor(MessageProducer producer, NodeHttpServer ns, Service service, HttpServlet servlet) { public HttpMessageProcessor(Logger logger, MessageProducer producer, NodeHttpServer server, Service service, HttpServlet servlet) {
this.logger = logger;
this.producer = producer; this.producer = producer;
this.ns = ns; this.server = server;
this.service = service; this.service = service;
this.servlet = servlet; this.servlet = servlet;
} }
@@ -41,7 +42,7 @@ public class HttpMessageProcessor implements MessageProcessor {
@Override @Override
public void process(MessageRecord message) { public void process(MessageRecord message) {
try { try {
HttpContext context = ns.getHttpServer().getContext(); HttpContext context = server.getHttpServer().getContext();
HttpMessageRequest request = new HttpMessageRequest(context, message); HttpMessageRequest request = new HttpMessageRequest(context, message);
HttpMessageResponse response = new HttpMessageResponse(context, request, null, null, producer); HttpMessageResponse response = new HttpMessageResponse(context, request, null, null, producer);
servlet.execute(request, response); servlet.execute(request, response);
@@ -51,4 +52,20 @@ public class HttpMessageProcessor implements MessageProcessor {
} }
} }
public MessageProducer getProducer() {
return producer;
}
public NodeHttpServer getServer() {
return server;
}
public Service getService() {
return service;
}
public HttpServlet getServlet() {
return servlet;
}
} }

View File

@@ -7,7 +7,6 @@ package org.redkale.mq;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.function.*;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.annotation.Resource; import javax.annotation.Resource;
import org.redkale.boot.*; import org.redkale.boot.*;
@@ -43,7 +42,7 @@ public abstract class MessageAgent {
protected MessageConsumer sncpRespConsumer; protected MessageConsumer sncpRespConsumer;
//本地Service消息接收处理器 key:topic //本地Service消息接收处理器 key:topic
protected ConcurrentHashMap<String, Service> localConsumers; protected ConcurrentHashMap<String, HttpMessageNode> httpNodes = new ConcurrentHashMap<>();
public void init(AnyValue config) { public void init(AnyValue config) {
} }
@@ -105,10 +104,13 @@ public abstract class MessageAgent {
public abstract List<String> queryTopic(); public abstract List<String> queryTopic();
//创建指定topic的消费处理器 //创建指定topic的消费处理器
public abstract MessageConsumer createConsumer(String topic, Consumer<MessageRecord> processor); public abstract MessageConsumer createConsumer(String topic, MessageProcessor processor);
public final void putHttpService(NodeHttpServer ns, Service service) {
public final synchronized void putHttpService(NodeHttpServer ns, Service service, HttpServlet servlet) {
String topic = generateHttpReqTopic(service);
if (httpNodes.containsKey(topic)) throw new RuntimeException("topic(" + topic + ") is repeat");
HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, this.producer, ns, service, servlet);
this.httpNodes.put(topic, new HttpMessageNode(ns, service, servlet, processor, createConsumer(topic, processor)));
} }
//格式: sncp:req:user //格式: sncp:req:user
@@ -148,4 +150,25 @@ public abstract class MessageAgent {
return protocol + ":resp:node" + nodeid; return protocol + ":resp:node" + nodeid;
} }
protected static class HttpMessageNode {
public final NodeHttpServer server;
public final Service service;
public final HttpServlet servlet;
public final HttpMessageProcessor processor;
public final MessageConsumer consumer;
public HttpMessageNode(NodeHttpServer server, Service service, HttpServlet servlet, HttpMessageProcessor processor, MessageConsumer consumer) {
this.server = server;
this.service = service;
this.servlet = servlet;
this.processor = processor;
this.consumer = consumer;
}
}
} }

View File

@@ -6,7 +6,6 @@
package org.redkale.mq; package org.redkale.mq;
import java.util.Objects; import java.util.Objects;
import java.util.function.Consumer;
import java.util.logging.Logger; import java.util.logging.Logger;
/** /**
@@ -23,20 +22,20 @@ public abstract class MessageConsumer extends Thread {
protected final String topic; protected final String topic;
protected final Consumer<MessageRecord> processor; protected final MessageProcessor processor;
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
protected volatile boolean closed; protected volatile boolean closed;
protected MessageConsumer(String topic, Consumer<MessageRecord> processor) { protected MessageConsumer(String topic, MessageProcessor processor) {
Objects.requireNonNull(topic); Objects.requireNonNull(topic);
Objects.requireNonNull(processor); Objects.requireNonNull(processor);
this.topic = topic; this.topic = topic;
this.processor = processor; this.processor = processor;
} }
public Consumer<MessageRecord> getProcessor() { public MessageProcessor getProcessor() {
return processor; return processor;
} }