diff --git a/src/com/zdemo/AbstractConsumer.java b/src/com/zdemo/AbstractConsumer.java index d427a4a..b18ec90 100644 --- a/src/com/zdemo/AbstractConsumer.java +++ b/src/com/zdemo/AbstractConsumer.java @@ -20,6 +20,7 @@ public abstract class AbstractConsumer implements IConsumer { return true; } + @Deprecated public void addEventType(EventType... eventType) { for (EventType type : eventType) { String[] topics = type.topic.split(","); diff --git a/src/com/zdemo/IProducer.java b/src/com/zdemo/IProducer.java index 017b70b..da4b5d9 100644 --- a/src/com/zdemo/IProducer.java +++ b/src/com/zdemo/IProducer.java @@ -2,9 +2,11 @@ package com.zdemo; import java.util.logging.Logger; -public interface IProducer { +public interface IProducer { Logger logger = Logger.getLogger(IProducer.class.getSimpleName()); - void send(T t); + @Deprecated + void send(T t); + void publish(String topic, V v); } diff --git a/src/com/zdemo/kafak/KafakProducer.java b/src/com/zdemo/kafak/KafakProducer.java index a8f6fc0..97c6f67 100644 --- a/src/com/zdemo/kafak/KafakProducer.java +++ b/src/com/zdemo/kafak/KafakProducer.java @@ -19,10 +19,10 @@ import java.util.logging.Level; /** * 生产 * - * @param + * @param */ @RestService -public class KafakProducer implements IProducer, Service { +public class KafakProducer implements IProducer, Service { private KafkaProducer producer; @Resource(name = "APP_HOME") @@ -40,8 +40,9 @@ public class KafakProducer implements IProducer, Service { } } + @Deprecated @Override - public void send(T t) { + public void send(T t) { String v = JsonConvert.root().convertTo(t.value); if (v.startsWith("\"") && v.endsWith("\"")) { v = v.substring(1, v.length() - 1); @@ -49,8 +50,20 @@ public class KafakProducer implements IProducer, Service { producer.send(new ProducerRecord(t.topic, v)); } + @Override + public void publish(String topic, V v) { + producer.send(new ProducerRecord(topic, toStr(v))); + } + @Override public void destroy(AnyValue config) { producer.close(); } + + private String toStr(V v) { + if (v instanceof String) { + return (String) v; + } + return JsonConvert.root().convertTo(v); + } } diff --git a/src/com/zdemo/redis/RedisProducer.java b/src/com/zdemo/redis/RedisProducer.java index df2d004..bb8b8d9 100644 --- a/src/com/zdemo/redis/RedisProducer.java +++ b/src/com/zdemo/redis/RedisProducer.java @@ -13,7 +13,7 @@ import java.net.InetSocketAddress; import java.net.Socket; import java.util.logging.Level; -public class RedisProducer implements IProducer, Service { +public class RedisProducer implements IProducer, Service { @Resource(name = "property.redis.host") private String host = "127.0.0.1"; @@ -39,8 +39,9 @@ public class RedisProducer implements IProducer, Service { } } + @Deprecated @Override - public void send(T t) { + public void send(T t) { try { String v = JsonConvert.root().convertTo(t.value); if (v.startsWith("\"") && v.endsWith("\"")) { @@ -53,4 +54,22 @@ public class RedisProducer implements IProducer, Service { } } + + @Override + public void publish(String topic, V v) { + try { + osw.write("PUBLISH " + topic + " '" + toStr(v) + "' \r\n"); + osw.flush(); + } catch (IOException e) { + logger.log(Level.WARNING, "", e); + + } + } + + private String toStr(V v) { + if (v instanceof String) { + return (String) v; + } + return JsonConvert.root().convertTo(v); + } } diff --git a/src/com/zdemo/zdb/ZHubClient.java b/src/com/zdemo/zdb/ZHubClient.java index 53b51dc..cc32609 100644 --- a/src/com/zdemo/zdb/ZHubClient.java +++ b/src/com/zdemo/zdb/ZHubClient.java @@ -231,6 +231,15 @@ public abstract class ZHubClient extends AbstractConsumer implements IConsumer, } } + + public void publish(String topic, V v) { + send("publish", topic, toStr(v)); + } + + public void broadcast(String topic, V v) { + send("broadcast", topic, toStr(v)); + } + @Override public void subscribe(String topic, Consumer consumer) { addEventType(EventType.of(topic, consumer)); diff --git a/test/com/zdemo/test/AppTest.java b/test/com/zdemo/test/AppTest.java index b60d621..416ff2f 100644 --- a/test/com/zdemo/test/AppTest.java +++ b/test/com/zdemo/test/AppTest.java @@ -1,7 +1,5 @@ package com.zdemo.test; -import com.zdemo.Event; -import com.zdemo.EventType; import com.zdemo.IProducer; import org.junit.Test; import org.redkale.boot.Application; @@ -24,11 +22,9 @@ public class AppTest { //启动并开启消费监听 MyConsumer consumer = Application.singleton(MyConsumer.class); - consumer.addEventType( - EventType.of("a-1", str -> { - System.out.println("我收到了消息 a 事件:" + str); - }) - ); + consumer.subscribe("a-1", str -> { + System.out.println("我收到了消息 a 事件:" + str); + }); consumer.timer("a", () -> { System.out.println(Utility.now() + " timer a 执行了"); @@ -44,10 +40,9 @@ public class AppTest { @Test public void runProducer() { try { - IProducer producer = Application.singleton(MyConsumer.class); + MyConsumer producer = Application.singleton(MyConsumer.class); for (int i = 0; i < 10_0000; i++) { - producer.send(Event.of("a-1", i + "")); - producer.send(Event.of("a-1", i)); + producer.publish("a-1", i); } try { @@ -117,7 +112,7 @@ public class AppTest { for (int i = 0; i < 100; i++) { - producer.send(Event.of("x", "x")); + producer.publish("x", "x"); Thread.sleep(1000); } } catch (Exception e) {