diff --git a/src/com/zdemo/AbstractConsumer.java b/src/com/zdemo/AbstractConsumer.java index b18ec90..ae39551 100644 --- a/src/com/zdemo/AbstractConsumer.java +++ b/src/com/zdemo/AbstractConsumer.java @@ -1,10 +1,12 @@ package com.zdemo; import org.redkale.convert.json.JsonConvert; +import org.redkale.util.TypeToken; import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.function.Consumer; /** * @author Liang @@ -12,29 +14,15 @@ import java.util.Set; */ public abstract class AbstractConsumer implements IConsumer { - public final Map eventMap = new HashMap<>(); + private Map eventMap = new HashMap<>(); - public abstract String getGroupid(); + protected abstract String getGroupid(); - public boolean preInit() { + protected boolean preInit() { return true; } - @Deprecated - public void addEventType(EventType... eventType) { - for (EventType type : eventType) { - String[] topics = type.topic.split(","); - for (String topic : topics) { - if (topic.isEmpty()) { - continue; - } - eventMap.put(topic, type); - } - } - } - - @Override - public final Set getTopics() { + protected final Set getTopics() { if (!eventMap.isEmpty()) { return eventMap.keySet(); } @@ -42,8 +30,7 @@ public abstract class AbstractConsumer implements IConsumer { return Set.of("-"); } - @Override - public final void accept(String topic, String value) { + protected void accept(String topic, String value) { EventType eventType = eventMap.get(topic); Object data = null; @@ -56,5 +43,31 @@ public abstract class AbstractConsumer implements IConsumer { eventType.accept(data); } + protected final void removeEventType(String topic) { + eventMap.remove(topic); + } + + /** + * 不同组件的订阅事件 发送 + * + * @param topic + */ + protected abstract void subscribe(String topic); + + public void subscribe(String topic, Consumer consumer) { + subscribe(topic, TYPE_TOKEN_STRING, consumer); + } + + @Override + public void subscribe(String topic, TypeToken typeToken, Consumer consumer) { + if (topic.contains(",")) { + for (String x : topic.split(",")) { + subscribe(x, typeToken, consumer); + } + } + + eventMap.put(topic, EventType.of(topic, typeToken, consumer)); + subscribe(topic); + } } diff --git a/src/com/zdemo/IConsumer.java b/src/com/zdemo/IConsumer.java index 011f4e7..4913e73 100644 --- a/src/com/zdemo/IConsumer.java +++ b/src/com/zdemo/IConsumer.java @@ -2,7 +2,6 @@ package com.zdemo; import org.redkale.util.TypeToken; -import java.util.Collection; import java.util.function.Consumer; public interface IConsumer { @@ -11,12 +10,6 @@ public interface IConsumer { TypeToken TYPE_TOKEN_INT = new TypeToken() { }; - Collection getTopics(); - - void addEventType(EventType... eventType); - - void accept(String topic, String record); - /** * 取消订阅 * @@ -24,7 +17,21 @@ public interface IConsumer { */ void unsubscribe(String topic); + /** + * 订阅, 接收数据类型 String + * + * @param topic + * @param consumer + */ void subscribe(String topic, Consumer consumer); - + + /** + * 订阅,接收类型为 + * + * @param topic + * @param typeToken + * @param consumer + * @param + */ void subscribe(String topic, TypeToken typeToken, Consumer consumer); } diff --git a/src/com/zdemo/kafak/KafakConsumer.java b/src/com/zdemo/kafak/KafakConsumer.java index ee0e85f..ef1dac5 100644 --- a/src/com/zdemo/kafak/KafakConsumer.java +++ b/src/com/zdemo/kafak/KafakConsumer.java @@ -1,7 +1,6 @@ package com.zdemo.kafak; import com.zdemo.AbstractConsumer; -import com.zdemo.EventType; import com.zdemo.IConsumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -9,7 +8,6 @@ import org.apache.kafka.common.errors.WakeupException; import org.redkale.net.http.RestService; import org.redkale.service.Service; import org.redkale.util.AnyValue; -import org.redkale.util.TypeToken; import javax.annotation.Resource; import java.io.File; @@ -18,7 +16,6 @@ import java.io.IOException; import java.time.Duration; import java.util.Properties; import java.util.concurrent.LinkedBlockingQueue; -import java.util.function.Consumer; import java.util.logging.Level; import java.util.logging.Logger; @@ -33,24 +30,17 @@ public abstract class KafakConsumer extends AbstractConsumer implements IConsume @Resource(name = "APP_HOME") protected File APP_HOME; - protected Properties props; - public abstract String getGroupid(); private final LinkedBlockingQueue queue = new LinkedBlockingQueue<>(); - @Override - public void addEventType(EventType... eventTypes) { - super.addEventType(eventTypes); - } - @Override public final void init(AnyValue config) { if (!preInit()) { return; } try (FileInputStream fis = new FileInputStream(new File(APP_HOME, "conf/kafak.properties"));) { - props = new Properties(); + Properties props = new Properties(); props.load(fis); new Thread(() -> { @@ -93,16 +83,13 @@ public abstract class KafakConsumer extends AbstractConsumer implements IConsume @Override public void unsubscribe(String topic) { - queue.add(() -> eventMap.remove(topic)); // 加入延时执行队列(下一次订阅变更检查周期执行) + queue.add(() -> super.removeEventType(topic)); // 加入延时执行队列(下一次订阅变更检查周期执行) } @Override - public void subscribe(String topic, Consumer consumer) { - addEventType(EventType.of(topic, consumer)); - } - - @Override - public void subscribe(String topic, TypeToken typeToken, Consumer consumer) { - addEventType(EventType.of(topic, typeToken, consumer)); + protected void subscribe(String topic) { + queue.add(() -> { + // just set flag, nothing to do + }); } } diff --git a/src/com/zdemo/kafak/KafakProducer.java b/src/com/zdemo/kafak/KafakProducer.java index 770c29d..6b3ba45 100644 --- a/src/com/zdemo/kafak/KafakProducer.java +++ b/src/com/zdemo/kafak/KafakProducer.java @@ -1,6 +1,5 @@ package com.zdemo.kafak; -import com.zdemo.Event; import com.zdemo.IProducer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -40,16 +39,6 @@ public class KafakProducer implements IProducer, Service { } } - /*@Deprecated - @Override - public void send(T t) { - String v = JsonConvert.root().convertTo(t.value); - if (v.startsWith("\"") && v.endsWith("\"")) { - v = v.substring(1, v.length() - 1); - } - producer.send(new ProducerRecord(t.topic, v)); - }*/ - @Override public void publish(String topic, V v) { producer.send(new ProducerRecord(topic, toStr(v))); diff --git a/src/com/zdemo/redis/RedisConsumer.java b/src/com/zdemo/redis/RedisConsumer.java index 1672973..676281c 100644 --- a/src/com/zdemo/redis/RedisConsumer.java +++ b/src/com/zdemo/redis/RedisConsumer.java @@ -1,11 +1,9 @@ package com.zdemo.redis; import com.zdemo.AbstractConsumer; -import com.zdemo.EventType; import com.zdemo.IConsumer; import org.redkale.service.Service; import org.redkale.util.AnyValue; -import org.redkale.util.TypeToken; import javax.annotation.Resource; import java.io.BufferedReader; @@ -14,7 +12,6 @@ import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.net.InetSocketAddress; import java.net.Socket; -import java.util.function.Consumer; import java.util.logging.Level; import java.util.logging.Logger; @@ -87,31 +84,10 @@ public class RedisConsumer extends AbstractConsumer implements IConsumer, Servic } @Override - public String getGroupid() { + protected String getGroupid() { return null; } - @Override - public void addEventType(EventType... eventType) { - for (EventType type : eventType) { - String[] topics = type.topic.split(","); - for (String topic : topics) { - if (topic.isEmpty()) { - continue; - } - eventMap.put(topic, type); - - //新增订阅 - try { - writer.write("SUBSCRIBE " + topic + "\r\n"); - writer.flush(); - } catch (IOException e) { - logger.log(Level.WARNING, "", e); - } - } - } - } - @Override public void unsubscribe(String topic) { try { @@ -120,15 +96,17 @@ public class RedisConsumer extends AbstractConsumer implements IConsumer, Servic } catch (IOException e) { logger.log(Level.WARNING, "", e); } + super.removeEventType(topic); } @Override - public void subscribe(String topic, Consumer consumer) { - addEventType(EventType.of(topic, consumer)); - } - - @Override - public void subscribe(String topic, TypeToken typeToken, Consumer consumer) { - addEventType(EventType.of(topic, typeToken, consumer)); + protected void subscribe(String topic) { + //新增订阅 + try { + writer.write("SUBSCRIBE " + topic + "\r\n"); + writer.flush(); + } catch (IOException e) { + logger.log(Level.WARNING, "", e); + } } } diff --git a/src/com/zdemo/redis/RedisProducer.java b/src/com/zdemo/redis/RedisProducer.java index 04bc7e1..ef5a4d3 100644 --- a/src/com/zdemo/redis/RedisProducer.java +++ b/src/com/zdemo/redis/RedisProducer.java @@ -1,6 +1,5 @@ package com.zdemo.redis; -import com.zdemo.Event; import com.zdemo.IProducer; import org.redkale.convert.json.JsonConvert; import org.redkale.service.Service; diff --git a/src/com/zdemo/zhub/ZHubClient.java b/src/com/zdemo/zhub/ZHubClient.java index 5ef3446..483c277 100644 --- a/src/com/zdemo/zhub/ZHubClient.java +++ b/src/com/zdemo/zhub/ZHubClient.java @@ -1,10 +1,12 @@ package com.zdemo.zhub; -import com.zdemo.*; +import com.zdemo.AbstractConsumer; +import com.zdemo.Event; +import com.zdemo.IConsumer; +import com.zdemo.IProducer; import org.redkale.convert.json.JsonConvert; import org.redkale.service.Service; import org.redkale.util.AnyValue; -import org.redkale.util.TypeToken; import javax.annotation.Resource; import java.io.BufferedReader; @@ -18,13 +20,12 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; -import java.util.function.Consumer; import java.util.logging.Level; import java.util.logging.Logger; public abstract class ZHubClient extends AbstractConsumer implements IConsumer, IProducer, Service { - Logger logger = Logger.getLogger(IProducer.class.getSimpleName()); + Logger logger = Logger.getLogger(ZHubClient.class.getSimpleName()); @Resource(name = "property.zhub.host") private String host = "127.0.0.1"; @@ -134,7 +135,7 @@ public abstract class ZHubClient extends AbstractConsumer implements IConsumer, logger.log(Level.WARNING, "timer [" + timer.name + "]", e); } } - }, 2); + }, 1); threadBuilder.accept(() -> { while (true) { @@ -184,7 +185,7 @@ public abstract class ZHubClient extends AbstractConsumer implements IConsumer, return JsonConvert.root().convertTo(v); } - public boolean initSocket() { + protected boolean initSocket() { try { client = new Socket(); client.connect(new InetSocketAddress(host, port)); @@ -213,62 +214,12 @@ public abstract class ZHubClient extends AbstractConsumer implements IConsumer, return true; } - @Deprecated - @Override - public void addEventType(EventType... eventType) { - for (EventType type : eventType) { - String[] topics = type.topic.split(","); - for (String topic : topics) { - if (topic.isEmpty()) { - continue; - } - eventMap.put(topic, type); - - //新增订阅 - send("subscribe " + topic); - } - } - } - @Override public void unsubscribe(String topic) { send("unsubscribe " + topic); + super.removeEventType(topic); } - // timer - private ConcurrentHashMap timerMap = new ConcurrentHashMap(); - - public void timer(String name, Runnable run) { - timerMap.put(name, new Timer(name, run)); - send("timer", name); - } - - public void reloadTimer() { - send("cmd", "reload-timer-config"); - } - - class Timer { - String name; - //String expr; - Runnable runnable; - //boolean single; - - public String getName() { - return name; - } - - - public Runnable getRunnable() { - return runnable; - } - - public Timer(String name, Runnable runnable) { - this.name = name; - this.runnable = runnable; - } - } - - public void publish(String topic, V v) { send("publish", topic, toStr(v)); } @@ -316,12 +267,41 @@ public abstract class ZHubClient extends AbstractConsumer implements IConsumer, } @Override - public void subscribe(String topic, Consumer consumer) { - addEventType(EventType.of(topic, consumer)); + protected void subscribe(String topic) { + send("subscribe " + topic); //新增订阅 } - @Override - public void subscribe(String topic, TypeToken typeToken, Consumer consumer) { - addEventType(EventType.of(topic, typeToken, consumer)); + + // ================================================== timer ================================================== + private ConcurrentHashMap timerMap = new ConcurrentHashMap(); + + class Timer { + String name; + //String expr; + Runnable runnable; + //boolean single; + + public String getName() { + return name; + } + + + public Runnable getRunnable() { + return runnable; + } + + public Timer(String name, Runnable runnable) { + this.name = name; + this.runnable = runnable; + } + } + + public void timer(String name, Runnable run) { + timerMap.put(name, new Timer(name, run)); + send("timer", name); + } + + public void reloadTimer() { + send("cmd", "reload-timer-config"); } }