From ff3d877a3726fde430a9999f2d85d9ebbb00b889 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=BB=9D=E5=B0=98?= <237809796@qq.com>
Date: Thu, 28 Mar 2024 00:22:43 +0800
Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=EF=BC=9Arpc=E8=B0=83?=
=?UTF-8?q?=E7=94=A8=E4=BC=98=E5=85=88=E8=B0=83=E7=94=A8=E6=9C=AC=E5=9C=B0?=
=?UTF-8?q?=E8=AE=A2=E9=98=85?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
pom.xml | 35 +++--
src/main/java/net/tccn/AbstractConsumer.java | 18 ++-
src/main/java/net/tccn/Event.java | 4 +-
.../java/net/tccn/timer/TimerExecutor.java | 2 +-
src/main/java/net/tccn/timer/TimerTask.java | 6 +-
src/main/java/net/tccn/zhub/Rpc.java | 5 +-
src/main/java/net/tccn/zhub/ZHubClient.java | 122 ++++++++++--------
7 files changed, 106 insertions(+), 86 deletions(-)
diff --git a/pom.xml b/pom.xml
index 9c15025..bf36328 100644
--- a/pom.xml
+++ b/pom.xml
@@ -14,11 +14,26 @@
UTF-8
+
+
+ maven-release
+ maven-nexus
+ https://nexus.1216.top/repository/maven-public/
+
+
+
+
+ mvn-release
+ mvn-release
+ https://nexus.1216.top/repository/maven-releases/
+
+
+
org.redkale
redkale
- 2.8.0-dev
+ 2.8.0.dev
compile
@@ -28,22 +43,4 @@
compile
-
-
-
-
- maven-nexus
- maven-nexus
- http://47.106.237.198:8081/repository/maven-public/
-
-
-
-
-
-
- mvn-release
- mvn-release
- http://47.106.237.198:8081/repository/maven-releases/
-
-
\ No newline at end of file
diff --git a/src/main/java/net/tccn/AbstractConsumer.java b/src/main/java/net/tccn/AbstractConsumer.java
index f126411..c2dbb9c 100644
--- a/src/main/java/net/tccn/AbstractConsumer.java
+++ b/src/main/java/net/tccn/AbstractConsumer.java
@@ -19,7 +19,7 @@ public abstract class AbstractConsumer extends ZhubAgentProvider implements ICon
protected static String APP_NAME = "";
- private Map eventMap = new ConcurrentHashMap<>();
+ protected Map> eventMap = new ConcurrentHashMap<>();
protected abstract String getGroupid();
@@ -31,6 +31,7 @@ public abstract class AbstractConsumer extends ZhubAgentProvider implements ICon
return Set.of("-");
}
+ // topic 消息消费前处理
protected void accept(String topic, String value) {
EventType eventType = eventMap.get(topic);
@@ -44,6 +45,12 @@ public abstract class AbstractConsumer extends ZhubAgentProvider implements ICon
eventType.accept(data);
}
+ // rpc 被调用端
+ protected void rpcAccept(String topic, T value) {
+ EventType eventType = eventMap.get(topic);
+ eventType.accept(value);
+ }
+
protected final void removeEventType(String topic) {
eventMap.remove(topic);
}
@@ -77,4 +84,13 @@ public abstract class AbstractConsumer extends ZhubAgentProvider implements ICon
public String resourceName() {
return super.getName();
}
+
+ protected String toStr(Object v) {
+ if (v instanceof String) {
+ return (String) v;
+ } else if (v == null) {
+ return null;
+ }
+ return convert.convertTo(v);
+ }
}
diff --git a/src/main/java/net/tccn/Event.java b/src/main/java/net/tccn/Event.java
index cc957c6..fcc8b65 100644
--- a/src/main/java/net/tccn/Event.java
+++ b/src/main/java/net/tccn/Event.java
@@ -15,8 +15,8 @@ public class Event {
this.value = value;
}
- public static Event of(String topic, V value) {
- return new Event(topic, value);
+ public static Event of(String topic, V value) {
+ return new Event<>(topic, value);
}
diff --git a/src/main/java/net/tccn/timer/TimerExecutor.java b/src/main/java/net/tccn/timer/TimerExecutor.java
index 720ddf5..355788e 100644
--- a/src/main/java/net/tccn/timer/TimerExecutor.java
+++ b/src/main/java/net/tccn/timer/TimerExecutor.java
@@ -24,7 +24,7 @@ public class TimerExecutor {
for (Task t : task) {
t.setTimerExecutor(this);
queue.push(t);
- logger.finest("add new task : " + t.getName());
+ // logger.finest("add new task : " + t.getName());
}
}
diff --git a/src/main/java/net/tccn/timer/TimerTask.java b/src/main/java/net/tccn/timer/TimerTask.java
index e1993b4..95865aa 100644
--- a/src/main/java/net/tccn/timer/TimerTask.java
+++ b/src/main/java/net/tccn/timer/TimerTask.java
@@ -93,10 +93,10 @@ public class TimerTask implements Task {
if (!isComplete) {
int count = execCount.incrementAndGet(); // 执行次数+1
- long start = System.currentTimeMillis();
+ // long start = System.currentTimeMillis();
job.execute(this);
- long end = System.currentTimeMillis();
- logger.finest(String.format("task [%s] : not complete -> %s, time: %s ms, exec count: %s.", getName(), isComplete ? "had complete" : "not complete", end - start, count));
+ // long end = System.currentTimeMillis();
+ // logger.finest(String.format("task [%s] : not complete -> %s, time: %s ms, exec count: %s.", getName(), isComplete ? "had complete" : "not complete", end - start, count));
if (!isComplete) {
timerExecutor.add(this, true);
diff --git a/src/main/java/net/tccn/zhub/Rpc.java b/src/main/java/net/tccn/zhub/Rpc.java
index 5d1aaeb..b2a47a4 100644
--- a/src/main/java/net/tccn/zhub/Rpc.java
+++ b/src/main/java/net/tccn/zhub/Rpc.java
@@ -1,7 +1,6 @@
package net.tccn.zhub;
import org.redkale.convert.ConvertColumn;
-import org.redkale.convert.json.JsonConvert;
import org.redkale.service.RetResult;
public class Rpc {
@@ -14,10 +13,10 @@ public class Rpc {
public Rpc() {
}
- protected Rpc(String appname, String ruk, String topic, Object value) {
+ protected Rpc(String appname, String ruk, String topic, T value) {
this.ruk = appname + "::" + ruk;
this.topic = topic;
- this.value = (T) JsonConvert.root().convertTo(value);
+ this.value = value;
}
public String getRuk() {
diff --git a/src/main/java/net/tccn/zhub/ZHubClient.java b/src/main/java/net/tccn/zhub/ZHubClient.java
index 3ce64a2..539e7a9 100644
--- a/src/main/java/net/tccn/zhub/ZHubClient.java
+++ b/src/main/java/net/tccn/zhub/ZHubClient.java
@@ -41,17 +41,11 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
private BufferedReader reader;
private final LinkedBlockingQueue timerQueue = new LinkedBlockingQueue<>();
- private final LinkedBlockingQueue> topicQueue = new LinkedBlockingQueue<>();
- private final LinkedBlockingQueue> rpcBackQueue = new LinkedBlockingQueue<>(); // RPC BACK MSG
- private final LinkedBlockingQueue> rpcCallQueue = new LinkedBlockingQueue<>(); // RPC CALL MSG
+ private final LinkedBlockingQueue> topicQueue = new LinkedBlockingQueue<>(); // [=> Object]
+ private final LinkedBlockingQueue> rpcBackQueue = new LinkedBlockingQueue<>(); // RPC BACK MSG [=> Object]
+ private final LinkedBlockingQueue> rpcCallQueue = new LinkedBlockingQueue<>(); // RPC CALL MSG [=> Object]
private final LinkedBlockingQueue sendMsgQueue = new LinkedBlockingQueue<>(); // SEND MSG
- /*private BiConsumer threadBuilder = (r, n) -> {
- for (int i = 0; i < n; i++) {
- new Thread(() -> r.run()).start();
- }
- };*/
-
private static Map mainHub = new HashMap<>(); // 127.0.0.1:1216 - ZHubClient
public ZHubClient() {
@@ -145,7 +139,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
String value = "";
do {
- if (value.length() > 0) {
+ if (!value.isEmpty()) {
value += "\r\n";
}
String s = reader.readLine();
@@ -225,9 +219,8 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
while (true) {
Timer timer = null;
try {
- if ((timer = timerQueue.take()) == null) {
- return;
- }
+ timer = timerQueue.take();
+
long start = System.currentTimeMillis();
pool.submit(timer.runnable).get(5, TimeUnit.SECONDS);
long end = System.currentTimeMillis();
@@ -249,9 +242,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
while (true) {
Event event = null;
try {
- if ((event = topicQueue.take()) == null) {
- continue;
- }
+ event = topicQueue.take();
String topic = event.topic;
String value = event.value;
@@ -259,10 +250,10 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
} catch (InterruptedException e) {
e.printStackTrace();
} catch (TimeoutException e) {
- logger.log(Level.SEVERE, "topic[" + event.topic + "] event deal time out: " + 5 + " S, value: " + event.value, e);
+ logger.log(Level.SEVERE, "topic[" + event.topic + "] event deal time out: " + 5 + " S, value: " + toStr(event.value), e);
pool = Executors.newFixedThreadPool(1);
} catch (Exception e) {
- logger.log(Level.WARNING, "topic[" + event.topic + "] event accept error :" + event.value, e);
+ logger.log(Level.WARNING, "topic[" + event.topic + "] event accept error :" + toStr(event.value), e);
}
}
}).start();
@@ -270,18 +261,16 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
// rpc back ,仅做数据解析,暂无耗时监控
new Thread(() -> {
while (true) {
- Event event = null;
+ Event