diff --git a/src/com/zdemo/zhub/ZHubClient.java b/src/com/zdemo/zhub/ZHubClient.java index a2d03c8..a7fd685 100644 --- a/src/com/zdemo/zhub/ZHubClient.java +++ b/src/com/zdemo/zhub/ZHubClient.java @@ -8,7 +8,10 @@ import org.redkale.service.Service; import org.redkale.util.*; import javax.annotation.Resource; -import java.io.*; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketException; @@ -128,7 +131,6 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer } while (clen > 0 && clen > strLength(value)); - // lock msg if ("lock".equals(topic)) { Lock lock = lockTag.get(value); @@ -498,31 +500,37 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer private static Map rpcRetType = new ConcurrentHashMap<>(); @Comment("rpc call") - public CompletableFuture> rpc(String topic, Object v) { + public RpcResult rpc(String topic, Object v) { return rpc(topic, v, null); } @Comment("rpc call") - public CompletableFuture> rpc(String topic, T v, TypeToken typeToken) { - return CompletableFuture.supplyAsync(() -> { - Rpc rpc = new Rpc<>(APP_NAME, Utility.uuid(), topic, v); - String ruk = rpc.getRuk(); - rpcMap.put(ruk, rpc); - if (typeToken != null) { - rpcRetType.put(ruk, typeToken); + public RpcResult rpc(String topic, T v, TypeToken typeToken) { + Rpc rpc = new Rpc<>(APP_NAME, Utility.uuid(), topic, v); + String ruk = rpc.getRuk(); + rpcMap.put(ruk, rpc); + if (typeToken != null) { + rpcRetType.put(ruk, typeToken); + } + try { + publish(topic, rpc); + synchronized (rpc) { + rpc.wait(); //todo: 调用超时处理 + rpcMap.remove(ruk); } - try { - publish(topic, rpc); - synchronized (rpc) { - rpc.wait(); //todo: 调用超时处理 - rpcMap.remove(ruk); - } - } catch (InterruptedException e) { - e.printStackTrace(); - // todo: 设置请求失败 - } - return rpc.getRpcResult(); - }); + } catch (InterruptedException e) { + e.printStackTrace(); + // todo: 设置请求失败 + } + return rpc.getRpcResult(); + } + + public CompletableFuture> rpcAsync(String topic, T v) { + return CompletableFuture.supplyAsync(() -> rpc(topic, v, null)); + } + + public CompletableFuture> rpcAsync(String topic, T v, TypeToken typeToken) { + return CompletableFuture.supplyAsync(() -> rpc(topic, v, typeToken)); } // RpcResult: {ruk:xxx-xxxx, retcode:0}