From d7058907d6683fd56bbb732292a63078b1ec924f Mon Sep 17 00:00:00 2001 From: lxy <237809796@qq.com> Date: Wed, 7 Jul 2021 12:56:04 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=EF=BC=9Arpc=20=E8=B0=83?= =?UTF-8?q?=E7=94=A8=E7=AB=AF=E8=AE=A2=E9=98=85=20topic=20=E9=80=BB?= =?UTF-8?q?=E8=BE=91=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/com/zdemo/zhub/ZHubClient.java | 15 ++++++++--- test/com/zdemo/test/AppTest.java | 4 +-- test/com/zdemo/test/HelloService.java | 37 +++++++++++++++++++++++++++ 3 files changed, 50 insertions(+), 6 deletions(-) create mode 100644 test/com/zdemo/test/HelloService.java diff --git a/src/com/zdemo/zhub/ZHubClient.java b/src/com/zdemo/zhub/ZHubClient.java index 4aca092..55448ff 100644 --- a/src/com/zdemo/zhub/ZHubClient.java +++ b/src/com/zdemo/zhub/ZHubClient.java @@ -15,6 +15,7 @@ import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketException; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -58,8 +59,9 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer } }; - private static boolean isFirst = true; - private boolean isMain = false; + /*private static boolean isFirst = true; + private boolean isMain = false;*/ + private static Map mainHub = new HashMap<>(); // 127.0.0.1:1216 - ZHubClient @Override public void init(AnyValue config) { @@ -75,9 +77,12 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer } // 设置第一个启动的 实例为主实例 - if (isFirst) { + /*if (isFirst) { isMain = true; isFirst = false; + }*/ + if (!mainHub.containsKey(host + ":" + port)) { // 确保同步执行此 init 逻辑 + mainHub.put(host + ":" + port, this); } if (!initSocket(0)) { @@ -312,7 +317,9 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer send("groupid " + groupid); StringBuffer buf = new StringBuffer("subscribe lock"); - if (isMain) { // TODO: + /*if (isMain) { + }*/ + if (mainHub.containsValue(this)) { buf.append(" " + APP_NAME); } for (String topic : getTopics()) { diff --git a/test/com/zdemo/test/AppTest.java b/test/com/zdemo/test/AppTest.java index 2b80817..f8667a7 100644 --- a/test/com/zdemo/test/AppTest.java +++ b/test/com/zdemo/test/AppTest.java @@ -26,11 +26,11 @@ public class AppTest { //启动并开启消费监听 MyConsumer consumer = Application.singleton(MyConsumer.class); - /*consumer.subscribe("a", str -> { + consumer.subscribe("a", str -> { logger.info("我收到了消息 a 事件:" + str); }); - consumer.timer("a", () -> { + /*consumer.timer("a", () -> { System.out.println(Utility.now() + " timer a 执行了"); try { Thread.sleep(3000); diff --git a/test/com/zdemo/test/HelloService.java b/test/com/zdemo/test/HelloService.java new file mode 100644 index 0000000..072f305 --- /dev/null +++ b/test/com/zdemo/test/HelloService.java @@ -0,0 +1,37 @@ +package com.zdemo.test; + +import com.zdemo.IConsumer; +import com.zdemo.zhub.RpcResult; +import com.zdemo.zhub.ZHubClient; +import org.redkale.net.http.RestService; +import org.redkale.service.Service; +import org.redkale.util.AnyValue; +import org.redkale.util.TypeToken; + +import javax.annotation.Resource; + +@RestService(automapping = true) +public class HelloService implements Service { + + @Resource(name = "zhub") + private ZHubClient zhub; + + @Override + public void init(AnyValue config) { + // Function, RpcResult> fun + zhub.rpcSubscribe("x", new TypeToken() { + }, r -> { + + return r.buildResp(r.getValue().toUpperCase() + ": Ok"); + }); + } + + public RpcResult x(String v) { + if (v == null) { + v = ""; + } + RpcResult x = zhub.rpc("x", v, IConsumer.TYPE_TOKEN_STRING).join(); + + return x; + } +}