diff --git a/src/com/zdemo/AbstractConsumer.java b/src/com/zdemo/AbstractConsumer.java index 3414a56..d427a4a 100644 --- a/src/com/zdemo/AbstractConsumer.java +++ b/src/com/zdemo/AbstractConsumer.java @@ -2,7 +2,9 @@ package com.zdemo; import org.redkale.convert.json.JsonConvert; -import java.util.*; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; /** * @author Liang @@ -32,11 +34,11 @@ public abstract class AbstractConsumer implements IConsumer { @Override public final Set getTopics() { - Set keySet = eventMap.keySet(); - if (keySet.isEmpty()) { - keySet.add("-"); + if (!eventMap.isEmpty()) { + return eventMap.keySet(); } - return keySet; + + return Set.of("-"); } @Override diff --git a/src/com/zdemo/redis/RedisProducer.java b/src/com/zdemo/redis/RedisProducer.java index a8e45bb..df2d004 100644 --- a/src/com/zdemo/redis/RedisProducer.java +++ b/src/com/zdemo/redis/RedisProducer.java @@ -42,7 +42,11 @@ public class RedisProducer implements IProducer, Service { @Override public void send(T t) { try { - osw.write("PUBLISH " + t.topic + " '" + JsonConvert.root().convertTo(t.value) + "' \r\n"); + String v = JsonConvert.root().convertTo(t.value); + if (v.startsWith("\"") && v.endsWith("\"")) { + v = v.substring(1, v.length() - 1); + } + osw.write("PUBLISH " + t.topic + " '" + v + "' \r\n"); osw.flush(); } catch (IOException e) { logger.log(Level.WARNING, "", e); diff --git a/test/com/zdemo/test/AppTest.java b/test/com/zdemo/test/AppTest.java index 71195fc..15193e2 100644 --- a/test/com/zdemo/test/AppTest.java +++ b/test/com/zdemo/test/AppTest.java @@ -4,11 +4,9 @@ import com.zdemo.Event; import com.zdemo.EventType; import com.zdemo.IConsumer; import com.zdemo.IProducer; -import com.zdemo.pulsar.PulsarProducer; +import com.zdemo.kafak.KafakProducer; import org.junit.Test; import org.redkale.boot.Application; -import org.redkale.convert.json.JsonConvert; -import org.redkale.util.TypeToken; import java.util.List; import java.util.Map; @@ -27,29 +25,11 @@ public class AppTest { IConsumer consumer = Application.singleton(MyConsumer.class); consumer.addEventType( - EventType.of("a1", new TypeToken() { - }, r -> { - System.out.println("我收到了消息 主题a1 事件:" + JsonConvert.root().convertTo(r)); + EventType.of("a", str -> { + System.out.println("我收到了消息 a 事件:" + str); }) - , EventType.of("game-update", str -> { - System.out.println("我收到了消息 主题game-update 事件:" + str); - }) - - , EventType.of("http.req.hello", str -> { - System.out.println("我收到了消息 主题http.req.hello 事件:" + str); - }) - - , EventType.of("http.resp.node2004", str -> { - System.out.println("我收到了消息 主题http.resp.node2004 事件:" + str); - }) - ); - - // 10s 后加入 bx主题 - Thread.sleep(1_000 * 10); - System.out.println("加入新的主题订阅"); - consumer.addEventType( - EventType.of("bx", str -> { + , EventType.of("bx", str -> { System.out.println("我收到了消息 主题bx 事件:" + str); }) ); @@ -64,7 +44,7 @@ public class AppTest { @Test public void runProducer() { try { - IProducer producer = Application.singleton(PulsarProducer.class); + IProducer producer = Application.singleton(KafakProducer.class); // 发送不同的 事件 float v0 = 1f; @@ -75,14 +55,17 @@ public class AppTest { /*producer.send(Event.of("b1", v1)); producer.send(Event.of("c1", v2));*/ - producer.send(Event.of("game-update", 23256)); - producer.send(Event.of("bx", 23256)); + /*producer.send(Event.of("game-update", 23256)); + producer.send(Event.of("bx", 23256));*/ + for (int i = 0; i < 5; i++) { + producer.send(Event.of("a", i + "")); + } - /*try { + try { Thread.sleep(1_000); } catch (InterruptedException e) { e.printStackTrace(); - }*/ + } } catch (Exception e) { e.printStackTrace(); } diff --git a/test/com/zdemo/test/MyConsumer.java b/test/com/zdemo/test/MyConsumer.java index d9ddd84..8a308e8 100644 --- a/test/com/zdemo/test/MyConsumer.java +++ b/test/com/zdemo/test/MyConsumer.java @@ -1,8 +1,8 @@ package com.zdemo.test; -import com.zdemo.pulsar.PulsarConsumer; +import com.zdemo.kafak.KafakConsumer; -public class MyConsumer extends PulsarConsumer { +public class MyConsumer extends KafakConsumer { public String getGroupid() { return "group-test"; //消费组名称