From 6909748bda6d517dcfd83b654411e561f30083a2 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Wed, 3 Jun 2020 19:26:38 +0800 Subject: [PATCH] --- src/org/redkale/boot/NodeSncpServer.java | 3 +- src/org/redkale/mq/HttpMessageRequest.java | 10 ++--- src/org/redkale/mq/HttpMessageResponse.java | 28 ++++++------ src/org/redkale/mq/MessageAgent.java | 8 ++-- src/org/redkale/mq/SncpMessageProcessor.java | 47 +++++++++++++++++--- src/org/redkale/mq/SncpMessageRequest.java | 11 ++--- src/org/redkale/mq/SncpMessageResponse.java | 18 ++++---- src/org/redkale/mq/SncpRespProcessor.java | 34 ++++++++++++++ 8 files changed, 117 insertions(+), 42 deletions(-) create mode 100644 src/org/redkale/mq/SncpRespProcessor.java diff --git a/src/org/redkale/boot/NodeSncpServer.java b/src/org/redkale/boot/NodeSncpServer.java index a60809623..9ab3c96ac 100644 --- a/src/org/redkale/boot/NodeSncpServer.java +++ b/src/org/redkale/boot/NodeSncpServer.java @@ -13,7 +13,7 @@ import org.redkale.boot.ClassFilter.FilterEntry; import org.redkale.mq.MessageAgent; import org.redkale.net.*; import org.redkale.net.sncp.*; -import org.redkale.service.Service; +import org.redkale.service.*; import org.redkale.util.*; import org.redkale.util.AnyValue.DefaultAnyValue; @@ -34,6 +34,7 @@ public class NodeSncpServer extends NodeServer { super(application, createServer(application, serconf)); this.sncpServer = (SncpServer) this.server; this.consumer = sncpServer == null || application.singletonrun ? null : (agent, x) -> { + if (x.getClass().getAnnotation(Local.class) != null) return; SncpDynServlet servlet = sncpServer.addSncpServlet(x); //singleton模式下不生成SncpServlet if (agent != null) agent.putService(this, x, servlet); }; diff --git a/src/org/redkale/mq/HttpMessageRequest.java b/src/org/redkale/mq/HttpMessageRequest.java index 8c9142c56..b015fa981 100644 --- a/src/org/redkale/mq/HttpMessageRequest.java +++ b/src/org/redkale/mq/HttpMessageRequest.java @@ -18,12 +18,12 @@ import org.redkale.net.http.*; */ public class HttpMessageRequest extends HttpRequest { - protected MessageRecord reqMessage; + protected MessageRecord message; - public HttpMessageRequest(HttpContext context, MessageRecord reqMessage) { - super(context, reqMessage.decodeContent(HttpSimpleRequestCoder.getInstance())); - this.reqMessage = reqMessage; - this.currentUserid = reqMessage.getUserid(); + public HttpMessageRequest(HttpContext context, MessageRecord message) { + super(context, message.decodeContent(HttpSimpleRequestCoder.getInstance())); + this.message = message; + this.currentUserid = message.getUserid(); } } diff --git a/src/org/redkale/mq/HttpMessageResponse.java b/src/org/redkale/mq/HttpMessageResponse.java index dffa2e37b..ce3c94c7a 100644 --- a/src/org/redkale/mq/HttpMessageResponse.java +++ b/src/org/redkale/mq/HttpMessageResponse.java @@ -23,25 +23,25 @@ import org.redkale.util.ObjectPool; */ public class HttpMessageResponse extends HttpResponse { - protected MessageRecord reqMessage; + protected MessageRecord message; protected MessageProducer producer; public HttpMessageResponse(HttpContext context, HttpMessageRequest request, ObjectPool responsePool, HttpResponseConfig config, MessageProducer producer) { super(context, request, responsePool, config); - this.reqMessage = request.reqMessage; + this.message = request.message; this.producer = producer; } - public HttpMessageResponse(HttpContext context, MessageRecord reqMessage, HttpResponseConfig config, MessageProducer producer) { - super(context, new HttpMessageRequest(context, reqMessage), null, config); - this.reqMessage = reqMessage; + public HttpMessageResponse(HttpContext context, MessageRecord message, HttpResponseConfig config, MessageProducer producer) { + super(context, new HttpMessageRequest(context, message), null, config); + this.message = message; this.producer = producer; } public void finishHttpResult(HttpResult result) { - finishHttpResult(this.producer, reqMessage.getResptopic(), result); + finishHttpResult(this.producer, message.getResptopic(), result); } public static void finishHttpResult(MessageProducer producer, String resptopic, HttpResult result) { @@ -52,44 +52,44 @@ public class HttpMessageResponse extends HttpResponse { @Override public void finishJson(org.redkale.service.RetResult ret) { - if (reqMessage.isEmptyResptopic()) return; + if (message.isEmptyResptopic()) return; finishHttpResult(new HttpResult(ret.clearConvert(), ret)); } @Override public void finish(String obj) { - if (reqMessage.isEmptyResptopic()) return; + if (message.isEmptyResptopic()) return; finishHttpResult(new HttpResult(obj == null ? "" : obj)); } @Override public void finish(int status, String message) { - if (reqMessage.isEmptyResptopic()) return; + if (this.message.isEmptyResptopic()) return; finishHttpResult(new HttpResult(message == null ? "" : message).status(status)); } @Override public void finish(final Convert convert, HttpResult result) { - if (reqMessage.isEmptyResptopic()) return; + if (message.isEmptyResptopic()) return; if (convert != null) result.convert(convert); finishHttpResult(result); } @Override public void finish(final byte[] bs) { - if (reqMessage.isEmptyResptopic()) return; + if (message.isEmptyResptopic()) return; finishHttpResult(new HttpResult(bs)); } @Override public void finish(final String contentType, final byte[] bs) { - if (reqMessage.isEmptyResptopic()) return; + if (message.isEmptyResptopic()) return; finishHttpResult(new HttpResult(bs).contentType(contentType)); } @Override public void finish(boolean kill, ByteBuffer buffer) { - if (reqMessage.isEmptyResptopic()) return; + if (message.isEmptyResptopic()) return; byte[] bs = new byte[buffer.remaining()]; buffer.get(bs); finishHttpResult(new HttpResult(bs)); @@ -97,7 +97,7 @@ public class HttpMessageResponse extends HttpResponse { @Override public void finish(boolean kill, ByteBuffer... buffers) { - if (reqMessage.isEmptyResptopic()) return; + if (message.isEmptyResptopic()) return; int size = 0; for (ByteBuffer buf : buffers) { size += buf.remaining(); diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index 544b1c04e..99617510b 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -43,7 +43,7 @@ public abstract class MessageAgent { protected MessageConsumer sncpRespConsumer; - protected SncpMessageProcessor sncpRespProcessor; + protected SncpRespProcessor sncpRespProcessor; //sncpRespConsumer启动耗时, 小于0表示未启动 protected long sncpRespStartms = -1; @@ -141,21 +141,21 @@ public abstract class MessageAgent { public final synchronized void putSncpResp(NodeSncpServer ns) { if (this.sncpRespConsumer != null) return; - this.sncpRespProcessor = new SncpMessageProcessor(logger, this); + this.sncpRespProcessor = new SncpRespProcessor(this.logger, this); this.sncpRespConsumer = createConsumer(generateSncpRespTopic(), sncpRespProcessor); } public final synchronized void putService(NodeHttpServer ns, Service service, HttpServlet servlet) { String topic = generateHttpReqTopic(service); if (messageNodes.containsKey(topic)) throw new RuntimeException("topic(" + topic + ") is repeat"); - HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, this.producer, ns, service, servlet); + HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, getProducer(), ns, service, servlet); this.messageNodes.put(topic, new MessageNode(ns, service, servlet, processor, createConsumer(topic, processor))); } public final synchronized void putService(NodeSncpServer ns, Service service, SncpServlet servlet) { String topic = generateSncpReqTopic(service); if (messageNodes.containsKey(topic)) throw new RuntimeException("topic(" + topic + ") is repeat"); - SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, this); + SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, getProducer(), ns, service, servlet); this.messageNodes.put(topic, new MessageNode(ns, service, servlet, processor, createConsumer(topic, processor))); } diff --git a/src/org/redkale/mq/SncpMessageProcessor.java b/src/org/redkale/mq/SncpMessageProcessor.java index e73084a78..74c9afc07 100644 --- a/src/org/redkale/mq/SncpMessageProcessor.java +++ b/src/org/redkale/mq/SncpMessageProcessor.java @@ -5,7 +5,10 @@ */ package org.redkale.mq; -import java.util.logging.Logger; +import java.util.logging.*; +import org.redkale.boot.NodeSncpServer; +import org.redkale.net.sncp.*; +import org.redkale.service.Service; /** * @@ -20,15 +23,49 @@ public class SncpMessageProcessor implements MessageProcessor { protected final Logger logger; - protected final MessageAgent agent; + protected final MessageProducer producer; - public SncpMessageProcessor(Logger logger, MessageAgent agent) { + protected final NodeSncpServer server; + + protected final Service service; + + protected final SncpServlet servlet; + + public SncpMessageProcessor(Logger logger, MessageProducer producer, NodeSncpServer server, Service service, SncpServlet servlet) { this.logger = logger; - this.agent = agent; + this.producer = producer; + this.server = server; + this.service = service; + this.servlet = servlet; } @Override public void process(MessageRecord message) { - + SncpContext context = server.getSncpServer().getContext(); + SncpMessageRequest request = new SncpMessageRequest(context, message); + SncpMessageResponse response = new SncpMessageResponse(context, request, null, producer); + try { + servlet.execute(request, response); + } catch (Exception ex) { + response.finish(SncpResponse.RETCODE_ILLSERVICEID, null); + logger.log(Level.SEVERE, SncpMessageProcessor.class.getSimpleName() + " process error, message=" + message, ex); + } } + + public MessageProducer getProducer() { + return producer; + } + + public NodeSncpServer getServer() { + return server; + } + + public Service getService() { + return service; + } + + public SncpServlet getServlet() { + return servlet; + } + } diff --git a/src/org/redkale/mq/SncpMessageRequest.java b/src/org/redkale/mq/SncpMessageRequest.java index b86265a3a..81c173958 100644 --- a/src/org/redkale/mq/SncpMessageRequest.java +++ b/src/org/redkale/mq/SncpMessageRequest.java @@ -19,12 +19,13 @@ import org.redkale.net.sncp.*; */ public class SncpMessageRequest extends SncpRequest { - public SncpMessageRequest(SncpContext context) { + protected MessageRecord message; + + @SuppressWarnings("OverridableMethodCallInConstructor") + public SncpMessageRequest(SncpContext context, MessageRecord message) { super(context, null); + this.message = message; + readHeader(ByteBuffer.wrap(message.getContent())); } - @Override - public int readHeader(ByteBuffer buffer) { - return super.readHeader(buffer); - } } diff --git a/src/org/redkale/mq/SncpMessageResponse.java b/src/org/redkale/mq/SncpMessageResponse.java index 8c568d7e6..759525dad 100644 --- a/src/org/redkale/mq/SncpMessageResponse.java +++ b/src/org/redkale/mq/SncpMessageResponse.java @@ -6,7 +6,7 @@ package org.redkale.mq; import java.nio.ByteBuffer; -import java.util.function.*; +import org.redkale.convert.ConvertType; import org.redkale.convert.bson.BsonWriter; import org.redkale.net.Response; import org.redkale.net.sncp.*; @@ -26,16 +26,18 @@ public class SncpMessageResponse extends SncpResponse { protected MessageRecord message; - protected BiConsumer resultConsumer; + protected MessageProducer producer; - public SncpMessageResponse(SncpContext context, SncpMessageRequest request, ObjectPool responsePool) { + public SncpMessageResponse(SncpContext context, SncpMessageRequest request, ObjectPool responsePool, MessageProducer producer) { super(context, request, responsePool); + this.message = request.message; + this.producer = producer; } - public SncpMessageResponse resultConsumer(MessageRecord message, BiConsumer resultConsumer) { + public SncpMessageResponse(SncpContext context, MessageRecord message, ObjectPool responsePool, MessageProducer producer) { + super(context, new SncpMessageRequest(context, message), responsePool); this.message = message; - this.resultConsumer = resultConsumer; - return this; + this.producer = producer; } @Override @@ -43,12 +45,12 @@ public class SncpMessageResponse extends SncpResponse { if (out == null) { final byte[] result = new byte[SncpRequest.HEADER_SIZE]; fillHeader(ByteBuffer.wrap(result), 0, retcode); - resultConsumer.accept(message, result); + producer.apply(new MessageRecord(ConvertType.BSON, message.getResptopic(), null, (byte[]) null)); return; } final int respBodyLength = out.count(); //body总长度 final byte[] result = out.toArray(); fillHeader(ByteBuffer.wrap(result), respBodyLength - HEADER_SIZE, retcode); - resultConsumer.accept(message, result); + producer.apply(new MessageRecord(ConvertType.BSON, message.getResptopic(), null, result)); } } diff --git a/src/org/redkale/mq/SncpRespProcessor.java b/src/org/redkale/mq/SncpRespProcessor.java new file mode 100644 index 000000000..0d3cfe277 --- /dev/null +++ b/src/org/redkale/mq/SncpRespProcessor.java @@ -0,0 +1,34 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.mq; + +import java.util.logging.Logger; + +/** + * MQ管理器 + * + * + * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.1.0 + */ +public class SncpRespProcessor implements MessageProcessor { + + protected final Logger logger; + + protected final MessageAgent agent; + + public SncpRespProcessor(Logger logger, MessageAgent agent) { + this.logger = logger; + this.agent = agent; + } + + @Override + public void process(MessageRecord message) { + } +}