From 57c216c79198e080306b959f12fca7fe90a8155b Mon Sep 17 00:00:00 2001
From: lxy <237809796@qq.com>
Date: Wed, 3 Feb 2021 19:14:01 +0800
Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=EF=BC=9A=E6=80=BB=E7=BA=BF?=
=?UTF-8?q?=E6=B6=88=E6=81=AF=E6=8F=92=E4=BB=B6=E6=A8=A1=E5=BC=8F=E6=B3=A8?=
=?UTF-8?q?=E5=85=A5?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
conf/application.xml | 6 ++++
src/com/zdemo/ZhubListener.java | 51 ++++++++++++++++++++++++++++++
src/com/zdemo/zhub/ZHubClient.java | 28 ++++++++++++++--
test/com/zdemo/test/AppTest.java | 2 --
4 files changed, 83 insertions(+), 4 deletions(-)
create mode 100644 src/com/zdemo/ZhubListener.java
diff --git a/conf/application.xml b/conf/application.xml
index 5139982..1501daf 100644
--- a/conf/application.xml
+++ b/conf/application.xml
@@ -4,8 +4,14 @@
+
+
+
+
+
+
diff --git a/src/com/zdemo/ZhubListener.java b/src/com/zdemo/ZhubListener.java
new file mode 100644
index 0000000..f46ae68
--- /dev/null
+++ b/src/com/zdemo/ZhubListener.java
@@ -0,0 +1,51 @@
+package com.zdemo;
+
+import org.redkale.boot.Application;
+import org.redkale.boot.ApplicationListener;
+import org.redkale.service.Service;
+import org.redkale.util.AnyValue;
+import org.redkale.util.RedkaleClassLoader;
+import org.redkale.util.ResourceFactory;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * 服务监听
+ *
+ * @author: liangxy.
+ */
+public class ZhubListener implements ApplicationListener {
+
+ @Override
+ public void preStart(Application application) {
+
+ CompletableFuture.runAsync(() -> {
+ ResourceFactory resourceFactory = application.getResourceFactory();
+ RedkaleClassLoader classLoader = application.getClassLoader();
+
+ AnyValue appConfig = application.getAppConfig();
+ AnyValue zhubs = appConfig.getAnyValue("zhubs");
+ AnyValue[] values = zhubs.getAnyValues("zhub");
+ for (AnyValue zhub : values) {
+ String clazz = zhub.getValue("value", "com.zdemo.zhub.ZHubClient");
+ try {
+ Class> aClass = classLoader.loadClass(clazz);
+ Service obj = (Service) aClass.newInstance();
+ obj.init(zhub);
+ resourceFactory.register(zhub.get("name"), aClass, obj);
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ } catch (IllegalAccessException e) {
+ e.printStackTrace();
+ } catch (InstantiationException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+
+ @Override
+ public void preShutdown(Application application) {
+
+ }
+}
diff --git a/src/com/zdemo/zhub/ZHubClient.java b/src/com/zdemo/zhub/ZHubClient.java
index 2e7c38b..a354ed2 100644
--- a/src/com/zdemo/zhub/ZHubClient.java
+++ b/src/com/zdemo/zhub/ZHubClient.java
@@ -7,6 +7,7 @@ import com.zdemo.IProducer;
import org.redkale.convert.json.JsonConvert;
import org.redkale.service.Service;
import org.redkale.util.AnyValue;
+import org.redkale.util.AutoLoad;
import javax.annotation.Resource;
import java.io.BufferedReader;
@@ -23,7 +24,8 @@ import java.util.function.BiConsumer;
import java.util.logging.Level;
import java.util.logging.Logger;
-public abstract class ZHubClient extends AbstractConsumer implements IConsumer, IProducer, Service {
+@AutoLoad(value = false)
+public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer, Service {
public Logger logger = Logger.getLogger(ZHubClient.class.getSimpleName());
@@ -33,6 +35,8 @@ public abstract class ZHubClient extends AbstractConsumer implements IConsumer,
private String password = "";
@Resource(name = "property.zhub.port")
private int port = 1216;
+ @Resource(name = "property.zhub.groupid")
+ private String groupid = "";
private ReentrantLock lock = new ReentrantLock();
private Socket client;
@@ -53,6 +57,14 @@ public abstract class ZHubClient extends AbstractConsumer implements IConsumer,
if (!preInit()) {
return;
}
+
+ // 自动注入
+ if (config != null) {
+ host = config.getValue("addr", host);
+ port = config.getIntValue("port", port);
+ groupid = config.getValue("groupid", groupid);
+ }
+
if (!initSocket()) {
return;
}
@@ -194,7 +206,11 @@ public abstract class ZHubClient extends AbstractConsumer implements IConsumer,
writer = client.getOutputStream();
reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
- send("groupid " + getGroupid());
+ String groupid = getGroupid();
+ if (groupid == null || groupid.isEmpty()) {
+ throw new RuntimeException("ZHubClient groupid can not is empty");
+ }
+ send("groupid " + groupid);
StringBuffer buf = new StringBuffer("subscribe");
for (String topic : getTopics()) {
@@ -209,6 +225,9 @@ public abstract class ZHubClient extends AbstractConsumer implements IConsumer,
} catch (IOException e) {
logger.log(Level.WARNING, "Zdb Consumer 初始化失败!", e);
return false;
+ } catch (Exception e) {
+ logger.log(Level.WARNING, "Zdb Consumer 初始化失败!", e);
+ return false;
}
return true;
@@ -266,6 +285,11 @@ public abstract class ZHubClient extends AbstractConsumer implements IConsumer,
delay(topic, v, delay);
}
+ @Override
+ protected String getGroupid() {
+ return groupid;
+ }
+
@Override
protected void subscribe(String topic) {
send("subscribe " + topic); //新增订阅
diff --git a/test/com/zdemo/test/AppTest.java b/test/com/zdemo/test/AppTest.java
index 36cb9c8..ffe5c33 100644
--- a/test/com/zdemo/test/AppTest.java
+++ b/test/com/zdemo/test/AppTest.java
@@ -25,7 +25,6 @@ public class AppTest {
MyConsumer consumer = Application.singleton(MyConsumer.class);
consumer.subscribe("a", str -> {
- System.out.println(System.currentTimeMillis());
logger.info("我收到了消息 a 事件:" + str);
});
@@ -42,7 +41,6 @@ public class AppTest {
System.out.println(Utility.now() + " ----------------- timer b 执行了");
});
//consumer.delay("a", "1", 200);
- System.out.println(System.currentTimeMillis());
consumer.delay("a", "1", "2000");
Thread.sleep(60_000 * 60);