diff --git a/src/org/redkale/mq/HttpRespProcessor.java b/src/org/redkale/mq/HttpRespProcessor.java new file mode 100644 index 000000000..a6f5f97af --- /dev/null +++ b/src/org/redkale/mq/HttpRespProcessor.java @@ -0,0 +1,66 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.mq; + +import java.util.concurrent.*; +import java.util.logging.*; + +/** + * MQ管理器 + * + * + * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.1.0 + */ +public class HttpRespProcessor implements MessageProcessor { + + protected final Logger logger; + + protected final MessageAgent agent; + + protected final ConcurrentHashMap respNodes = new ConcurrentHashMap<>(); + + public HttpRespProcessor(Logger logger, MessageAgent agent) { + this.logger = logger; + this.agent = agent; + } + + @Override + public void process(MessageRecord message) { + RespFutureNode node = respNodes.get(message.getSeqid()); + if (node == null) { + logger.log(Level.WARNING, HttpRespProcessor.class.getSimpleName() + " process " + message + " error"); + return; + } + node.future.complete(message); + } + + public CompletableFuture createFuture(long seqid) { + CompletableFuture future = new CompletableFuture<>(); + RespFutureNode node = new RespFutureNode(seqid, future); + respNodes.put(seqid, node); + return future; + } + + protected static class RespFutureNode { + + public final long seqid; + + public final long createtime; + + public final CompletableFuture future; + + public RespFutureNode(long seqid, CompletableFuture future) { + this.seqid = seqid; + this.future = future; + this.createtime = System.currentTimeMillis(); + } + + } +} diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index 99617510b..c031780d6 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -54,6 +54,10 @@ public abstract class MessageAgent { public void init(AnyValue config) { } + public final CompletableFuture createSncpRespFuture(MessageRecord message) { + return this.sncpRespProcessor.createFuture(message.getSeqid()); + } + public final synchronized void startSncpRespConsumer() { if (this.sncpRespStartms >= 0) return; long s = System.currentTimeMillis(); diff --git a/src/org/redkale/mq/MessageProducer.java b/src/org/redkale/mq/MessageProducer.java index 0bedec17a..c1024752b 100644 --- a/src/org/redkale/mq/MessageProducer.java +++ b/src/org/redkale/mq/MessageProducer.java @@ -23,7 +23,7 @@ public abstract class MessageProducer extends Thread { protected volatile boolean closed; - public abstract CompletableFuture apply(MessageRecord message); + public abstract CompletableFuture apply(MessageRecord message); protected abstract void waitFor(); diff --git a/src/org/redkale/mq/SncpMessageResponse.java b/src/org/redkale/mq/SncpMessageResponse.java index 759525dad..485209510 100644 --- a/src/org/redkale/mq/SncpMessageResponse.java +++ b/src/org/redkale/mq/SncpMessageResponse.java @@ -45,12 +45,12 @@ public class SncpMessageResponse extends SncpResponse { if (out == null) { final byte[] result = new byte[SncpRequest.HEADER_SIZE]; fillHeader(ByteBuffer.wrap(result), 0, retcode); - producer.apply(new MessageRecord(ConvertType.BSON, message.getResptopic(), null, (byte[]) null)); + producer.apply(new MessageRecord(message.getSeqid(), ConvertType.BSON, message.getResptopic(), null, (byte[]) null)); return; } final int respBodyLength = out.count(); //body总长度 final byte[] result = out.toArray(); fillHeader(ByteBuffer.wrap(result), respBodyLength - HEADER_SIZE, retcode); - producer.apply(new MessageRecord(ConvertType.BSON, message.getResptopic(), null, result)); + producer.apply(new MessageRecord(message.getSeqid(), ConvertType.BSON, message.getResptopic(), null, result)); } } diff --git a/src/org/redkale/mq/SncpRespConsumer.java b/src/org/redkale/mq/SncpRespConsumer.java deleted file mode 100644 index 80ce97e72..000000000 --- a/src/org/redkale/mq/SncpRespConsumer.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.mq; - -/** - * - *

- * 详情见: https://redkale.org - * - * @author zhangjx - * - * @since 2.1.0 - */ -public class SncpRespConsumer { - -} diff --git a/src/org/redkale/mq/SncpRespProcessor.java b/src/org/redkale/mq/SncpRespProcessor.java index 0d3cfe277..20334fd70 100644 --- a/src/org/redkale/mq/SncpRespProcessor.java +++ b/src/org/redkale/mq/SncpRespProcessor.java @@ -5,7 +5,8 @@ */ package org.redkale.mq; -import java.util.logging.Logger; +import java.util.concurrent.*; +import java.util.logging.*; /** * MQ管理器 @@ -23,6 +24,8 @@ public class SncpRespProcessor implements MessageProcessor { protected final MessageAgent agent; + protected final ConcurrentHashMap respNodes = new ConcurrentHashMap<>(); + public SncpRespProcessor(Logger logger, MessageAgent agent) { this.logger = logger; this.agent = agent; @@ -30,5 +33,34 @@ public class SncpRespProcessor implements MessageProcessor { @Override public void process(MessageRecord message) { + RespFutureNode node = respNodes.get(message.getSeqid()); + if (node == null) { + logger.log(Level.WARNING, SncpRespProcessor.class.getSimpleName() + " process " + message + " error"); + return; + } + node.future.complete(message); + } + + public CompletableFuture createFuture(long seqid) { + CompletableFuture future = new CompletableFuture<>(); + RespFutureNode node = new RespFutureNode(seqid, future); + respNodes.put(seqid, node); + return future; + } + + protected static class RespFutureNode { + + public final long seqid; + + public final long createtime; + + public final CompletableFuture future; + + public RespFutureNode(long seqid, CompletableFuture future) { + this.seqid = seqid; + this.future = future; + this.createtime = System.currentTimeMillis(); + } + } }