This commit is contained in:
Redkale
2020-05-29 21:04:00 +08:00
parent ed1fb151d7
commit 5de580c00b
6 changed files with 125 additions and 6 deletions

View File

@@ -6,6 +6,7 @@
package org.redkale.mq;
import java.util.*;
import java.util.function.Function;
import java.util.logging.Logger;
import org.redkale.boot.*;
import org.redkale.net.http.Rest;
@@ -66,6 +67,15 @@ public abstract class MessageAgent {
//查询所有topic
public abstract List<String> queryTopic();
//创建指定topic的消费处理器
public abstract MessageConsumer createConsumer(String topic, MessageProcessor processor);
//创建指定topic的生产处理器
public abstract MessageProducer createProducer();
//创建指定topic的流处理器
public abstract MessageStreams createStreams(String topic, Function<MessageRecord, MessageRecord> processor);
//格式: sncp:req:user
protected static String generateSncpReqTopic(NodeServer ns, Service service) {
String resname = Sncp.getResourceName(service);

View File

@@ -0,0 +1,48 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.mq;
import java.util.Objects;
import java.util.logging.Logger;
/**
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
public abstract class MessageConsumer extends Thread {
protected final String topic;
protected final MessageProcessor processor;
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
protected volatile boolean closed;
protected MessageConsumer(String topic, MessageProcessor processor) {
Objects.requireNonNull(topic);
Objects.requireNonNull(processor);
this.topic = topic;
this.processor = processor;
}
public MessageProcessor getProcessor() {
return processor;
}
public String getTopic() {
return topic;
}
public boolean isClosed() {
return closed;
}
public abstract void close();
}

View File

@@ -6,7 +6,6 @@
package org.redkale.mq;
/**
*
*
* <p>
* 详情见: https://redkale.org
@@ -15,5 +14,5 @@ package org.redkale.mq;
*/
public interface MessageProcessor {
public void process(MessageRecord req, MessageResponse response);
public void process(MessageRecord message);
}

View File

@@ -0,0 +1,31 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.mq;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Logger;
/**
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
public abstract class MessageProducer extends Thread {
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
protected volatile boolean closed;
public abstract CompletableFuture apply(MessageRecord message);
public boolean isClosed() {
return closed;
}
public abstract void close();
}

View File

@@ -6,7 +6,6 @@
package org.redkale.mq;
/**
*
*
* <p>
* 详情见: https://redkale.org
@@ -15,5 +14,5 @@ package org.redkale.mq;
*/
public interface MessageResponse {
public void finish(MessageRecord result);
public void finish(MessageRecord message);
}

View File

@@ -5,6 +5,10 @@
*/
package org.redkale.mq;
import java.util.Objects;
import java.util.function.Function;
import java.util.logging.Logger;
/**
*
* <p>
@@ -12,6 +16,34 @@ package org.redkale.mq;
*
* @author zhangjx
*/
public interface MessageStreams {
public abstract class MessageStreams extends Thread {
protected final String topic;
protected final Function<MessageRecord, MessageRecord> processor;
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
protected volatile boolean closed;
protected MessageStreams(String topic, Function<MessageRecord, MessageRecord> processor) {
Objects.requireNonNull(topic);
Objects.requireNonNull(processor);
this.topic = topic;
this.processor = processor;
}
public Function<MessageRecord, MessageRecord> getProcessor() {
return processor;
}
public String getTopic() {
return topic;
}
public boolean isClosed() {
return closed;
}
public abstract void close();
}