This commit is contained in:
Redkale
2020-06-03 23:21:57 +08:00
parent 218aebaa0f
commit f79e49db5a
5 changed files with 60 additions and 30 deletions

View File

@@ -47,7 +47,7 @@ public class HttpMessageProcessor implements MessageProcessor {
HttpMessageResponse response = new HttpMessageResponse(context, request, null, null, producer);
servlet.execute(request, response);
} catch (Exception ex) {
HttpMessageResponse.finishHttpResult(producer, message.getResptopic(), new HttpResult().status(500));
HttpMessageResponse.finishHttpResult(message, producer, message.getResptopic(), new HttpResult().status(500));
logger.log(Level.SEVERE, HttpMessageProcessor.class.getSimpleName() + " process error, message=" + message, ex);
}
}

View File

@@ -41,13 +41,13 @@ public class HttpMessageResponse extends HttpResponse {
}
public void finishHttpResult(HttpResult result) {
finishHttpResult(this.producer, message.getResptopic(), result);
finishHttpResult(this.message, this.producer, message.getResptopic(), result);
}
public static void finishHttpResult(MessageProducer producer, String resptopic, HttpResult result) {
public static void finishHttpResult(MessageRecord msg, MessageProducer producer, String resptopic, HttpResult result) {
ConvertType format = result.convert() == null ? null : result.convert().getFactory().getConvertType();
byte[] content = HttpResultCoder.getInstance().encode(result);
producer.apply(new MessageRecord(format, resptopic, null, content));
producer.apply(new MessageRecord(msg.getSeqid(), format, resptopic, null, content));
}
@Override

View File

@@ -0,0 +1,45 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.mq;
import java.util.concurrent.CompletableFuture;
/**
* MQ管理器
*
*
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.1.0
*/
public class MessageRespFutureNode {
protected final long seqid;
protected final long createtime;
protected final CompletableFuture<MessageRecord> future;
public MessageRespFutureNode(long seqid, CompletableFuture<MessageRecord> future) {
this.seqid = seqid;
this.future = future;
this.createtime = System.currentTimeMillis();
}
public long getSeqid() {
return seqid;
}
public long getCreatetime() {
return createtime;
}
public CompletableFuture<MessageRecord> getFuture() {
return future;
}
}

View File

@@ -24,7 +24,7 @@ public class SncpRespProcessor implements MessageProcessor {
protected final MessageAgent agent;
protected final ConcurrentHashMap<Long, RespFutureNode> respNodes = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<Long, MessageRespFutureNode> respNodes = new ConcurrentHashMap<>();
public SncpRespProcessor(Logger logger, MessageAgent agent) {
this.logger = logger;
@@ -33,7 +33,7 @@ public class SncpRespProcessor implements MessageProcessor {
@Override
public void process(MessageRecord message) {
RespFutureNode node = respNodes.get(message.getSeqid());
MessageRespFutureNode node = respNodes.get(message.getSeqid());
if (node == null) {
logger.log(Level.WARNING, SncpRespProcessor.class.getSimpleName() + " process " + message + " error");
return;
@@ -43,24 +43,9 @@ public class SncpRespProcessor implements MessageProcessor {
public CompletableFuture<MessageRecord> createFuture(long seqid) {
CompletableFuture<MessageRecord> future = new CompletableFuture<>();
RespFutureNode node = new RespFutureNode(seqid, future);
MessageRespFutureNode node = new MessageRespFutureNode(seqid, future);
respNodes.put(seqid, node);
return future;
}
protected static class RespFutureNode {
public final long seqid;
public final long createtime;
public final CompletableFuture<MessageRecord> future;
public RespFutureNode(long seqid, CompletableFuture<MessageRecord> future) {
this.seqid = seqid;
this.future = future;
this.createtime = System.currentTimeMillis();
}
}
}

View File

@@ -160,16 +160,16 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
public HttpResponse(HttpContext context, HttpRequest request, ObjectPool<Response> responsePool, HttpResponseConfig config) {
super(context, request, responsePool);
this.plainContentType = config.plainContentType == null || config.plainContentType.isEmpty() ? "text/plain; charset=utf-8" : config.plainContentType;
this.jsonContentType = config.jsonContentType == null || config.jsonContentType.isEmpty() ? "application/json; charset=utf-8" : config.jsonContentType;
this.plainContentType = config == null || config.plainContentType == null || config.plainContentType.isEmpty() ? "text/plain; charset=utf-8" : config.plainContentType;
this.jsonContentType = config == null || config.jsonContentType == null || config.jsonContentType.isEmpty() ? "application/json; charset=utf-8" : config.jsonContentType;
this.plainContentTypeBytes = ("Content-Type: " + this.plainContentType + "\r\n").getBytes();
this.jsonContentTypeBytes = ("Content-Type: " + this.jsonContentType + "\r\n").getBytes();
this.defaultAddHeaders = config.defaultAddHeaders;
this.defaultSetHeaders = config.defaultSetHeaders;
this.defaultCookie = config.defaultCookie;
this.autoOptions = config.autoOptions;
this.dateSupplier = config.dateSupplier;
this.renders = config.renders;
this.defaultAddHeaders = config == null ? null : config.defaultAddHeaders;
this.defaultSetHeaders = config == null ? null : config.defaultSetHeaders;
this.defaultCookie = config == null ? null : config.defaultCookie;
this.autoOptions = config == null ? false : config.autoOptions;
this.dateSupplier = config == null ? null : config.dateSupplier;
this.renders = config == null ? null : config.renders;
this.hasRender = renders != null && !renders.isEmpty();
this.onlyoneHttpRender = renders != null && renders.size() == 1 ? renders.get(0) : null;
this.contentType = this.plainContentType;