From a37f1b95640af9e5d2e38391e99b89a6c07d2fbf Mon Sep 17 00:00:00 2001 From: lxy <237809796@qq.com> Date: Tue, 8 Dec 2020 19:31:15 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=EF=BC=9A=E4=B8=BB=E9=A2=98?= =?UTF-8?q?=E9=80=80=E8=AE=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/com/zdemo/IConsumer.java | 6 +++++ src/com/zdemo/kafak/KafakConsumer.java | 28 +++++++++++------------ src/com/zdemo/pulsar/PulsarConsumer.java | 29 ++++++++++-------------- src/com/zdemo/redis/RedisConsumer.java | 10 ++++++++ 4 files changed, 42 insertions(+), 31 deletions(-) diff --git a/src/com/zdemo/IConsumer.java b/src/com/zdemo/IConsumer.java index cf52a84..4b7b0a7 100644 --- a/src/com/zdemo/IConsumer.java +++ b/src/com/zdemo/IConsumer.java @@ -18,4 +18,10 @@ public interface IConsumer { void addEventType(EventType... eventType); void accept(String topic, String record); + + /** + * 取消订阅 + * @param topic + */ + void unsubscribe(String topic); } diff --git a/src/com/zdemo/kafak/KafakConsumer.java b/src/com/zdemo/kafak/KafakConsumer.java index 487ce03..b2e51ca 100644 --- a/src/com/zdemo/kafak/KafakConsumer.java +++ b/src/com/zdemo/kafak/KafakConsumer.java @@ -35,18 +35,14 @@ public abstract class KafakConsumer extends AbstractConsumer implements IConsume public abstract String getGroupid(); - private final LinkedBlockingQueue queue = new LinkedBlockingQueue<>(); + private final LinkedBlockingQueue queue = new LinkedBlockingQueue<>(); @Override public void addEventType(EventType... eventTypes) { super.addEventType(eventTypes); - try { - for (EventType eventType : eventTypes) { - queue.put(eventType); - } - } catch (InterruptedException e) { - logger.log(Level.WARNING, "", e); - } + + // 增加变更标记 + queue.add(() -> logger.info("KafakConsumer starting...")); } @Override @@ -58,8 +54,6 @@ public abstract class KafakConsumer extends AbstractConsumer implements IConsume props = new Properties(); props.load(fis); - if (logger.isLoggable(Level.INFO)) logger.info(getGroupid() + " consumer started!"); - new Thread(() -> { try { props.put("group.id", getGroupid()); @@ -78,9 +72,12 @@ public abstract class KafakConsumer extends AbstractConsumer implements IConsume } }); - // 动态新增订阅 - while (!queue.isEmpty()) { - queue.clear(); + if (!queue.isEmpty()) { + Runnable runnable; + while ((runnable = queue.poll()) != null) { + runnable.run(); + } + consumer.unsubscribe(); consumer.subscribe(getTopics()); } @@ -95,5 +92,8 @@ public abstract class KafakConsumer extends AbstractConsumer implements IConsume } } - + @Override + public void unsubscribe(String topic) { + queue.add(() -> eventMap.remove(topic)); // 加入延时执行队列(下一次订阅变更检查周期执行) + } } diff --git a/src/com/zdemo/pulsar/PulsarConsumer.java b/src/com/zdemo/pulsar/PulsarConsumer.java index 1aef468..3008e08 100644 --- a/src/com/zdemo/pulsar/PulsarConsumer.java +++ b/src/com/zdemo/pulsar/PulsarConsumer.java @@ -22,18 +22,14 @@ public abstract class PulsarConsumer extends AbstractConsumer implements IConsum public abstract String getGroupid(); - private final LinkedBlockingQueue queue = new LinkedBlockingQueue<>(); + private final LinkedBlockingQueue queue = new LinkedBlockingQueue<>(); @Override public void addEventType(EventType... eventTypes) { super.addEventType(eventTypes); - try { - for (EventType eventType : eventTypes) { - queue.put(eventType); - } - } catch (InterruptedException e) { - logger.log(Level.WARNING, "", e); - } + + // 增加变更标记 + queue.add(() -> logger.info("PulsarConsumer add new topic!")); } @Override @@ -41,22 +37,21 @@ public abstract class PulsarConsumer extends AbstractConsumer implements IConsum if (!preInit()) { return; } + queue.add(() -> logger.info("PulsarConsumer starting ...")); new Thread(() -> { try { client = PulsarClient.builder() .serviceUrl(serviceurl) .build(); - consumer = client.newConsumer() - .topics(new ArrayList<>(getTopics())) - .subscriptionName(getGroupid()) - .subscriptionType(SubscriptionType.Shared) - .subscribe(); - while (true) { // 动态新增订阅 - while (!queue.isEmpty()) { - queue.clear(); + if (!queue.isEmpty()) { + Runnable runnable; + while ((runnable = queue.poll()) != null) { + runnable.run(); + } + consumer.unsubscribe(); consumer = client.newConsumer() .topics(new ArrayList<>(getTopics())) @@ -90,7 +85,7 @@ public abstract class PulsarConsumer extends AbstractConsumer implements IConsum } @Override - public void destroy(AnyValue config) { + public void unsubscribe(String topic) { } } diff --git a/src/com/zdemo/redis/RedisConsumer.java b/src/com/zdemo/redis/RedisConsumer.java index bbaae9c..25cec29 100644 --- a/src/com/zdemo/redis/RedisConsumer.java +++ b/src/com/zdemo/redis/RedisConsumer.java @@ -101,4 +101,14 @@ public abstract class RedisConsumer extends AbstractConsumer implements IConsume } } } + + @Override + public void unsubscribe(String topic) { + try { + writer.write("UNSUBSCRIBE " + topic + "\r\n"); + writer.flush(); + } catch (IOException e) { + logger.log(Level.WARNING, "", e); + } + } }