diff --git a/src/com/zdemo/AbstractConsumer.java b/src/com/zdemo/AbstractConsumer.java index d35b224..d427a4a 100644 --- a/src/com/zdemo/AbstractConsumer.java +++ b/src/com/zdemo/AbstractConsumer.java @@ -5,7 +5,6 @@ import org.redkale.convert.json.JsonConvert; import java.util.HashMap; import java.util.Map; import java.util.Set; -import java.util.logging.Logger; /** * @author Liang @@ -13,8 +12,6 @@ import java.util.logging.Logger; */ public abstract class AbstractConsumer implements IConsumer { - public Logger logger = Logger.getLogger(this.getClass().getSimpleName()); - public final Map eventMap = new HashMap<>(); public abstract String getGroupid(); diff --git a/src/com/zdemo/kafak/KafakConsumer.java b/src/com/zdemo/kafak/KafakConsumer.java index e734a0f..c87ff9a 100644 --- a/src/com/zdemo/kafak/KafakConsumer.java +++ b/src/com/zdemo/kafak/KafakConsumer.java @@ -18,6 +18,7 @@ import java.time.Duration; import java.util.Properties; import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Level; +import java.util.logging.Logger; /** * 消费 @@ -25,6 +26,8 @@ import java.util.logging.Level; @RestService public abstract class KafakConsumer extends AbstractConsumer implements IConsumer, Service { + public Logger logger = Logger.getLogger(this.getClass().getSimpleName()); + @Resource(name = "APP_HOME") protected File APP_HOME; diff --git a/src/com/zdemo/redis/RedisConsumer.java b/src/com/zdemo/redis/RedisConsumer.java index 7bf770e..6831133 100644 --- a/src/com/zdemo/redis/RedisConsumer.java +++ b/src/com/zdemo/redis/RedisConsumer.java @@ -14,9 +14,12 @@ import java.io.OutputStreamWriter; import java.net.InetSocketAddress; import java.net.Socket; import java.util.logging.Level; +import java.util.logging.Logger; public class RedisConsumer extends AbstractConsumer implements IConsumer, Service { + public Logger logger = Logger.getLogger(this.getClass().getSimpleName()); + @Resource(name = "property.redis.host") private String host = "127.0.0.1"; @Resource(name = "property.redis.password") diff --git a/src/com/zdemo/zdb/ZHubConsumer.java b/src/com/zdemo/zdb/ZHubClient.java similarity index 92% rename from src/com/zdemo/zdb/ZHubConsumer.java rename to src/com/zdemo/zdb/ZHubClient.java index 4b9aa68..84bed93 100644 --- a/src/com/zdemo/zdb/ZHubConsumer.java +++ b/src/com/zdemo/zdb/ZHubClient.java @@ -1,8 +1,7 @@ package com.zdemo.zdb; -import com.zdemo.AbstractConsumer; -import com.zdemo.EventType; -import com.zdemo.IConsumer; +import com.zdemo.*; +import org.redkale.convert.json.JsonConvert; import org.redkale.service.Service; import org.redkale.util.AnyValue; @@ -17,8 +16,11 @@ import java.net.SocketException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; +import java.util.logging.Logger; -public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer, Service { +public abstract class ZHubClient extends AbstractConsumer implements IConsumer, IProducer, Service { + + Logger logger = Logger.getLogger(IProducer.class.getSimpleName()); @Resource(name = "property.zhub.host") private String host = "127.0.0.1"; @@ -106,7 +108,7 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer } // --------------------- - // 消息发送类 + // 消息发送 private void send(String... data) { try { lock.lock(); @@ -127,6 +129,14 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer } } + public void send(Event t) { + String v = JsonConvert.root().convertTo(t.value); + if (v.startsWith("\"") && v.endsWith("\"")) { + v = v.substring(1, v.length() - 1); + } + send("publish", t.topic, v); + } + public boolean initSocket() { try { client = new Socket(); diff --git a/src/com/zdemo/zdb/ZHubProducer.java b/src/com/zdemo/zdb/ZHubProducer.java deleted file mode 100644 index 5b4c691..0000000 --- a/src/com/zdemo/zdb/ZHubProducer.java +++ /dev/null @@ -1,79 +0,0 @@ -package com.zdemo.zdb; - -import com.zdemo.Event; -import com.zdemo.IProducer; -import org.redkale.convert.json.JsonConvert; -import org.redkale.service.Service; -import org.redkale.util.AnyValue; - -import javax.annotation.Resource; -import java.io.IOException; -import java.io.OutputStream; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.util.concurrent.locks.ReentrantLock; -import java.util.logging.Level; - -public class ZHubProducer implements IProducer, Service { - - @Resource(name = "property.zdb.host") - private String host = "39.108.56.246"; - @Resource(name = "property.zdb.password") - private String password = ""; - @Resource(name = "property.zdb.port") - private int port = 1216; - - private ReentrantLock lock = new ReentrantLock(); - - private OutputStream os; - - @Override - public void init(AnyValue config) { - Socket client = new Socket(); - try { - client.connect(new InetSocketAddress(host, port)); - client.setKeepAlive(true); - os = client.getOutputStream(); - } catch (IOException e) { - e.printStackTrace(); - } - } - - @Override - public void send(T t) { - String v = JsonConvert.root().convertTo(t.value); - if (v.startsWith("\"") && v.endsWith("\"")) { - v = v.substring(1, v.length() - 1); - } - send("publish", t.topic, v); - } - - private void send(String... data) { - try { - lock.lock(); - if (data.length == 1) { - os.write((data[0] + "\r\n").getBytes()); - } else if (data.length > 1) { - os.write(("*" + data.length + "\r\n").getBytes()); - for (String d : data) { - os.write(("$" + d.length() + "\r\n").getBytes()); - os.write((d + "\r\n").getBytes()); - } - } - } catch (IOException e) { - logger.log(Level.WARNING, "", e); - } finally { - lock.unlock(); - } - } - - private byte[] toBytes(int v) { - byte[] result = new byte[4]; - result[0] = (byte) ((v >> 24) & 0xFF); - result[1] = (byte) ((v >> 16) & 0xFF); - result[2] = (byte) ((v >> 8) & 0xFF); - result[3] = (byte) (v & 0xFF); - return result; - } - -} diff --git a/test/com/zdemo/test/AppTest.java b/test/com/zdemo/test/AppTest.java index 18d49eb..5affae8 100644 --- a/test/com/zdemo/test/AppTest.java +++ b/test/com/zdemo/test/AppTest.java @@ -1,8 +1,8 @@ package com.zdemo.test; import com.zdemo.Event; +import com.zdemo.EventType; import com.zdemo.IProducer; -import com.zdemo.zdb.ZHubProducer; import org.junit.Test; import org.redkale.boot.Application; import org.redkale.util.Utility; @@ -27,15 +27,11 @@ public class AppTest { //启动并开启消费监听 MyConsumer consumer = Application.singleton(MyConsumer.class); - /*consumer.addEventType( - EventType.of("ax", str -> { + consumer.addEventType( + EventType.of("a-1", str -> { System.out.println("我收到了消息 a 事件:" + str); }) - - , EventType.of("bx", str -> { - System.out.println("我收到了消息 主题bx 事件:" + str); - }) - );*/ + ); consumer.timer("a", () -> { System.out.println(Utility.now() + " timer a 执行了"); @@ -51,21 +47,10 @@ public class AppTest { @Test public void runProducer() { try { - IProducer producer = Application.singleton(ZHubProducer.class); - - // 发送不同的 事件 - float v0 = 1f; - Map v1 = Map.of("k", "v"); - List v2 = asList(1, 2, 3); - - //producer.send(Event.of("a1", v0)); - /*producer.send(Event.of("b1", v1)); - producer.send(Event.of("c1", v2));*/ - - /*producer.send(Event.of("game-update", 23256)); - producer.send(Event.of("bx", 23256));*/ + IProducer producer = Application.singleton(MyConsumer.class); for (int i = 0; i < 10_0000; i++) { producer.send(Event.of("a-1", i + "")); + producer.send(Event.of("a-1", i)); } try { @@ -131,7 +116,7 @@ public class AppTest { public void yy() { IProducer producer = null; try { - producer = Application.singleton(ZHubProducer.class); + producer = Application.singleton(MyConsumer.class); for (int i = 0; i < 100; i++) { diff --git a/test/com/zdemo/test/MyConsumer.java b/test/com/zdemo/test/MyConsumer.java index e57013e..a6110b0 100644 --- a/test/com/zdemo/test/MyConsumer.java +++ b/test/com/zdemo/test/MyConsumer.java @@ -1,8 +1,8 @@ package com.zdemo.test; -import com.zdemo.zdb.ZHubConsumer; +import com.zdemo.zdb.ZHubClient; -public class MyConsumer extends ZHubConsumer { +public class MyConsumer extends ZHubClient { public String getGroupid() { return "group-test"; //消费组名称