diff --git a/src/org/redkale/mq/HttpMessageProcessor.java b/src/org/redkale/mq/HttpMessageProcessor.java index 61a5309aa..c27a42b6a 100644 --- a/src/org/redkale/mq/HttpMessageProcessor.java +++ b/src/org/redkale/mq/HttpMessageProcessor.java @@ -47,7 +47,7 @@ public class HttpMessageProcessor implements MessageProcessor { HttpMessageResponse response = new HttpMessageResponse(context, request, null, null, producer); servlet.execute(request, response); } catch (Exception ex) { - HttpMessageResponse.finishHttpResult(producer, message.getResptopic(), new HttpResult().status(500)); + HttpMessageResponse.finishHttpResult(message, producer, message.getResptopic(), new HttpResult().status(500)); logger.log(Level.SEVERE, HttpMessageProcessor.class.getSimpleName() + " process error, message=" + message, ex); } } diff --git a/src/org/redkale/mq/HttpMessageResponse.java b/src/org/redkale/mq/HttpMessageResponse.java index ce3c94c7a..c7fc779e3 100644 --- a/src/org/redkale/mq/HttpMessageResponse.java +++ b/src/org/redkale/mq/HttpMessageResponse.java @@ -41,13 +41,13 @@ public class HttpMessageResponse extends HttpResponse { } public void finishHttpResult(HttpResult result) { - finishHttpResult(this.producer, message.getResptopic(), result); + finishHttpResult(this.message, this.producer, message.getResptopic(), result); } - public static void finishHttpResult(MessageProducer producer, String resptopic, HttpResult result) { + public static void finishHttpResult(MessageRecord msg, MessageProducer producer, String resptopic, HttpResult result) { ConvertType format = result.convert() == null ? null : result.convert().getFactory().getConvertType(); byte[] content = HttpResultCoder.getInstance().encode(result); - producer.apply(new MessageRecord(format, resptopic, null, content)); + producer.apply(new MessageRecord(msg.getSeqid(), format, resptopic, null, content)); } @Override diff --git a/src/org/redkale/mq/MessageRespFutureNode.java b/src/org/redkale/mq/MessageRespFutureNode.java new file mode 100644 index 000000000..622b1faf5 --- /dev/null +++ b/src/org/redkale/mq/MessageRespFutureNode.java @@ -0,0 +1,45 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.mq; + +import java.util.concurrent.CompletableFuture; + +/** + * MQ管理器 + * + * + * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.1.0 + */ +public class MessageRespFutureNode { + + protected final long seqid; + + protected final long createtime; + + protected final CompletableFuture future; + + public MessageRespFutureNode(long seqid, CompletableFuture future) { + this.seqid = seqid; + this.future = future; + this.createtime = System.currentTimeMillis(); + } + + public long getSeqid() { + return seqid; + } + + public long getCreatetime() { + return createtime; + } + + public CompletableFuture getFuture() { + return future; + } +} diff --git a/src/org/redkale/mq/SncpRespProcessor.java b/src/org/redkale/mq/SncpRespProcessor.java index 20334fd70..71f1cbe46 100644 --- a/src/org/redkale/mq/SncpRespProcessor.java +++ b/src/org/redkale/mq/SncpRespProcessor.java @@ -24,7 +24,7 @@ public class SncpRespProcessor implements MessageProcessor { protected final MessageAgent agent; - protected final ConcurrentHashMap respNodes = new ConcurrentHashMap<>(); + protected final ConcurrentHashMap respNodes = new ConcurrentHashMap<>(); public SncpRespProcessor(Logger logger, MessageAgent agent) { this.logger = logger; @@ -33,7 +33,7 @@ public class SncpRespProcessor implements MessageProcessor { @Override public void process(MessageRecord message) { - RespFutureNode node = respNodes.get(message.getSeqid()); + MessageRespFutureNode node = respNodes.get(message.getSeqid()); if (node == null) { logger.log(Level.WARNING, SncpRespProcessor.class.getSimpleName() + " process " + message + " error"); return; @@ -43,24 +43,9 @@ public class SncpRespProcessor implements MessageProcessor { public CompletableFuture createFuture(long seqid) { CompletableFuture future = new CompletableFuture<>(); - RespFutureNode node = new RespFutureNode(seqid, future); + MessageRespFutureNode node = new MessageRespFutureNode(seqid, future); respNodes.put(seqid, node); return future; } - protected static class RespFutureNode { - - public final long seqid; - - public final long createtime; - - public final CompletableFuture future; - - public RespFutureNode(long seqid, CompletableFuture future) { - this.seqid = seqid; - this.future = future; - this.createtime = System.currentTimeMillis(); - } - - } } diff --git a/src/org/redkale/net/http/HttpResponse.java b/src/org/redkale/net/http/HttpResponse.java index 10d1bac7f..207e4bedb 100644 --- a/src/org/redkale/net/http/HttpResponse.java +++ b/src/org/redkale/net/http/HttpResponse.java @@ -160,16 +160,16 @@ public class HttpResponse extends Response { public HttpResponse(HttpContext context, HttpRequest request, ObjectPool responsePool, HttpResponseConfig config) { super(context, request, responsePool); - this.plainContentType = config.plainContentType == null || config.plainContentType.isEmpty() ? "text/plain; charset=utf-8" : config.plainContentType; - this.jsonContentType = config.jsonContentType == null || config.jsonContentType.isEmpty() ? "application/json; charset=utf-8" : config.jsonContentType; + this.plainContentType = config == null || config.plainContentType == null || config.plainContentType.isEmpty() ? "text/plain; charset=utf-8" : config.plainContentType; + this.jsonContentType = config == null || config.jsonContentType == null || config.jsonContentType.isEmpty() ? "application/json; charset=utf-8" : config.jsonContentType; this.plainContentTypeBytes = ("Content-Type: " + this.plainContentType + "\r\n").getBytes(); this.jsonContentTypeBytes = ("Content-Type: " + this.jsonContentType + "\r\n").getBytes(); - this.defaultAddHeaders = config.defaultAddHeaders; - this.defaultSetHeaders = config.defaultSetHeaders; - this.defaultCookie = config.defaultCookie; - this.autoOptions = config.autoOptions; - this.dateSupplier = config.dateSupplier; - this.renders = config.renders; + this.defaultAddHeaders = config == null ? null : config.defaultAddHeaders; + this.defaultSetHeaders = config == null ? null : config.defaultSetHeaders; + this.defaultCookie = config == null ? null : config.defaultCookie; + this.autoOptions = config == null ? false : config.autoOptions; + this.dateSupplier = config == null ? null : config.dateSupplier; + this.renders = config == null ? null : config.renders; this.hasRender = renders != null && !renders.isEmpty(); this.onlyoneHttpRender = renders != null && renders.size() == 1 ? renders.get(0) : null; this.contentType = this.plainContentType;