diff --git a/src/org/redkale/mq/HttpMessageProcessor.java b/src/org/redkale/mq/HttpMessageProcessor.java new file mode 100644 index 000000000..acb1230f3 --- /dev/null +++ b/src/org/redkale/mq/HttpMessageProcessor.java @@ -0,0 +1,54 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.mq; + +import java.util.logging.*; +import org.redkale.boot.NodeHttpServer; +import org.redkale.net.http.*; +import org.redkale.service.Service; + +/** + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.1.0 + */ +public class HttpMessageProcessor implements MessageProcessor { + + protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); + + protected final MessageProducer producer; + + protected final NodeHttpServer ns; + + protected final Service service; + + protected final HttpServlet servlet; + + public HttpMessageProcessor(MessageProducer producer, NodeHttpServer ns, Service service, HttpServlet servlet) { + this.producer = producer; + this.ns = ns; + this.service = service; + this.servlet = servlet; + } + + @Override + public void process(MessageRecord message) { + try { + HttpContext context = ns.getHttpServer().getContext(); + HttpMessageRequest request = new HttpMessageRequest(context, message); + HttpMessageResponse response = new HttpMessageResponse(context, request, null, null, producer); + servlet.execute(request, response); + } catch (Exception ex) { + HttpMessageResponse.finishHttpResult(producer, message.getResptopic(), new HttpResult().status(500)); + logger.log(Level.SEVERE, HttpMessageProcessor.class.getSimpleName() + " process error, message=" + message, ex); + } + } + +} diff --git a/src/org/redkale/mq/HttpMessageResponse.java b/src/org/redkale/mq/HttpMessageResponse.java index 549bdbf02..dffa2e37b 100644 --- a/src/org/redkale/mq/HttpMessageResponse.java +++ b/src/org/redkale/mq/HttpMessageResponse.java @@ -41,9 +41,13 @@ public class HttpMessageResponse extends HttpResponse { } public void finishHttpResult(HttpResult result) { + finishHttpResult(this.producer, reqMessage.getResptopic(), result); + } + + public static void finishHttpResult(MessageProducer producer, String resptopic, HttpResult result) { ConvertType format = result.convert() == null ? null : result.convert().getFactory().getConvertType(); byte[] content = HttpResultCoder.getInstance().encode(result); - this.producer.apply(new MessageRecord(format, reqMessage.getResptopic(), null, content)); + producer.apply(new MessageRecord(format, resptopic, null, content)); } @Override