diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index a25015616..93cf41f26 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -6,6 +6,7 @@ package org.redkale.mq; import java.util.*; +import java.util.function.Function; import java.util.logging.Logger; import org.redkale.boot.*; import org.redkale.net.http.Rest; @@ -66,6 +67,15 @@ public abstract class MessageAgent { //查询所有topic public abstract List queryTopic(); + //创建指定topic的消费处理器 + public abstract MessageConsumer createConsumer(String topic, MessageProcessor processor); + + //创建指定topic的生产处理器 + public abstract MessageProducer createProducer(); + + //创建指定topic的流处理器 + public abstract MessageStreams createStreams(String topic, Function processor); + //格式: sncp:req:user protected static String generateSncpReqTopic(NodeServer ns, Service service) { String resname = Sncp.getResourceName(service); diff --git a/src/org/redkale/mq/MessageConsumer.java b/src/org/redkale/mq/MessageConsumer.java new file mode 100644 index 000000000..74261993b --- /dev/null +++ b/src/org/redkale/mq/MessageConsumer.java @@ -0,0 +1,48 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.mq; + +import java.util.Objects; +import java.util.logging.Logger; + +/** + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + */ +public abstract class MessageConsumer extends Thread { + + protected final String topic; + + protected final MessageProcessor processor; + + protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); + + protected volatile boolean closed; + + protected MessageConsumer(String topic, MessageProcessor processor) { + Objects.requireNonNull(topic); + Objects.requireNonNull(processor); + this.topic = topic; + this.processor = processor; + } + + public MessageProcessor getProcessor() { + return processor; + } + + public String getTopic() { + return topic; + } + + public boolean isClosed() { + return closed; + } + + public abstract void close(); +} diff --git a/src/org/redkale/mq/MessageProcessor.java b/src/org/redkale/mq/MessageProcessor.java index 2c89bbe6d..eea0f473f 100644 --- a/src/org/redkale/mq/MessageProcessor.java +++ b/src/org/redkale/mq/MessageProcessor.java @@ -6,7 +6,6 @@ package org.redkale.mq; /** - * * *

* 详情见: https://redkale.org @@ -15,5 +14,5 @@ package org.redkale.mq; */ public interface MessageProcessor { - public void process(MessageRecord req, MessageResponse response); + public void process(MessageRecord message); } diff --git a/src/org/redkale/mq/MessageProducer.java b/src/org/redkale/mq/MessageProducer.java new file mode 100644 index 000000000..cce361f09 --- /dev/null +++ b/src/org/redkale/mq/MessageProducer.java @@ -0,0 +1,31 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.mq; + +import java.util.concurrent.CompletableFuture; +import java.util.logging.Logger; + +/** + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + */ +public abstract class MessageProducer extends Thread { + + protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); + + protected volatile boolean closed; + + public abstract CompletableFuture apply(MessageRecord message); + + public boolean isClosed() { + return closed; + } + + public abstract void close(); +} diff --git a/src/org/redkale/mq/MessageResponse.java b/src/org/redkale/mq/MessageResponse.java index 67fbfd81b..237080320 100644 --- a/src/org/redkale/mq/MessageResponse.java +++ b/src/org/redkale/mq/MessageResponse.java @@ -6,7 +6,6 @@ package org.redkale.mq; /** - * * *

* 详情见: https://redkale.org @@ -15,5 +14,5 @@ package org.redkale.mq; */ public interface MessageResponse { - public void finish(MessageRecord result); + public void finish(MessageRecord message); } diff --git a/src/org/redkale/mq/MessageStreams.java b/src/org/redkale/mq/MessageStreams.java index 04e3354b7..3206a3893 100644 --- a/src/org/redkale/mq/MessageStreams.java +++ b/src/org/redkale/mq/MessageStreams.java @@ -5,6 +5,10 @@ */ package org.redkale.mq; +import java.util.Objects; +import java.util.function.Function; +import java.util.logging.Logger; + /** * *

@@ -12,6 +16,34 @@ package org.redkale.mq; * * @author zhangjx */ -public interface MessageStreams { - +public abstract class MessageStreams extends Thread { + + protected final String topic; + + protected final Function processor; + + protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); + + protected volatile boolean closed; + + protected MessageStreams(String topic, Function processor) { + Objects.requireNonNull(topic); + Objects.requireNonNull(processor); + this.topic = topic; + this.processor = processor; + } + + public Function getProcessor() { + return processor; + } + + public String getTopic() { + return topic; + } + + public boolean isClosed() { + return closed; + } + + public abstract void close(); }