From 0bedcf366ef41bdfb21b01875e3b51341963311e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BB=9D=E5=B0=98?= <237809796@qq.com> Date: Tue, 9 Apr 2024 02:06:23 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=EF=BC=9A1=E3=80=81rpc?= =?UTF-8?q?=E8=B0=83=E7=94=A8=E4=BC=98=E5=85=88=E8=B0=83=E7=94=A8=E6=9C=AC?= =?UTF-8?q?=E5=9C=B0=E8=AE=A2=E9=98=85=20=20=20=20=20=202=E3=80=81?= =?UTF-8?q?=E6=9C=AC=E5=9C=B0=E6=A8=A1=E5=BC=8F=E8=BF=94=E5=9B=9E=E7=BB=93?= =?UTF-8?q?=E6=9E=9C=E4=B8=8E=E7=9B=AE=E6=A0=87=E7=B1=BB=E5=9E=8B=E4=B8=8D?= =?UTF-8?q?=E4=B8=80=E8=87=B4=E7=9A=84=E8=BD=AC=E6=8D=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 2 +- src/main/java/tccn/AbstractConsumer.java | 26 +- src/main/java/tccn/Event.java | 5 +- src/main/java/tccn/timer/TimerExecutor.java | 2 +- src/main/java/tccn/timer/TimerTask.java | 6 +- src/main/java/tccn/zhub/Rpc.java | 51 +- src/main/java/tccn/zhub/RpcResult.java | 4 +- src/main/java/tccn/zhub/ZHubClient.java | 533 ++++++++++---------- 8 files changed, 327 insertions(+), 302 deletions(-) diff --git a/pom.xml b/pom.xml index 1f43145..da2f4b5 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ net.tccn zhub-client-spring - 17.0.0408.dev + 17.0.0409.dev org.springframework.boot diff --git a/src/main/java/tccn/AbstractConsumer.java b/src/main/java/tccn/AbstractConsumer.java index 784b8f7..7da80d5 100644 --- a/src/main/java/tccn/AbstractConsumer.java +++ b/src/main/java/tccn/AbstractConsumer.java @@ -8,6 +8,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; /** @@ -18,7 +19,7 @@ public abstract class AbstractConsumer implements IConsumer { public Gson gson = Rpc.gson; - private Map eventMap = new HashMap<>(); + protected Map> eventMap = new ConcurrentHashMap<>(); protected abstract String getGroupid(); @@ -36,6 +37,7 @@ public abstract class AbstractConsumer implements IConsumer { return set; } + // topic 消息消费前处理 protected void accept(String topic, String value) { EventType eventType = eventMap.get(topic); @@ -49,6 +51,19 @@ public abstract class AbstractConsumer implements IConsumer { eventType.accept(data); } + // rpc 被调用端 + protected void rpcAccept(String topic, T value) { + EventType eventType = eventMap.get(topic); + + /*// eventType 与 T 的类型比较、 + if (!eventType.typeToken.getType().getTypeName().equals(value.getClass().getTypeName())) { + eventType.accept(toStr(value)); + } else { + eventType.accept(value); + }*/ + eventType.accept(value); + } + protected final void removeEventType(String topic) { eventMap.remove(topic); } @@ -76,4 +91,13 @@ public abstract class AbstractConsumer implements IConsumer { } } + protected String toStr(Object v) { + if (v instanceof String) { + return (String) v; + } else if (v == null) { + return null; + } + return gson.toJson(v); + } + } diff --git a/src/main/java/tccn/Event.java b/src/main/java/tccn/Event.java index 4ab7a72..5ac02f7 100644 --- a/src/main/java/tccn/Event.java +++ b/src/main/java/tccn/Event.java @@ -15,9 +15,8 @@ public class Event { this.value = value; } - public static Event of(String topic, V value) { - return new Event(topic, value); + public static Event of(String topic, V value) { + return new Event<>(topic, value); } - } diff --git a/src/main/java/tccn/timer/TimerExecutor.java b/src/main/java/tccn/timer/TimerExecutor.java index 59986fd..2004df0 100644 --- a/src/main/java/tccn/timer/TimerExecutor.java +++ b/src/main/java/tccn/timer/TimerExecutor.java @@ -24,7 +24,7 @@ public class TimerExecutor { for (Task t : task) { t.setTimerExecutor(this); queue.push(t); - logger.finest("add new task : " + t.getName()); + // logger.finest("add new task : " + t.getName()); } } diff --git a/src/main/java/tccn/timer/TimerTask.java b/src/main/java/tccn/timer/TimerTask.java index 6b2c956..4072487 100644 --- a/src/main/java/tccn/timer/TimerTask.java +++ b/src/main/java/tccn/timer/TimerTask.java @@ -93,10 +93,10 @@ public class TimerTask implements Task { if (!isComplete) { int count = execCount.incrementAndGet(); // 执行次数+1 - long start = System.currentTimeMillis(); + // long start = System.currentTimeMillis(); job.execute(this); - long end = System.currentTimeMillis(); - logger.finest(String.format("task [%s] : not complete -> %s, time: %s ms, exec count: %s.", getName(), isComplete ? "had complete" : "not complete", end - start, count)); + // long end = System.currentTimeMillis(); + // logger.finest(String.format("task [%s] : not complete -> %s, time: %s ms, exec count: %s.", getName(), isComplete ? "had complete" : "not complete", end - start, count)); if (!isComplete) { timerExecutor.add(this, true); diff --git a/src/main/java/tccn/zhub/Rpc.java b/src/main/java/tccn/zhub/Rpc.java index 6ebba4e..6bc63f3 100644 --- a/src/main/java/tccn/zhub/Rpc.java +++ b/src/main/java/tccn/zhub/Rpc.java @@ -2,7 +2,14 @@ package tccn.zhub; import com.google.gson.Gson; import com.google.gson.annotations.Expose; +import com.google.gson.reflect.TypeToken; +import lombok.Getter; +import lombok.Setter; +import java.util.UUID; + +@Getter +@Setter public class Rpc { /*public final static Gson gson = new GsonBuilder() .excludeFieldsWithoutExposeAnnotation() @@ -16,48 +23,18 @@ public class Rpc { @Expose(deserialize = false, serialize = false) private RpcResult rpcResult; - public Rpc() { - } + @Expose(deserialize = false, serialize = false) + private TypeToken typeToken; - public Rpc(String appname, String ruk, String topic, Object value) { - this.ruk = appname + "::" + ruk; + /*public Rpc() { + }*/ + + protected Rpc(String appname, String topic, T value) { + this.ruk = appname + "::" + UUID.randomUUID().toString().replaceAll("-", ""); this.topic = topic; - this.value = (T) gson.toJson(value); - } - - public String getRuk() { - return ruk; - } - - public void setRuk(String ruk) { - this.ruk = ruk; - } - - public String getTopic() { - return topic; - } - - public void setTopic(String topic) { - this.topic = topic; - } - - public T getValue() { - return value; - } - - public void setValue(T value) { this.value = value; } - - public RpcResult getRpcResult() { - return rpcResult; - } - - public void setRpcResult(RpcResult rpcResult) { - this.rpcResult = rpcResult; - } - public String getBackTopic() { return ruk.split("::")[0]; } diff --git a/src/main/java/tccn/zhub/RpcResult.java b/src/main/java/tccn/zhub/RpcResult.java index 8d3858b..767e2c0 100644 --- a/src/main/java/tccn/zhub/RpcResult.java +++ b/src/main/java/tccn/zhub/RpcResult.java @@ -34,7 +34,7 @@ public class RpcResult { return result; } - public void setResult(R result) { - this.result = result; + public void setResult(Object result) { + this.result = (R) result; } } diff --git a/src/main/java/tccn/zhub/ZHubClient.java b/src/main/java/tccn/zhub/ZHubClient.java index eb630f1..a67c943 100644 --- a/src/main/java/tccn/zhub/ZHubClient.java +++ b/src/main/java/tccn/zhub/ZHubClient.java @@ -5,10 +5,7 @@ import jakarta.annotation.PostConstruct; import lombok.Setter; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; -import tccn.AbstractConsumer; -import tccn.Event; -import tccn.IConsumer; -import tccn.IProducer; +import tccn.*; import tccn.timer.Timers; import java.io.BufferedReader; @@ -18,14 +15,8 @@ import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.function.BiConsumer; +import java.util.*; +import java.util.concurrent.*; import java.util.function.Consumer; import java.util.function.Function; import java.util.logging.Level; @@ -53,26 +44,20 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer init(null); } + private Socket client; private OutputStream writer; private BufferedReader reader; private final LinkedBlockingQueue timerQueue = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue> topicQueue = new LinkedBlockingQueue<>(); - private final LinkedBlockingQueue> rpcBackQueue = new LinkedBlockingQueue<>(); // RPC BACK MSG - private final LinkedBlockingQueue> rpcCallQueue = new LinkedBlockingQueue<>(); // RPC CALL MSG + private final LinkedBlockingQueue> rpcBackQueue = new LinkedBlockingQueue<>(); // RPC BACK MSG [=> Object] + private final LinkedBlockingQueue> rpcCallQueue = new LinkedBlockingQueue<>(); // RPC CALL MSG [=> Object] private final LinkedBlockingQueue sendMsgQueue = new LinkedBlockingQueue<>(); // SEND MSG - private final BiConsumer threadBuilder = (r, n) -> { - for (int i = 0; i < n; i++) { - new Thread(() -> r.run()).start(); - } - }; - - /*private static boolean isFirst = true; - private boolean isMain = false;*/ - private static final Map mainHub = new HashMap<>(); // 127.0.0.1:1216 - ZHubClient + private static Map mainHub = new HashMap<>(); // 127.0.0.1:1216 - ZHubClient public ZHubClient() { + } public ZHubClient(String addr, String groupid, String appid, String auth) { @@ -96,220 +81,226 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer } // 设置第一个启动的 实例为主实例 - /*if (isFirst) { - isMain = true; - isFirst = false; - }*/ if (!mainHub.containsKey(addr)) { // 确保同步执行此 init 逻辑 mainHub.put(addr, this); } - if (!initSocket(0)) { - return; - } - // 消息 事件接收 - new Thread(() -> { - while (true) { - try { - String readLine = reader.readLine(); - if (readLine == null && initSocket(Integer.MAX_VALUE)) { // 连接中断 处理 - continue; - } - - String type; - - // +ping - if ("+ping".equals(readLine)) { - send("+pong"); - continue; - } - - // 主题订阅消息 - if ("*3".equals(readLine)) { - reader.readLine(); // $7 len() - - type = reader.readLine(); // message - if (!"message".equals(type)) { + CompletableFuture.runAsync(() -> { + if (!initSocket(0)) { + return; + } + // 消息 事件接收 + new Thread(() -> { + while (true) { + try { + String readLine = reader.readLine(); + if (readLine == null && initSocket(Integer.MAX_VALUE)) { // 连接中断 处理 continue; } - reader.readLine(); //$n len(key) - String topic = reader.readLine(); // topic - String lenStr = reader.readLine();//$n len(value) - int clen = 0; - if (lenStr.startsWith("$")) { - clen = Integer.parseInt(lenStr.replace("$", "")); + String type = ""; + + // +ping + if ("+ping".equals(readLine)) { + send("+pong"); + continue; } - String value = ""; - do { - if (value.length() > 0) { - value += "\r\n"; + // 主题订阅消息 + if ("*3".equals(readLine)) { + readLine = reader.readLine(); // $7 len() + type = reader.readLine(); // message + if (!"message".equals(type)) { + continue; } - String s = reader.readLine(); - value += s; // value - } while (clen > 0 && clen > strLength(value)); + reader.readLine(); //$n len(key) + String topic = reader.readLine(); // topic + String lenStr = reader.readLine();//$n len(value) + int clen = 0; + if (lenStr.startsWith("$")) { + clen = Integer.parseInt(lenStr.replace("$", "")); + } - // lock msg - if ("lock".equals(topic)) { - Lock lock = lockTag.get(value); - if (lock != null) { - synchronized (lock) { - lock.success = true; - lock.notifyAll(); + String value = ""; + do { + if (!value.isEmpty()) { + value += "\r\n"; } - } - continue; - } - // trylock msg - if ("trylock".equals(topic)) { - Lock lock = lockTag.get(value); - if (lock != null) { - synchronized (lock) { - lock.success = false; - lock.notifyAll(); + String s = reader.readLine(); + value += s; // value + } while (clen > 0 && clen > strLength(value)); + + logger.finest("topic[" + topic + "]: " + value); + + // lock msg + if ("lock".equals(topic)) { + Lock lock = lockTag.get(value); + if (lock != null) { + synchronized (lock) { + lock.success = true; + lock.notifyAll(); + } } + continue; } + // trylock msg + if ("trylock".equals(topic)) { + Lock lock = lockTag.get(value); + if (lock != null) { + synchronized (lock) { + lock.success = false; + lock.notifyAll(); + } + } + continue; + } + + // rpc back msg + if (appid.equals(topic)) { + rpcBackQueue.add(Event.of(topic, value)); + continue; + } + + // rpc call msg + if (rpcTopics.contains(topic)) { + rpcCallQueue.add(Event.of(topic, value)); + continue; + } + + // oth msg + topicQueue.add(Event.of(topic, value)); continue; } - // rpc back msg - if (appid.equals(topic)) { - rpcBackQueue.add(Event.of(topic, value)); + // timer 消息 + if ("*2".equals(readLine)) { + readLine = reader.readLine(); // $7 len() + type = reader.readLine(); // message + if (!"timer".equals(type)) { + continue; + } + reader.readLine(); //$n len(key) + String topic = reader.readLine(); // name + + logger.finest("timer[" + topic + "]: "); + timerQueue.add(timerMap.get(topic)); continue; } - // rpc call msg - if (rpcTopics.contains(topic)) { - rpcCallQueue.add(Event.of(topic, value)); - continue; + logger.finest(readLine); + } catch (IOException e) { + if (e instanceof SocketException) { + initSocket(Integer.MAX_VALUE); } - - // oth msg - topicQueue.add(Event.of(topic, value)); - continue; + e.printStackTrace(); } - - // timer 消息 - if ("*2".equals(readLine)) { - reader.readLine(); // $7 len() - - type = reader.readLine(); // message - if (!"timer".equals(type)) { - continue; - } - reader.readLine(); //$n len(key) - String topic = reader.readLine(); // name - - timerQueue.add(timerMap.get(topic)); - continue; - } - - logger.finest(readLine); - } catch (IOException e) { - if (e instanceof SocketException) { - initSocket(Integer.MAX_VALUE); - } - e.printStackTrace(); } - } - }).start(); + }).start(); + }).thenAcceptAsync(x -> { + // 定时调度事件,已加入耗时监控 + new Thread(() -> { + ExecutorService pool = Executors.newFixedThreadPool(1); + while (true) { + Timer timer = null; + try { + timer = timerQueue.take(); - // 定时调度事件 - threadBuilder.accept(() -> { - while (true) { - Timer timer = null; - try { - if ((timer = timerQueue.take()) == null) { - return; + long start = System.currentTimeMillis(); + pool.submit(timer.runnable).get(5, TimeUnit.SECONDS); + long end = System.currentTimeMillis(); + logger.finest(String.format("timer [%s] : elapsed time %s ms", timer.name, end - start)); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (TimeoutException e) { + logger.log(Level.SEVERE, "timer [" + timer.name + "] time out: " + 5 + " S", e); + pool = Executors.newFixedThreadPool(1); + } catch (Exception e) { + logger.log(Level.WARNING, "timer [" + timer.name + "]", e); } - long start = System.currentTimeMillis(); - timer.runnable.run(); - long end = System.currentTimeMillis(); - logger.finest(String.format("timer [%s] : elapsed time %s ms", timer.name, end - start)); - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (Exception e) { - logger.log(Level.WARNING, "timer [" + timer.name + "]", e); } - } - }, 1); + }).start(); - // topic msg - threadBuilder.accept(() -> { - while (true) { - Event event = null; - try { - if ((event = topicQueue.take()) == null) { - continue; + // topic msg,已加入耗时监控 + new Thread(() -> { + ExecutorService pool = Executors.newFixedThreadPool(1); + while (true) { + Event event = null; + try { + event = topicQueue.take(); + + String topic = event.topic; + String value = event.value; + pool.submit(() -> accept(topic, value)).get(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (TimeoutException e) { + logger.log(Level.SEVERE, "topic[" + event.topic + "] event deal time out: " + 5 + " S, value: " + toStr(event.value), e); + pool = Executors.newFixedThreadPool(1); + } catch (Exception e) { + logger.log(Level.WARNING, "topic[" + event.topic + "] event accept error :" + toStr(event.value), e); } - logger.log(Level.FINE, "topic[" + event.topic + "] :" + event.value); - accept(event.topic, event.value); - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (Exception e) { - logger.log(Level.WARNING, "topic[" + event.topic + "] event accept error :" + event.value, e); } - } - }, 1); + }).start(); - // rpc back - threadBuilder.accept(() -> { - while (true) { - Event event = null; - try { - if ((event = rpcBackQueue.take()) == null) { - continue; + // rpc back ,仅做数据解析,暂无耗时监控 + new Thread(() -> { + while (true) { + Event event = null; + try { + event = rpcBackQueue.take(); + //if (event) + logger.finest(String.format("rpc-back:[%s]: %s", event.topic, toStr(event.value))); + rpcAccept(event.value); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (Exception e) { + logger.log(Level.WARNING, "rpc-back[" + event.topic + "] event accept error :" + toStr(event.value), e); } - //if (event) - logger.info(String.format("rpc-back:[%s]: %s", event.topic, event.value)); - rpcAccept(event.value); - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (Exception e) { - logger.log(Level.WARNING, "rpc-back[" + event.topic + "] event accept error :" + event.value, e); } - } - }, 1); + }).start(); - // rpc call - threadBuilder.accept(() -> { - while (true) { - Event event = null; - try { - if ((event = rpcCallQueue.take()) == null) { - continue; + // rpc call,已加入耗时监控 + new Thread(() -> { + ExecutorService pool = Executors.newFixedThreadPool(1); + while (true) { + Event event = null; + try { + event = rpcCallQueue.take(); + + logger.finest(String.format("rpc-call:[%s] %s", event.topic, toStr(event.value))); + String topic = event.topic; + Object value = event.value; + pool.submit(() -> rpcAccept(topic, value)).get(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (TimeoutException e) { + logger.log(Level.SEVERE, "topic[" + event.topic + "] event deal time out: " + 5 + " S, value: " + toStr(event.value), e); + pool = Executors.newFixedThreadPool(1); + } catch (Exception e) { + logger.log(Level.WARNING, "rpc-call[" + event.topic + "] event accept error :" + toStr(event.value), e); } - logger.info(String.format("rpc-call:[%s] %s", event.topic, event.value)); - accept(event.topic, event.value); - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (Exception e) { - logger.log(Level.WARNING, "rpc-call[" + event.topic + "] event accept error :" + event.value, e); } - } - }, 1); + }).start(); - // send msg - threadBuilder.accept(() -> { - while (true) { - String msg = null; - try { - if ((msg = sendMsgQueue.take()) == null) { - continue; + // send msg + new Thread(() -> { + while (true) { + String msg = null; + try { + msg = sendMsgQueue.take(); + + // logger.log(Level.FINEST, "send-msg: [" + msg + "]"); + writer.write(msg.getBytes()); + writer.flush(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (Exception e) { + logger.log(Level.WARNING, "send-msg[" + msg + "] event accept error :", e); } - // logger.log(Level.FINEST, "send-msg: [" + msg + "]"); - writer.write(msg.getBytes()); - writer.flush(); - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (Exception e) { - logger.log(Level.WARNING, "send-msg[" + msg + "] event accept error :", e); } - } - }, 1); - + }).start(); + }); } // --------------------- @@ -356,16 +347,6 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer return str.length(); } - private String toStr(Object v) { - if (v instanceof String) { - return (String) v; - } else if (v == null) { - return null; - } - - return gson.toJson(v); - } - protected boolean initSocket(int retry) { for (int i = 0; i <= retry; i++) { try { @@ -373,8 +354,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer String host = hostPort[0]; int port = Integer.parseInt(hostPort[1]); - //private ReentrantLock lock = new ReentrantLock(); - Socket client = new Socket(); + client = new Socket(); client.connect(new InetSocketAddress(host, port)); client.setKeepAlive(true); @@ -388,9 +368,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer send("auth", auth); send("groupid " + groupid); - StringBuffer buf = new StringBuffer("subscribe lock trylock"); - /*if (isMain) { - }*/ + StringBuilder buf = new StringBuilder("subscribe lock trylock"); if (mainHub.containsValue(this)) { buf.append(" ").append(appid); } @@ -402,13 +380,13 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer // 重连 timer 订阅 timerMap.forEach((name, timer) -> send("timer", name)); if (retry > 0) { - logger.warning(String.format("ZHubClient[%s][%s] %s Succeed!", getGroupid(), i + 1, retry > 0 ? "reconnection" : "init")); + logger.warning(String.format("ZHubClient[%s][%s] %s Succeed!", getGroupid(), i + 1, "reconnection")); } else { - logger.fine(String.format("ZHubClient[%s] %s Succeed!", getGroupid(), retry > 0 ? "reconnection" : "init")); + logger.fine(String.format("ZHubClient[%s] %s Succeed!", getGroupid(), "init")); } return true; } catch (Exception e) { - if (retry == 0 || i == 0) { + if (i == 0) { logger.log(Level.WARNING, String.format("ZHubClient[%s] %s Failed 初始化失败!", getGroupid(), retry == 0 ? "init" : "reconnection"), e); try { Thread.sleep(1000); @@ -435,11 +413,15 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer } public boolean publish(String topic, Object v) { + /*if (eventMap.containsKey(topic)) { // 本地调用 + topicQueue.add(Event.of(topic, v)); + return true; + }*/ return send("publish", topic, toStr(v)); } public void broadcast(String topic, Object v) { - send("broadcast", topic, toStr(v)); + send("broadcast", topic, toStr(v)); // 广播必须走远端模式 } // 发送 publish 主题消息,若多次发送的 topic + "-" + value 相同,将会做延时重置 @@ -465,22 +447,16 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer } } - switch (endchar) { - case "M": - delay *= (1000 * 60 * 60 * 24 * 30); - break; - case "d": - delay *= (1000 * 60 * 60 * 24); - break; - case "H": - delay *= (1000 * 60 * 60); - break; - case "m": - delay *= (1000 * 60); - break; - case "s": - delay *= 1000; - break; + if ("M".equals(endchar)) { + delay *= (1000 * 60 * 60 * 24 * 30); + } else if ("d".equals(endchar)) { + delay *= (1000 * 60 * 60 * 24); + } else if ("H".equals(endchar)) { + delay *= (1000 * 60 * 60); + } else if ("m".equals(endchar)) { + delay *= (1000 * 60); + } else if ("s".equals(endchar)) { + delay *= 1000; } delay(topic, v, delay); @@ -497,7 +473,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer } // ================================================== lock ================================================== - private final Map lockTag = new ConcurrentHashMap<>(); + private Map lockTag = new ConcurrentHashMap<>(); /** * 尝试加锁,立即返回, @@ -538,7 +514,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer } // ================================================== timer ================================================== - private final ConcurrentHashMap timerMap = new ConcurrentHashMap(); + private ConcurrentHashMap timerMap = new ConcurrentHashMap(); class Timer { String name; @@ -566,6 +542,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer send("timer", name); } + @Deprecated public void reloadTimer() { send("cmd", "reload-timer"); } @@ -573,7 +550,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer // ================================================== rpc ================================================== // -- 调用端 -- private static final Map rpcMap = new ConcurrentHashMap<>(); - private static final Map rpcRetType = new ConcurrentHashMap<>(); + // private static final Map rpcRetType = new ConcurrentHashMap<>(); // rpc call public RpcResult rpc(String topic, Object v) { @@ -587,14 +564,18 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer // rpc call public RpcResult rpc(String topic, T v, TypeToken typeToken, long timeout) { - Rpc rpc = new Rpc<>(appid, UUID.randomUUID().toString().replaceAll("-", ""), topic, v); + Rpc rpc = new Rpc<>(appid, topic, v); + rpc.setTypeToken(typeToken); + String ruk = rpc.getRuk(); rpcMap.put(ruk, rpc); - if (typeToken != null) { - rpcRetType.put(ruk, typeToken); - } try { - publish(topic, rpc); // send("rpc", topic, toStr(rpc)); + if (eventMap.containsKey(topic)) { // 本地调用 + rpcCallQueue.add(Event.of(topic, rpc)); + } else { + rpc.setValue(toStr(rpc.getValue())); + publish(topic, rpc); // send("rpc", topic, toStr(rpc)); + } synchronized (rpc) { if (timeout <= 0) { timeout = 1000 * 15; @@ -644,8 +625,30 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer // RpcResult: {ruk:xxx-xxxx, retcode:0} // rpc call back consumer - private void rpcAccept(String value) { - RpcResult resp = gson.fromJson(value, new TypeToken>() { + private void rpcAccept(T value) { + // 接收到 本地调用返回的 RpcResult + if (value instanceof RpcResult) { + String ruk = ((RpcResult) value).getRuk(); + Rpc rpc = rpcMap.remove(ruk); + if (rpc == null) { + return; + } + + // TODO, 本地模式下返回的数据对象类型需要和处理端一致,不然会出现类型转换异常 - 解决办法,当出现不一致的情况取数据做转换 + TypeToken typeToken = rpc.getTypeToken(); + if (typeToken.getType() != ((RpcResult) value).getResult().getClass()) { + Object result = gson.fromJson(toStr(((RpcResult) value).getResult()), typeToken.getType()); + ((RpcResult) value).setResult(result); + } + + rpc.setRpcResult((RpcResult) value); + synchronized (rpc) { + rpc.notify(); + } + return; + } + + RpcResult resp = gson.fromJson((String) value, new TypeToken>() { }.getType()); String ruk = resp.getRuk(); @@ -653,10 +656,10 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer if (rpc == null) { return; } - TypeToken typeToken = rpcRetType.get(ruk); + TypeToken typeToken = rpc.getTypeToken(); Object result = resp.getResult(); - if (result != null && typeToken != null && !"java.lang.String".equals(typeToken.getType().getTypeName())) { + if (result != null && typeToken != null && !"java.lang.String".equals(typeToken.getType().getTypeName()) && !"java.lang.Void".equals(typeToken.getType().getTypeName())) { result = gson.fromJson((String) resp.getResult(), typeToken.getType()); } @@ -668,22 +671,38 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer } // -- 订阅端 -- - private final HashSet rpcTopics = new HashSet(); + private Set rpcTopics = new HashSet(); + + // rpc call consumer + public void rpcSubscribe(String topic, Function, RpcResult> fun) { + rpcSubscribe(topic, IType.STRING, fun); + } // rpc call consumer public void rpcSubscribe(String topic, TypeToken typeToken, Function, RpcResult> fun) { - Consumer consumer = v -> { + Consumer consumer = v -> { Rpc rpc = null; try { - rpc = gson.fromJson(v, new TypeToken>() { - }.getType()); + if (v instanceof String) { + rpc = gson.fromJson((String) v, new TypeToken>() { + }.getType()); + } else { + rpc = (Rpc) v; + } // 参数转换 - T paras = gson.fromJson((String) rpc.getValue(), typeToken.getType()); - rpc.setValue(paras); + if (rpc.getValue() instanceof String && !"java.lang.String".equals(typeToken.getType().getTypeName())) { + T paras = gson.fromJson((String) rpc.getValue(), typeToken.getType()); + rpc.setValue(paras); + } + RpcResult result = fun.apply(rpc); - result.setResult(toStr(result.getResult())); - publish(rpc.getBackTopic(), result); + if (appid.equals(rpc.getBackTopic())) { + rpcBackQueue.add(Event.of(topic, result)); + } else { + result.setResult(toStr(result.getResult())); // 远程模式 结果转换 + publish(rpc.getBackTopic(), result); + } } catch (Exception e) { logger.log(Level.WARNING, "rpc call consumer error: " + v, e); publish(rpc.getBackTopic(), rpc.retError("服务调用失败!")); @@ -692,6 +711,12 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer }; rpcTopics.add(topic); - subscribe(topic, consumer); + subscribe(topic, typeToken, consumer); + } + + public static void main(String[] args) { + System.out.println(IType.INT.getType()); + Integer a = 1; + System.out.println(a.getClass()); } }