From 9138790fd2c1a9b1100965a9fb630b0b1b27d923 Mon Sep 17 00:00:00 2001 From: lxy <237809796@qq.com> Date: Fri, 13 Nov 2020 18:23:23 +0800 Subject: [PATCH] . --- src/com/zdemo/AbstractConsumer.java | 12 +++++++----- src/com/zdemo/IConsumer.java | 6 ++++-- src/com/zdemo/kafak/KafakConsumer.java | 6 ++---- src/com/zdemo/pulsar/PulsarConsumer.java | 7 +++++-- src/com/zdemo/redis/RedisConsumer.java | 4 ++-- test/com/zdemo/test/AppTest.java | 3 ++- 6 files changed, 22 insertions(+), 16 deletions(-) diff --git a/src/com/zdemo/AbstractConsumer.java b/src/com/zdemo/AbstractConsumer.java index ac338ca..3414a56 100644 --- a/src/com/zdemo/AbstractConsumer.java +++ b/src/com/zdemo/AbstractConsumer.java @@ -2,9 +2,7 @@ package com.zdemo; import org.redkale.convert.json.JsonConvert; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; +import java.util.*; /** * @author Liang @@ -33,8 +31,12 @@ public abstract class AbstractConsumer implements IConsumer { } @Override - public final Set getSubscribes() { - return eventMap.keySet(); + public final Set getTopics() { + Set keySet = eventMap.keySet(); + if (keySet.isEmpty()) { + keySet.add("-"); + } + return keySet; } @Override diff --git a/src/com/zdemo/IConsumer.java b/src/com/zdemo/IConsumer.java index 48051be..cf52a84 100644 --- a/src/com/zdemo/IConsumer.java +++ b/src/com/zdemo/IConsumer.java @@ -13,7 +13,9 @@ public interface IConsumer { Logger logger = Logger.getLogger(IConsumer.class.getSimpleName()); - Collection getSubscribes(); + Collection getTopics(); - void accept(String topic, String record); + void addEventType(EventType... eventType); + + void accept(String topic, String record); } diff --git a/src/com/zdemo/kafak/KafakConsumer.java b/src/com/zdemo/kafak/KafakConsumer.java index e6a8227..487ce03 100644 --- a/src/com/zdemo/kafak/KafakConsumer.java +++ b/src/com/zdemo/kafak/KafakConsumer.java @@ -19,8 +19,6 @@ import java.util.Properties; import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Level; -import static java.util.Arrays.asList; - /** * 消费 */ @@ -66,7 +64,7 @@ public abstract class KafakConsumer extends AbstractConsumer implements IConsume try { props.put("group.id", getGroupid()); KafkaConsumer consumer = new KafkaConsumer<>(props); - consumer.subscribe(asList("_")); + consumer.subscribe(getTopics()); while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(1_000)); records.forEach(record -> { @@ -84,7 +82,7 @@ public abstract class KafakConsumer extends AbstractConsumer implements IConsume while (!queue.isEmpty()) { queue.clear(); consumer.unsubscribe(); - consumer.subscribe(getSubscribes()); + consumer.subscribe(getTopics()); } } } catch (WakeupException ex) { diff --git a/src/com/zdemo/pulsar/PulsarConsumer.java b/src/com/zdemo/pulsar/PulsarConsumer.java index abfcfa6..1aef468 100644 --- a/src/com/zdemo/pulsar/PulsarConsumer.java +++ b/src/com/zdemo/pulsar/PulsarConsumer.java @@ -38,6 +38,9 @@ public abstract class PulsarConsumer extends AbstractConsumer implements IConsum @Override public void init(AnyValue config) { + if (!preInit()) { + return; + } new Thread(() -> { try { client = PulsarClient.builder() @@ -45,7 +48,7 @@ public abstract class PulsarConsumer extends AbstractConsumer implements IConsum .build(); consumer = client.newConsumer() - .topics(new ArrayList<>(getSubscribes())) + .topics(new ArrayList<>(getTopics())) .subscriptionName(getGroupid()) .subscriptionType(SubscriptionType.Shared) .subscribe(); @@ -56,7 +59,7 @@ public abstract class PulsarConsumer extends AbstractConsumer implements IConsum queue.clear(); consumer.unsubscribe(); consumer = client.newConsumer() - .topics(new ArrayList<>(getSubscribes())) + .topics(new ArrayList<>(getTopics())) .subscriptionName(getGroupid()) .subscriptionType(SubscriptionType.Shared) .subscribe(); diff --git a/src/com/zdemo/redis/RedisConsumer.java b/src/com/zdemo/redis/RedisConsumer.java index 0b25e8f..bbaae9c 100644 --- a/src/com/zdemo/redis/RedisConsumer.java +++ b/src/com/zdemo/redis/RedisConsumer.java @@ -40,10 +40,10 @@ public abstract class RedisConsumer extends AbstractConsumer implements IConsume writer.flush(); StringBuffer buf = new StringBuffer("SUBSCRIBE"); - for (String topic : getSubscribes()) { + for (String topic : getTopics()) { buf.append(" ").append(topic); } - buf.append(" _\r\n"); + buf.append("\r\n"); writer.write(buf.toString()); writer.flush(); diff --git a/test/com/zdemo/test/AppTest.java b/test/com/zdemo/test/AppTest.java index 8a081bc..71195fc 100644 --- a/test/com/zdemo/test/AppTest.java +++ b/test/com/zdemo/test/AppTest.java @@ -2,6 +2,7 @@ package com.zdemo.test; import com.zdemo.Event; import com.zdemo.EventType; +import com.zdemo.IConsumer; import com.zdemo.IProducer; import com.zdemo.pulsar.PulsarProducer; import org.junit.Test; @@ -23,7 +24,7 @@ public class AppTest { public void runConsumer() { try { //启动并开启消费监听 - MyConsumer consumer = Application.singleton(MyConsumer.class); + IConsumer consumer = Application.singleton(MyConsumer.class); consumer.addEventType( EventType.of("a1", new TypeToken() {