This commit is contained in:
Redkale
2020-05-29 22:34:53 +08:00
parent 5de580c00b
commit c34a6d8f49
2 changed files with 6 additions and 5 deletions

View File

@@ -6,7 +6,7 @@
package org.redkale.mq; package org.redkale.mq;
import java.util.*; import java.util.*;
import java.util.function.Function; import java.util.function.*;
import java.util.logging.Logger; import java.util.logging.Logger;
import org.redkale.boot.*; import org.redkale.boot.*;
import org.redkale.net.http.Rest; import org.redkale.net.http.Rest;
@@ -68,7 +68,7 @@ public abstract class MessageAgent {
public abstract List<String> queryTopic(); public abstract List<String> queryTopic();
//创建指定topic的消费处理器 //创建指定topic的消费处理器
public abstract MessageConsumer createConsumer(String topic, MessageProcessor processor); public abstract MessageConsumer createConsumer(String topic, Consumer<MessageRecord> processor);
//创建指定topic的生产处理器 //创建指定topic的生产处理器
public abstract MessageProducer createProducer(); public abstract MessageProducer createProducer();

View File

@@ -6,6 +6,7 @@
package org.redkale.mq; package org.redkale.mq;
import java.util.Objects; import java.util.Objects;
import java.util.function.Consumer;
import java.util.logging.Logger; import java.util.logging.Logger;
/** /**
@@ -19,20 +20,20 @@ public abstract class MessageConsumer extends Thread {
protected final String topic; protected final String topic;
protected final MessageProcessor processor; protected final Consumer<MessageRecord> processor;
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
protected volatile boolean closed; protected volatile boolean closed;
protected MessageConsumer(String topic, MessageProcessor processor) { protected MessageConsumer(String topic, Consumer<MessageRecord> processor) {
Objects.requireNonNull(topic); Objects.requireNonNull(topic);
Objects.requireNonNull(processor); Objects.requireNonNull(processor);
this.topic = topic; this.topic = topic;
this.processor = processor; this.processor = processor;
} }
public MessageProcessor getProcessor() { public Consumer<MessageRecord> getProcessor() {
return processor; return processor;
} }