This commit is contained in:
Redkale
2020-06-12 17:20:36 +08:00
parent 288fc67621
commit 3d9447327a
8 changed files with 67 additions and 79 deletions

View File

@@ -50,7 +50,7 @@ public class HttpMessageProcessor implements MessageProcessor {
}
@Override
public void process(MessageRecord message) {
public void process(MessageRecord message, Runnable callback) {
try {
if (multiconsumer) message.setResptopic(null); //不容许有响应
HttpContext context = server.getHttpServer().getContext();
@@ -58,11 +58,11 @@ public class HttpMessageProcessor implements MessageProcessor {
if (multiconsumer) {
request.setRequestURI(request.getRequestURI().replaceFirst(this.restmodule, this.multimodule));
}
HttpMessageResponse response = new HttpMessageResponse(context, request, null, null, producer);
HttpMessageResponse response = new HttpMessageResponse(context, request, callback, null, null, producer);
servlet.execute(request, response);
} catch (Exception ex) {
if (message.getResptopic() != null && !message.getResptopic().isEmpty()) {
HttpMessageResponse.finishHttpResult(message, producer, message.getResptopic(), new HttpResult().status(500));
HttpMessageResponse.finishHttpResult(message, callback, producer, message.getResptopic(), new HttpResult().status(500));
}
logger.log(Level.SEVERE, HttpMessageProcessor.class.getSimpleName() + " process error, message=" + message, ex);
}

View File

@@ -27,24 +27,29 @@ public class HttpMessageResponse extends HttpResponse {
protected MessageProducer producer;
public HttpMessageResponse(HttpContext context, HttpMessageRequest request,
protected Runnable callback;
public HttpMessageResponse(HttpContext context, HttpMessageRequest request, Runnable callback,
ObjectPool<Response> responsePool, HttpResponseConfig config, MessageProducer producer) {
super(context, request, responsePool, config);
this.message = request.message;
this.callback = callback;
this.producer = producer;
}
public HttpMessageResponse(HttpContext context, MessageRecord message, HttpResponseConfig config, MessageProducer producer) {
public HttpMessageResponse(HttpContext context, MessageRecord message, Runnable callback, HttpResponseConfig config, MessageProducer producer) {
super(context, new HttpMessageRequest(context, message), null, config);
this.message = message;
this.callback = callback;
this.producer = producer;
}
public void finishHttpResult(HttpResult result) {
finishHttpResult(this.message, this.producer, message.getResptopic(), result);
finishHttpResult(this.message, this.callback, this.producer, message.getResptopic(), result);
}
public static void finishHttpResult(MessageRecord msg, MessageProducer producer, String resptopic, HttpResult result) {
public static void finishHttpResult(MessageRecord msg, Runnable callback, MessageProducer producer, String resptopic, HttpResult result) {
if(callback!=null) callback.run();
if (resptopic == null || resptopic.isEmpty()) return;
ConvertType format = result.convert() == null ? null : result.convert().getFactory().getConvertType();
byte[] content = HttpResultCoder.getInstance().encode(result);

View File

@@ -1,66 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.mq;
import java.util.concurrent.*;
import java.util.logging.*;
/**
* MQ管理器
*
*
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.1.0
*/
public class HttpRespProcessor implements MessageProcessor {
protected final Logger logger;
protected final MessageAgent messageAgent;
protected final ConcurrentHashMap<Long, RespFutureNode> respNodes = new ConcurrentHashMap<>();
public HttpRespProcessor(Logger logger, MessageAgent messageAgent) {
this.logger = logger;
this.messageAgent = messageAgent;
}
@Override
public void process(MessageRecord message) {
RespFutureNode node = respNodes.get(message.getSeqid());
if (node == null) {
logger.log(Level.WARNING, HttpRespProcessor.class.getSimpleName() + " process " + message + " error");
return;
}
node.future.complete(message);
}
public CompletableFuture<MessageRecord> createFuture(long seqid) {
CompletableFuture<MessageRecord> future = new CompletableFuture<>();
RespFutureNode node = new RespFutureNode(seqid, future);
respNodes.put(seqid, node);
return future;
}
protected static class RespFutureNode {
public final long seqid;
public final long createtime;
public final CompletableFuture<MessageRecord> future;
public RespFutureNode(long seqid, CompletableFuture<MessageRecord> future) {
this.seqid = seqid;
this.future = future;
this.createtime = System.currentTimeMillis();
}
}
}

View File

@@ -65,7 +65,7 @@ public abstract class MessageClient {
synchronized (this) {
if (this.respConsumerid == null) this.respConsumerid = "consumer-" + this.respTopic;
if (this.consumer == null) {
MessageProcessor processor = msg -> {
MessageProcessor processor = (msg,callback) -> {
MessageRespFutureNode node = respNodes.get(msg.getSeqid());
if (node == null) {
messageAgent.logger.log(Level.WARNING, MessageClient.this.getClass().getSimpleName() + " process " + msg + " error");

View File

@@ -0,0 +1,38 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.mq;
import java.util.concurrent.CountDownLatch;
/**
*
* @author zhangjx
*/
public class MessageMultiThreadProcessor implements MessageProcessor {
protected final MessageAgent messageAgent;
protected final MessageProcessor processor;
protected CountDownLatch cdl;
public MessageMultiThreadProcessor(MessageAgent messageAgent, MessageProcessor processor) {
this.messageAgent = messageAgent;
this.processor = processor;
}
@Override
public void begin(int size) {
}
@Override
public void process(MessageRecord message, Runnable callback) {
}
@Override
public void commit() {
}
}

View File

@@ -16,5 +16,11 @@ package org.redkale.mq;
*/
public interface MessageProcessor {
public void process(MessageRecord message);
default void begin(int size) {
}
public void process(MessageRecord message, Runnable callback);
default void commit() {
}
}

View File

@@ -40,10 +40,10 @@ public class SncpMessageProcessor implements MessageProcessor {
}
@Override
public void process(MessageRecord message) {
public void process(MessageRecord message, Runnable callback) {
SncpContext context = server.getSncpServer().getContext();
SncpMessageRequest request = new SncpMessageRequest(context, message);
SncpMessageResponse response = new SncpMessageResponse(context, request, null, producer);
SncpMessageResponse response = new SncpMessageResponse(context, request, callback, null, producer);
try {
servlet.execute(request, response);
} catch (Exception ex) {

View File

@@ -28,20 +28,25 @@ public class SncpMessageResponse extends SncpResponse {
protected MessageProducer producer;
public SncpMessageResponse(SncpContext context, SncpMessageRequest request, ObjectPool<Response> responsePool, MessageProducer producer) {
protected Runnable callback;
public SncpMessageResponse(SncpContext context, SncpMessageRequest request, Runnable callback, ObjectPool<Response> responsePool, MessageProducer producer) {
super(context, request, responsePool);
this.message = request.message;
this.callback = callback;
this.producer = producer;
}
public SncpMessageResponse(SncpContext context, MessageRecord message, ObjectPool<Response> responsePool, MessageProducer producer) {
public SncpMessageResponse(SncpContext context, MessageRecord message, Runnable callback, ObjectPool<Response> responsePool, MessageProducer producer) {
super(context, new SncpMessageRequest(context, message), responsePool);
this.message = message;
this.callback = callback;
this.producer = producer;
}
@Override
public void finish(final int retcode, final BsonWriter out) {
if (callback != null) callback.run();
if (out == null) {
final byte[] result = new byte[SncpRequest.HEADER_SIZE];
fillHeader(ByteBuffer.wrap(result), 0, retcode);