From 3d9447327a7949e4758a14f9756d7be1e27f3f97 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Fri, 12 Jun 2020 17:20:36 +0800 Subject: [PATCH] --- src/org/redkale/mq/HttpMessageProcessor.java | 6 +- src/org/redkale/mq/HttpMessageResponse.java | 13 ++-- src/org/redkale/mq/HttpRespProcessor.java | 66 ------------------- src/org/redkale/mq/MessageClient.java | 2 +- .../mq/MessageMultiThreadProcessor.java | 38 +++++++++++ src/org/redkale/mq/MessageProcessor.java | 8 ++- src/org/redkale/mq/SncpMessageProcessor.java | 4 +- src/org/redkale/mq/SncpMessageResponse.java | 9 ++- 8 files changed, 67 insertions(+), 79 deletions(-) delete mode 100644 src/org/redkale/mq/HttpRespProcessor.java create mode 100644 src/org/redkale/mq/MessageMultiThreadProcessor.java diff --git a/src/org/redkale/mq/HttpMessageProcessor.java b/src/org/redkale/mq/HttpMessageProcessor.java index bd5cac030..2e543e044 100644 --- a/src/org/redkale/mq/HttpMessageProcessor.java +++ b/src/org/redkale/mq/HttpMessageProcessor.java @@ -50,7 +50,7 @@ public class HttpMessageProcessor implements MessageProcessor { } @Override - public void process(MessageRecord message) { + public void process(MessageRecord message, Runnable callback) { try { if (multiconsumer) message.setResptopic(null); //不容许有响应 HttpContext context = server.getHttpServer().getContext(); @@ -58,11 +58,11 @@ public class HttpMessageProcessor implements MessageProcessor { if (multiconsumer) { request.setRequestURI(request.getRequestURI().replaceFirst(this.restmodule, this.multimodule)); } - HttpMessageResponse response = new HttpMessageResponse(context, request, null, null, producer); + HttpMessageResponse response = new HttpMessageResponse(context, request, callback, null, null, producer); servlet.execute(request, response); } catch (Exception ex) { if (message.getResptopic() != null && !message.getResptopic().isEmpty()) { - HttpMessageResponse.finishHttpResult(message, producer, message.getResptopic(), new HttpResult().status(500)); + HttpMessageResponse.finishHttpResult(message, callback, producer, message.getResptopic(), new HttpResult().status(500)); } logger.log(Level.SEVERE, HttpMessageProcessor.class.getSimpleName() + " process error, message=" + message, ex); } diff --git a/src/org/redkale/mq/HttpMessageResponse.java b/src/org/redkale/mq/HttpMessageResponse.java index 7896b4818..f2523a594 100644 --- a/src/org/redkale/mq/HttpMessageResponse.java +++ b/src/org/redkale/mq/HttpMessageResponse.java @@ -27,24 +27,29 @@ public class HttpMessageResponse extends HttpResponse { protected MessageProducer producer; - public HttpMessageResponse(HttpContext context, HttpMessageRequest request, + protected Runnable callback; + + public HttpMessageResponse(HttpContext context, HttpMessageRequest request, Runnable callback, ObjectPool responsePool, HttpResponseConfig config, MessageProducer producer) { super(context, request, responsePool, config); this.message = request.message; + this.callback = callback; this.producer = producer; } - public HttpMessageResponse(HttpContext context, MessageRecord message, HttpResponseConfig config, MessageProducer producer) { + public HttpMessageResponse(HttpContext context, MessageRecord message, Runnable callback, HttpResponseConfig config, MessageProducer producer) { super(context, new HttpMessageRequest(context, message), null, config); this.message = message; + this.callback = callback; this.producer = producer; } public void finishHttpResult(HttpResult result) { - finishHttpResult(this.message, this.producer, message.getResptopic(), result); + finishHttpResult(this.message, this.callback, this.producer, message.getResptopic(), result); } - public static void finishHttpResult(MessageRecord msg, MessageProducer producer, String resptopic, HttpResult result) { + public static void finishHttpResult(MessageRecord msg, Runnable callback, MessageProducer producer, String resptopic, HttpResult result) { + if(callback!=null) callback.run(); if (resptopic == null || resptopic.isEmpty()) return; ConvertType format = result.convert() == null ? null : result.convert().getFactory().getConvertType(); byte[] content = HttpResultCoder.getInstance().encode(result); diff --git a/src/org/redkale/mq/HttpRespProcessor.java b/src/org/redkale/mq/HttpRespProcessor.java deleted file mode 100644 index d7067d65c..000000000 --- a/src/org/redkale/mq/HttpRespProcessor.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.mq; - -import java.util.concurrent.*; -import java.util.logging.*; - -/** - * MQ管理器 - * - * - * 详情见: https://redkale.org - * - * @author zhangjx - * - * @since 2.1.0 - */ -public class HttpRespProcessor implements MessageProcessor { - - protected final Logger logger; - - protected final MessageAgent messageAgent; - - protected final ConcurrentHashMap respNodes = new ConcurrentHashMap<>(); - - public HttpRespProcessor(Logger logger, MessageAgent messageAgent) { - this.logger = logger; - this.messageAgent = messageAgent; - } - - @Override - public void process(MessageRecord message) { - RespFutureNode node = respNodes.get(message.getSeqid()); - if (node == null) { - logger.log(Level.WARNING, HttpRespProcessor.class.getSimpleName() + " process " + message + " error"); - return; - } - node.future.complete(message); - } - - public CompletableFuture createFuture(long seqid) { - CompletableFuture future = new CompletableFuture<>(); - RespFutureNode node = new RespFutureNode(seqid, future); - respNodes.put(seqid, node); - return future; - } - - protected static class RespFutureNode { - - public final long seqid; - - public final long createtime; - - public final CompletableFuture future; - - public RespFutureNode(long seqid, CompletableFuture future) { - this.seqid = seqid; - this.future = future; - this.createtime = System.currentTimeMillis(); - } - - } -} diff --git a/src/org/redkale/mq/MessageClient.java b/src/org/redkale/mq/MessageClient.java index caeca6337..d1df298f3 100644 --- a/src/org/redkale/mq/MessageClient.java +++ b/src/org/redkale/mq/MessageClient.java @@ -65,7 +65,7 @@ public abstract class MessageClient { synchronized (this) { if (this.respConsumerid == null) this.respConsumerid = "consumer-" + this.respTopic; if (this.consumer == null) { - MessageProcessor processor = msg -> { + MessageProcessor processor = (msg,callback) -> { MessageRespFutureNode node = respNodes.get(msg.getSeqid()); if (node == null) { messageAgent.logger.log(Level.WARNING, MessageClient.this.getClass().getSimpleName() + " process " + msg + " error"); diff --git a/src/org/redkale/mq/MessageMultiThreadProcessor.java b/src/org/redkale/mq/MessageMultiThreadProcessor.java new file mode 100644 index 000000000..a59b8a38b --- /dev/null +++ b/src/org/redkale/mq/MessageMultiThreadProcessor.java @@ -0,0 +1,38 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.mq; + +import java.util.concurrent.CountDownLatch; + +/** + * + * @author zhangjx + */ +public class MessageMultiThreadProcessor implements MessageProcessor { + + protected final MessageAgent messageAgent; + + protected final MessageProcessor processor; + + protected CountDownLatch cdl; + + public MessageMultiThreadProcessor(MessageAgent messageAgent, MessageProcessor processor) { + this.messageAgent = messageAgent; + this.processor = processor; + } + + @Override + public void begin(int size) { + } + + @Override + public void process(MessageRecord message, Runnable callback) { + } + + @Override + public void commit() { + } +} diff --git a/src/org/redkale/mq/MessageProcessor.java b/src/org/redkale/mq/MessageProcessor.java index 31fa1e700..d8d024759 100644 --- a/src/org/redkale/mq/MessageProcessor.java +++ b/src/org/redkale/mq/MessageProcessor.java @@ -16,5 +16,11 @@ package org.redkale.mq; */ public interface MessageProcessor { - public void process(MessageRecord message); + default void begin(int size) { + } + + public void process(MessageRecord message, Runnable callback); + + default void commit() { + } } diff --git a/src/org/redkale/mq/SncpMessageProcessor.java b/src/org/redkale/mq/SncpMessageProcessor.java index 74c9afc07..3e24736e7 100644 --- a/src/org/redkale/mq/SncpMessageProcessor.java +++ b/src/org/redkale/mq/SncpMessageProcessor.java @@ -40,10 +40,10 @@ public class SncpMessageProcessor implements MessageProcessor { } @Override - public void process(MessageRecord message) { + public void process(MessageRecord message, Runnable callback) { SncpContext context = server.getSncpServer().getContext(); SncpMessageRequest request = new SncpMessageRequest(context, message); - SncpMessageResponse response = new SncpMessageResponse(context, request, null, producer); + SncpMessageResponse response = new SncpMessageResponse(context, request, callback, null, producer); try { servlet.execute(request, response); } catch (Exception ex) { diff --git a/src/org/redkale/mq/SncpMessageResponse.java b/src/org/redkale/mq/SncpMessageResponse.java index 485209510..39671f944 100644 --- a/src/org/redkale/mq/SncpMessageResponse.java +++ b/src/org/redkale/mq/SncpMessageResponse.java @@ -28,20 +28,25 @@ public class SncpMessageResponse extends SncpResponse { protected MessageProducer producer; - public SncpMessageResponse(SncpContext context, SncpMessageRequest request, ObjectPool responsePool, MessageProducer producer) { + protected Runnable callback; + + public SncpMessageResponse(SncpContext context, SncpMessageRequest request, Runnable callback, ObjectPool responsePool, MessageProducer producer) { super(context, request, responsePool); this.message = request.message; + this.callback = callback; this.producer = producer; } - public SncpMessageResponse(SncpContext context, MessageRecord message, ObjectPool responsePool, MessageProducer producer) { + public SncpMessageResponse(SncpContext context, MessageRecord message, Runnable callback, ObjectPool responsePool, MessageProducer producer) { super(context, new SncpMessageRequest(context, message), responsePool); this.message = message; + this.callback = callback; this.producer = producer; } @Override public void finish(final int retcode, final BsonWriter out) { + if (callback != null) callback.run(); if (out == null) { final byte[] result = new byte[SncpRequest.HEADER_SIZE]; fillHeader(ByteBuffer.wrap(result), 0, retcode);