From 3e91d785e3f7a3400419fefd7bf1832d694f004e Mon Sep 17 00:00:00 2001 From: lxy <237809796@qq.com> Date: Sat, 23 Jan 2021 11:03:20 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=EF=BC=9A=E4=B8=BB=E9=A2=98?= =?UTF-8?q?=E5=8F=91=E5=B8=83=E6=A0=87=E5=87=86=E6=96=B9=E6=B3=95=20publis?= =?UTF-8?q?h=20=E6=9B=BF=E6=8D=A2=E5=8E=9F=20send=20=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/com/zdemo/AbstractConsumer.java | 1 + src/com/zdemo/IProducer.java | 6 ++++-- src/com/zdemo/kafak/KafakProducer.java | 19 ++++++++++++++++--- src/com/zdemo/redis/RedisProducer.java | 23 +++++++++++++++++++++-- src/com/zdemo/zdb/ZHubClient.java | 9 +++++++++ test/com/zdemo/test/AppTest.java | 17 ++++++----------- 6 files changed, 57 insertions(+), 18 deletions(-) diff --git a/src/com/zdemo/AbstractConsumer.java b/src/com/zdemo/AbstractConsumer.java index d427a4a..b18ec90 100644 --- a/src/com/zdemo/AbstractConsumer.java +++ b/src/com/zdemo/AbstractConsumer.java @@ -20,6 +20,7 @@ public abstract class AbstractConsumer implements IConsumer { return true; } + @Deprecated public void addEventType(EventType... eventType) { for (EventType type : eventType) { String[] topics = type.topic.split(","); diff --git a/src/com/zdemo/IProducer.java b/src/com/zdemo/IProducer.java index 017b70b..da4b5d9 100644 --- a/src/com/zdemo/IProducer.java +++ b/src/com/zdemo/IProducer.java @@ -2,9 +2,11 @@ package com.zdemo; import java.util.logging.Logger; -public interface IProducer { +public interface IProducer { Logger logger = Logger.getLogger(IProducer.class.getSimpleName()); - void send(T t); + @Deprecated + void send(T t); + void publish(String topic, V v); } diff --git a/src/com/zdemo/kafak/KafakProducer.java b/src/com/zdemo/kafak/KafakProducer.java index a8f6fc0..97c6f67 100644 --- a/src/com/zdemo/kafak/KafakProducer.java +++ b/src/com/zdemo/kafak/KafakProducer.java @@ -19,10 +19,10 @@ import java.util.logging.Level; /** * 生产 * - * @param + * @param */ @RestService -public class KafakProducer implements IProducer, Service { +public class KafakProducer implements IProducer, Service { private KafkaProducer producer; @Resource(name = "APP_HOME") @@ -40,8 +40,9 @@ public class KafakProducer implements IProducer, Service { } } + @Deprecated @Override - public void send(T t) { + public void send(T t) { String v = JsonConvert.root().convertTo(t.value); if (v.startsWith("\"") && v.endsWith("\"")) { v = v.substring(1, v.length() - 1); @@ -49,8 +50,20 @@ public class KafakProducer implements IProducer, Service { producer.send(new ProducerRecord(t.topic, v)); } + @Override + public void publish(String topic, V v) { + producer.send(new ProducerRecord(topic, toStr(v))); + } + @Override public void destroy(AnyValue config) { producer.close(); } + + private String toStr(V v) { + if (v instanceof String) { + return (String) v; + } + return JsonConvert.root().convertTo(v); + } } diff --git a/src/com/zdemo/redis/RedisProducer.java b/src/com/zdemo/redis/RedisProducer.java index df2d004..bb8b8d9 100644 --- a/src/com/zdemo/redis/RedisProducer.java +++ b/src/com/zdemo/redis/RedisProducer.java @@ -13,7 +13,7 @@ import java.net.InetSocketAddress; import java.net.Socket; import java.util.logging.Level; -public class RedisProducer implements IProducer, Service { +public class RedisProducer implements IProducer, Service { @Resource(name = "property.redis.host") private String host = "127.0.0.1"; @@ -39,8 +39,9 @@ public class RedisProducer implements IProducer, Service { } } + @Deprecated @Override - public void send(T t) { + public void send(T t) { try { String v = JsonConvert.root().convertTo(t.value); if (v.startsWith("\"") && v.endsWith("\"")) { @@ -53,4 +54,22 @@ public class RedisProducer implements IProducer, Service { } } + + @Override + public void publish(String topic, V v) { + try { + osw.write("PUBLISH " + topic + " '" + toStr(v) + "' \r\n"); + osw.flush(); + } catch (IOException e) { + logger.log(Level.WARNING, "", e); + + } + } + + private String toStr(V v) { + if (v instanceof String) { + return (String) v; + } + return JsonConvert.root().convertTo(v); + } } diff --git a/src/com/zdemo/zdb/ZHubClient.java b/src/com/zdemo/zdb/ZHubClient.java index 53b51dc..cc32609 100644 --- a/src/com/zdemo/zdb/ZHubClient.java +++ b/src/com/zdemo/zdb/ZHubClient.java @@ -231,6 +231,15 @@ public abstract class ZHubClient extends AbstractConsumer implements IConsumer, } } + + public void publish(String topic, V v) { + send("publish", topic, toStr(v)); + } + + public void broadcast(String topic, V v) { + send("broadcast", topic, toStr(v)); + } + @Override public void subscribe(String topic, Consumer consumer) { addEventType(EventType.of(topic, consumer)); diff --git a/test/com/zdemo/test/AppTest.java b/test/com/zdemo/test/AppTest.java index b60d621..416ff2f 100644 --- a/test/com/zdemo/test/AppTest.java +++ b/test/com/zdemo/test/AppTest.java @@ -1,7 +1,5 @@ package com.zdemo.test; -import com.zdemo.Event; -import com.zdemo.EventType; import com.zdemo.IProducer; import org.junit.Test; import org.redkale.boot.Application; @@ -24,11 +22,9 @@ public class AppTest { //启动并开启消费监听 MyConsumer consumer = Application.singleton(MyConsumer.class); - consumer.addEventType( - EventType.of("a-1", str -> { - System.out.println("我收到了消息 a 事件:" + str); - }) - ); + consumer.subscribe("a-1", str -> { + System.out.println("我收到了消息 a 事件:" + str); + }); consumer.timer("a", () -> { System.out.println(Utility.now() + " timer a 执行了"); @@ -44,10 +40,9 @@ public class AppTest { @Test public void runProducer() { try { - IProducer producer = Application.singleton(MyConsumer.class); + MyConsumer producer = Application.singleton(MyConsumer.class); for (int i = 0; i < 10_0000; i++) { - producer.send(Event.of("a-1", i + "")); - producer.send(Event.of("a-1", i)); + producer.publish("a-1", i); } try { @@ -117,7 +112,7 @@ public class AppTest { for (int i = 0; i < 100; i++) { - producer.send(Event.of("x", "x")); + producer.publish("x", "x"); Thread.sleep(1000); } } catch (Exception e) {