diff --git a/src/com/zdemo/AbstractConsumer.java b/src/com/zdemo/AbstractConsumer.java new file mode 100644 index 0000000..6fe52e2 --- /dev/null +++ b/src/com/zdemo/AbstractConsumer.java @@ -0,0 +1,43 @@ +package com.zdemo; + +import org.redkale.convert.json.JsonConvert; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * @author Liang + * @data 2020-09-05 23:18 + */ +public abstract class AbstractConsumer implements IConsumer { + + public final Map eventMap = new HashMap<>(); + + public void addEventType(EventType... eventType) { + for (EventType type : eventType) { + eventMap.put(type.topic, type); + } + } + + @Override + public final Collection getSubscribes() { + return eventMap.keySet(); + } + + @Override + public final void accept(String topic, String value) { + EventType eventType = eventMap.get(topic); + + Object data = null; + if ("java.lang.String".equals(eventType.typeToken.getType().toString())) { + data = value; + } else { + data = JsonConvert.root().convertFrom(eventType.typeToken.getType(), value); + } + + eventType.accept(data); + } + + +} diff --git a/src/com/zdemo/Event.java b/src/com/zdemo/Event.java index cffb45d..ce5318b 100644 --- a/src/com/zdemo/Event.java +++ b/src/com/zdemo/Event.java @@ -6,40 +6,18 @@ package com.zdemo; * @param */ public class Event { - private String topic; - private String key; - private V value; + public final String topic; + //public final String key; + public final V value; - public Event() { - } - - public Event(String topic, String key, V value) { + private Event(String topic, V value) { this.topic = topic; - this.key = key; this.value = value; } - public String getTopic() { - return topic; + public static Event of(String topic, V value) { + return new Event(topic, value); } - public void setTopic(String topic) { - this.topic = topic; - } - public String getKey() { - return key; - } - - public void setKey(String key) { - this.key = key; - } - - public V getValue() { - return value; - } - - public void setValue(V value) { - this.value = value; - } } diff --git a/src/com/zdemo/EventType.java b/src/com/zdemo/EventType.java new file mode 100644 index 0000000..6d62772 --- /dev/null +++ b/src/com/zdemo/EventType.java @@ -0,0 +1,25 @@ +package com.zdemo; + +import org.redkale.util.TypeToken; + +import java.util.function.Consumer; + +public class EventType { + public final String topic; + public final TypeToken typeToken; + private final Consumer consumer; + + private EventType(String topic, TypeToken typeToken, Consumer consumer) { + this.topic = topic; + this.typeToken = typeToken; + this.consumer = consumer; + } + + public static EventType of(String topic, TypeToken typeToken, Consumer consumer) { + return new EventType<>(topic, typeToken, consumer); + } + + public void accept(T t) { + consumer.accept(t); + } +} diff --git a/src/com/zdemo/IConsumer.java b/src/com/zdemo/IConsumer.java index 33c05f5..c34aaa1 100644 --- a/src/com/zdemo/IConsumer.java +++ b/src/com/zdemo/IConsumer.java @@ -1,8 +1,5 @@ package com.zdemo; -import org.redkale.convert.json.JsonConvert; -import org.redkale.util.TypeToken; - import java.util.Collection; import java.util.logging.Logger; @@ -11,24 +8,5 @@ public interface IConsumer { Collection getSubscribes(); - TypeToken getTypeToken(); - - void accept(T t); - - default void accept(String value) { - System.out.println(value); - if ("com.zdemo.Event".equals(getTypeToken().getType().toString())) { - String _value = value.split("\"value\":")[1]; - _value = _value.substring(0, _value.length() - 1); - Event t = JsonConvert.root().convertFrom(getTypeToken().getType(), value.replace(_value, "’‘")); - if (_value.startsWith("\"") && _value.endsWith("\"")) { - _value = _value.substring(1, _value.length() -1); - } - t.setValue(_value); - accept((T) t); - } else { - Event t = JsonConvert.root().convertFrom(getTypeToken().getType(), value); - accept((T) t); - } - } + void accept(String topic, String record); } diff --git a/src/com/zdemo/kafak/KafakConsumer.java b/src/com/zdemo/kafak/KafakConsumer.java index e35a33b..8e197c7 100644 --- a/src/com/zdemo/kafak/KafakConsumer.java +++ b/src/com/zdemo/kafak/KafakConsumer.java @@ -1,10 +1,11 @@ package com.zdemo.kafak; -import com.zdemo.Event; +import com.zdemo.AbstractConsumer; +import com.zdemo.EventType; import com.zdemo.IConsumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.WakeupException; import org.redkale.net.http.RestService; import org.redkale.service.Service; import org.redkale.util.AnyValue; @@ -16,48 +17,86 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.time.Duration; import java.util.Properties; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.logging.Level; + +import static java.util.Arrays.asList; /** * 消费 - * - * @param */ @RestService -public abstract class KafakConsumer implements IConsumer, Service { +public abstract class KafakConsumer extends AbstractConsumer implements IConsumer, Service { @Resource(name = "APP_HOME") protected File APP_HOME; + protected Properties props; + + // 0:none 1:restart -1:stop + //private int cmd = -1; + public abstract String getGroupid(); + private final LinkedBlockingQueue queue = new LinkedBlockingQueue<>(); + + @Override + public void addEventType(EventType... eventTypes) { + super.addEventType(eventTypes); + try { + for (EventType eventType : eventTypes) { + queue.put(eventType); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + @Override public void init(AnyValue config) { - new Thread(() -> { - try (FileInputStream fis = new FileInputStream(new File(APP_HOME, "conf/kafak.properties"));) { - Properties props = new Properties(); - props.load(fis); - props.put("group.id", getGroupid()); - KafkaConsumer consumer = new KafkaConsumer<>(props); - consumer.subscribe(getSubscribes()); + try (FileInputStream fis = new FileInputStream(new File(APP_HOME, "conf/kafak.properties"));) { + props = new Properties(); + props.load(fis); - while (true) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - String value = record.value(); - try { - accept(value); - } catch (Exception e) { - logger.warning("event accept error :" + value); - e.printStackTrace(); + if (logger.isLoggable(Level.INFO)) logger.info(getGroupid() + " consumer started!"); + + new Thread(() -> { + try { + props.put("group.id", getGroupid()); + KafkaConsumer consumer = new KafkaConsumer<>(props); + consumer.subscribe(asList("_")); + while (true) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(10_000)); + records.forEach(record -> { + String topic = record.topic(); + long offset = record.offset(); + String value = record.value(); + try { + accept(topic, value); + } catch (Exception e) { + logger.warning(String.format("topic[%s] event accept error, offset=%s,value:%s", topic, offset, value)); + e.printStackTrace(); + } + }); + + // 动态新增订阅 + while (!queue.isEmpty()) { + queue.clear(); + consumer.unsubscribe(); + consumer.subscribe(getSubscribes()); } } + } catch (WakeupException ex) { + System.out.println("WakeupException !!!!"); } - } catch (FileNotFoundException e) { - e.printStackTrace(); - } catch (IOException e) { - e.printStackTrace(); - } - }).start(); + }, "thread-consumer-[" + getGroupid() + "]").start(); + } catch (FileNotFoundException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } } + + } diff --git a/src/com/zdemo/kafak/KafakProducer.java b/src/com/zdemo/kafak/KafakProducer.java index 580ca5c..420a668 100644 --- a/src/com/zdemo/kafak/KafakProducer.java +++ b/src/com/zdemo/kafak/KafakProducer.java @@ -50,7 +50,7 @@ public class KafakProducer implements IProducer, Service { @Override public void send(T... t) { for (T x : t) { - producer.send(new ProducerRecord(x.getTopic(), JsonConvert.root().convertTo(x))); + producer.send(new ProducerRecord(x.topic, JsonConvert.root().convertTo(x.value))); } } diff --git a/src/com/zdemo/redis/RedisConsumer.java b/src/com/zdemo/redis/RedisConsumer.java index 6379e31..08e52a6 100644 --- a/src/com/zdemo/redis/RedisConsumer.java +++ b/src/com/zdemo/redis/RedisConsumer.java @@ -1,6 +1,6 @@ package com.zdemo.redis; -import com.zdemo.Event; +import com.zdemo.AbstractConsumer; import com.zdemo.IConsumer; import org.redkale.service.Service; import org.redkale.util.AnyValue; @@ -12,7 +12,7 @@ import java.io.OutputStreamWriter; import java.net.InetSocketAddress; import java.net.Socket; -public abstract class RedisConsumer implements IConsumer, Service { +public abstract class RedisConsumer extends AbstractConsumer implements IConsumer, Service { @Resource(name = "property.redis.host") private String host = "127.0.0.1"; @@ -21,10 +21,6 @@ public abstract class RedisConsumer implements IConsumer, Se @Resource(name = "property.redis.port") private int port = 6379; - public String getGroupid() { - return ""; - } - @Override public void init(AnyValue config) { new Thread(() -> { @@ -61,9 +57,9 @@ public abstract class RedisConsumer implements IConsumer, Se br.readLine(); //$n len(value) String value = br.readLine(); // value try { - accept(value); + accept(topic, value); } catch (Exception e) { - logger.warning("event accept error :" + value); + logger.warning("topic[" + topic + "] event accept error :" + value); e.printStackTrace(); } } diff --git a/src/com/zdemo/redis/RedisProducer.java b/src/com/zdemo/redis/RedisProducer.java index 805cc05..a565df1 100644 --- a/src/com/zdemo/redis/RedisProducer.java +++ b/src/com/zdemo/redis/RedisProducer.java @@ -42,7 +42,7 @@ public class RedisProducer implements IProducer, Service { public void send(T... t) { for (T x : t) { try { - oswPub.write("PUBLISH " + x.getTopic() + " '" + JsonConvert.root().convertTo(x) + "' \r\n"); + oswPub.write("PUBLISH " + x.topic + " '" + JsonConvert.root().convertTo(x.value) + "' \r\n"); oswPub.flush(); } catch (IOException e) { e.printStackTrace(); diff --git a/test/com/zdemo/test/AppTest.java b/test/com/zdemo/test/AppTest.java index 641795c..3513d52 100644 --- a/test/com/zdemo/test/AppTest.java +++ b/test/com/zdemo/test/AppTest.java @@ -1,9 +1,17 @@ package com.zdemo.test; import com.zdemo.Event; -import com.zdemo.redis.RedisProducer; +import com.zdemo.EventType; +import com.zdemo.kafak.KafakProducer; import org.junit.Test; import org.redkale.boot.Application; +import org.redkale.convert.json.JsonConvert; +import org.redkale.util.TypeToken; + +import java.util.List; +import java.util.Map; + +import static java.util.Arrays.asList; /** * 消息发布订阅测试 @@ -13,14 +21,33 @@ public class AppTest { @Test public void runConsumer() { try { - // 启动并开启消费监听 - Application.singleton(MyConsumer.class); + //启动并开启消费监听 + MyConsumer consumer = Application.singleton(MyConsumer.class); - try { - Thread.sleep(15_000); - } catch (InterruptedException e) { - e.printStackTrace(); - } + //新增订阅主题 a1 + consumer.addEventType(EventType.of("a1", new TypeToken() { + }, r -> { + System.out.println("我收到了消息 主题A 事件:" + JsonConvert.root().convertTo(r)); + })); + + Thread.sleep(5_000); + + //新增订阅主题 b1、c1 + consumer.addEventType( + // 订阅主题 b1 + EventType.of("b1", new TypeToken>() { + }, r -> { + System.out.println("我收到了消息 主题B 事件:" + JsonConvert.root().convertTo(r)); + }), + + // 订阅主题 c1 + EventType.of("c1", new TypeToken>() { + }, r -> { + System.out.println("我收到了消息 主题C 事件:" + JsonConvert.root().convertTo(r)); + }) + ); + + Thread.sleep(60_000); } catch (Exception e) { e.printStackTrace(); } @@ -29,14 +56,17 @@ public class AppTest { @Test public void runProducer() { try { - RedisProducer producer = Application.singleton(RedisProducer.class); + KafakProducer producer = Application.singleton(KafakProducer.class); - Event event = new Event<>(); - event.setTopic("c"); - event.setKey("abx"); - event.setValue(1f); + // 发送不同的 事件 + float v0 = 1f; + Map v1 = Map.of("k", "v"); + List v2 = asList(1, 2, 3); + + producer.send(Event.of("a1", v0)); + producer.send(Event.of("b1", v1)); + producer.send(Event.of("c1", v2)); - producer.send(event); try { Thread.sleep(1_000); diff --git a/test/com/zdemo/test/MyConsumer.java b/test/com/zdemo/test/MyConsumer.java index 6e4727a..baeaf96 100644 --- a/test/com/zdemo/test/MyConsumer.java +++ b/test/com/zdemo/test/MyConsumer.java @@ -1,37 +1,22 @@ package com.zdemo.test; -import com.zdemo.Event; +import com.zdemo.EventType; import com.zdemo.kafak.KafakConsumer; import org.redkale.convert.json.JsonConvert; import org.redkale.util.TypeToken; -import java.util.Collection; -import java.util.List; - -public class MyConsumer extends KafakConsumer> { +public class MyConsumer extends KafakConsumer { public String getGroupid() { - return "group-test"; //quest、user、im、live + return "group-test"; //消费组名称 } - @Override - public Collection getSubscribes() { - return List.of("a", "b", "c", "vis-log"); - } - - @Override - public TypeToken> getTypeToken() { - return new TypeToken>() { - }; - } - - @Override - public void accept(Event event) { - switch (event.getTopic()) { - case "a" -> System.out.println("我收到了消息 主题A 事件:" + JsonConvert.root().convertTo(event)); - case "b" -> System.out.println("我收到了消息 主题B 事件:" + JsonConvert.root().convertTo(event)); - case "c" -> System.out.println("我收到了消息 主题C 事件:" + JsonConvert.root().convertTo(event)); - } - + { + addEventType( + EventType.of("a1", new TypeToken() { + }, r -> { + System.out.println("我收到了消息 主题A 事件:" + JsonConvert.root().convertTo(r)); + }) + ); } }