diff --git a/src/main/java/net/tccn/zhub/Rpc.java b/src/main/java/net/tccn/zhub/Rpc.java index b2a47a4..e83be3b 100644 --- a/src/main/java/net/tccn/zhub/Rpc.java +++ b/src/main/java/net/tccn/zhub/Rpc.java @@ -2,6 +2,8 @@ package net.tccn.zhub; import org.redkale.convert.ConvertColumn; import org.redkale.service.RetResult; +import org.redkale.util.TypeToken; +import org.redkale.util.Utility; public class Rpc { private String ruk; // request unique key: @@ -10,11 +12,13 @@ public class Rpc { private RpcResult rpcResult; + private TypeToken typeToken; + public Rpc() { } - protected Rpc(String appname, String ruk, String topic, T value) { - this.ruk = appname + "::" + ruk; + protected Rpc(String appname, String topic, T value) { + this.ruk = appname + "::" + Utility.uuid(); this.topic = topic; this.value = value; } @@ -52,6 +56,16 @@ public class Rpc { this.rpcResult = rpcResult; } + @ConvertColumn(ignore = true) + public TypeToken getTypeToken() { + return typeToken; + } + + @ConvertColumn(ignore = true) + public void setTypeToken(TypeToken typeToken) { + this.typeToken = typeToken; + } + @ConvertColumn(ignore = true) public String getBackTopic() { return ruk.split("::")[0]; diff --git a/src/main/java/net/tccn/zhub/RpcResult.java b/src/main/java/net/tccn/zhub/RpcResult.java index 6f5bf87..5802515 100644 --- a/src/main/java/net/tccn/zhub/RpcResult.java +++ b/src/main/java/net/tccn/zhub/RpcResult.java @@ -34,7 +34,7 @@ public class RpcResult { return result; } - public void setResult(R result) { - this.result = result; + public void setResult(Object result) { + this.result = (R) result; } } diff --git a/src/main/java/net/tccn/zhub/ZHubClient.java b/src/main/java/net/tccn/zhub/ZHubClient.java index 2b2bf93..65aeffe 100644 --- a/src/main/java/net/tccn/zhub/ZHubClient.java +++ b/src/main/java/net/tccn/zhub/ZHubClient.java @@ -9,7 +9,6 @@ import org.redkale.service.Service; import org.redkale.util.AnyValue; import org.redkale.util.Comment; import org.redkale.util.TypeToken; -import org.redkale.util.Utility; import java.io.BufferedReader; import java.io.IOException; @@ -100,12 +99,11 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer } CompletableFuture.runAsync(() -> { + if (!initSocket(0)) { + return; + } // 消息 事件接收 new Thread(() -> { - if (!initSocket(0)) { - return; - } - while (true) { try { String readLine = reader.readLine(); @@ -410,7 +408,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer send("auth", auth); send("groupid " + groupid); - StringBuffer buf = new StringBuffer("subscribe lock trylock"); + StringBuilder buf = new StringBuilder("subscribe lock trylock"); if (mainHub.containsValue(this)) { buf.append(" " + APP_NAME); } @@ -423,9 +421,11 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer timerMap.forEach((name, timer) -> { send("timer", name); }); - logger.log(retry > 0 ? Level.WARNING : Level.FINE, - String.format("ZHubClient[%s]%s [%s] Succeed!", getGroupid(), retry > 0 ? "[" + (i + 1) + "]" : "", retry > 0 ? "reconnection" : "init")); - + if (retry > 0) { + logger.warning(String.format("ZHubClient[%s][%s] %s Succeed!", getGroupid(), i + 1, "reconnection")); + } else { + logger.fine(String.format("ZHubClient[%s] %s Succeed!", getGroupid(), "init")); + } return true; } catch (Exception e) { if (i == 0) { @@ -592,7 +592,6 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer // ================================================== rpc ================================================== // -- 调用端 -- private static Map rpcMap = new ConcurrentHashMap<>(); - private static Map rpcRetType = new ConcurrentHashMap<>(); @Comment("rpc call") public RpcResult rpc(String topic, Object v) { @@ -606,12 +605,11 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer @Comment("rpc call") public RpcResult rpc(String topic, T v, TypeToken typeToken, long timeout) { - Rpc rpc = new Rpc<>(APP_NAME, Utility.uuid(), topic, v); + Rpc rpc = new Rpc<>(APP_NAME, topic, v); + rpc.setTypeToken(typeToken); + String ruk = rpc.getRuk(); rpcMap.put(ruk, rpc); - if (typeToken != null) { - rpcRetType.put(ruk, typeToken); - } try { if (eventMap.containsKey(topic)) { // 本地调用 rpcCallQueue.add(Event.of(topic, rpc)); @@ -669,10 +667,21 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer // RpcResult: {ruk:xxx-xxxx, retcode:0} @Comment("rpc call back consumer") private void rpcAccept(T value) { - // 接收到 本地调用返回的 RpcResult TODO, 本地模式下返回的数据对象类型需要和处理端一致,不然会出现类型转换异常 - 解决办法,当出现不一致的情况取数据做转换 + // 接收到 本地调用返回的 RpcResult if (value instanceof RpcResult) { String ruk = ((RpcResult) value).getRuk(); Rpc rpc = rpcMap.remove(ruk); + if (rpc == null) { + return; + } + + // 本地模式下返回的数据对象类型需要和处理端一致,不然会出现类型转换异常 - 解决办法,当出现不一致的情况取数据做转换 + TypeToken typeToken = rpc.getTypeToken(); + if (typeToken.getType() != ((RpcResult) value).getResult().getClass()) { + Object result = convert.convertFrom(typeToken.getType(), toStr(((RpcResult) value).getResult())); + ((RpcResult) value).setResult(result); + } + rpc.setRpcResult((RpcResult) value); synchronized (rpc) { rpc.notify(); @@ -688,7 +697,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer if (rpc == null) { return; } - TypeToken typeToken = rpcRetType.get(ruk); + TypeToken typeToken = rpc.getTypeToken(); Object result = resp.getResult(); if (result != null && typeToken != null && !"java.lang.String".equals(typeToken.getType().getTypeName()) && !"java.lang.Void".equals(typeToken.getType().getTypeName())) {