diff --git a/src/com/zdemo/cachex/RedisCacheSource.java b/src/com/zdemo/cachex/RedisCacheSource.java index a1583e2..fcbe6a4 100644 --- a/src/com/zdemo/cachex/RedisCacheSource.java +++ b/src/com/zdemo/cachex/RedisCacheSource.java @@ -2218,7 +2218,7 @@ abstract class ReplyCompletionHandler implements CompletionHandler " + buffer.get()); - logger.info("--- 打印buffer start --- "); + //logger.info("--- 打印buffer start --- "); return;//传null则表示使用StandardCharsets.UTF_8 } if (has) out.write(lasted); diff --git a/src/com/zdemo/zhub/Delays.java b/src/com/zdemo/zhub/Delays.java new file mode 100644 index 0000000..8466b97 --- /dev/null +++ b/src/com/zdemo/zhub/Delays.java @@ -0,0 +1,72 @@ +package com.zdemo.zhub; + +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.logging.Logger; + +public class Delays implements Delayed, Runnable { + public Logger logger = Logger.getLogger(Delays.class.getSimpleName()); + + private long time; // 执行时间 + private Runnable runnable; // 任务到时间执行 runnable + + public Delays(long timeout, Runnable runnable) { + this.time = System.currentTimeMillis() + timeout; + this.runnable = runnable; + } + + @Override + public long getDelay(TimeUnit unit) { + return unit.convert(time - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + } + + @Override + public int compareTo(Delayed other) { + if (other == this) { // compare zero ONLY if same object + return 0; + } + if (other instanceof Delays) { + Delays x = (Delays) other; + long diff = time - x.time; + if (diff < 0) { + return -1; + } else if (diff > 0) { + return 1; + } + } + long d = (getDelay(TimeUnit.NANOSECONDS) - + other.getDelay(TimeUnit.NANOSECONDS)); + return (d == 0) ? 0 : ((d < 0) ? -1 : 1); + } + + @Override + public void run() { + runnable.run(); + } + + // =========== + public static DelayQueue delayQueue = new DelayQueue<>(); + + public static void addDelay(long timeout, Runnable runnable) { + delayQueue.add(new Delays(timeout, runnable)); + } + + public static void tryDelay(Supplier supplier, long delayMillis, int maxCount) { + + } + + static { + new Thread(() -> { + try { + while (true) { + Delays delay = delayQueue.take(); + delay.run(); //异常会导致延时队列失败 + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + }).start(); + } +} diff --git a/src/com/zdemo/zhub/ZHubClient.java b/src/com/zdemo/zhub/ZHubClient.java index a7fd685..e7cb105 100644 --- a/src/com/zdemo/zhub/ZHubClient.java +++ b/src/com/zdemo/zhub/ZHubClient.java @@ -506,6 +506,11 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer @Comment("rpc call") public RpcResult rpc(String topic, T v, TypeToken typeToken) { + return rpc(topic, v, typeToken, 0); + } + + @Comment("rpc call") + public RpcResult rpc(String topic, T v, TypeToken typeToken, long timeout) { Rpc rpc = new Rpc<>(APP_NAME, Utility.uuid(), topic, v); String ruk = rpc.getRuk(); rpcMap.put(ruk, rpc); @@ -513,14 +518,29 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer rpcRetType.put(ruk, typeToken); } try { - publish(topic, rpc); + publish(topic, rpc); // send("rpc", topic, toStr(rpc)); synchronized (rpc) { - rpc.wait(); //todo: 调用超时处理 + if (timeout <= 0) { + timeout = 1000 * 15; + } + // call timeout default: 15s + Delays.addDelay(timeout, () -> { + RpcResult rpcResult = rpc.buildResp(505, "请求超时"); + rpc.setRpcResult(rpcResult); + synchronized (rpc) { + logger.warning("rpc timeout: " + convert.convertTo(rpc)); + rpc.notify(); + } + }); + + rpc.wait(); rpcMap.remove(ruk); } } catch (InterruptedException e) { e.printStackTrace(); - // todo: 设置请求失败 + // call error + RpcResult rpcResult = rpc.buildResp(501, "请求失败"); + rpc.setRpcResult(rpcResult); } return rpc.getRpcResult(); } @@ -533,6 +553,14 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer return CompletableFuture.supplyAsync(() -> rpc(topic, v, typeToken)); } + public CompletableFuture> rpcAsync(String topic, T v, long timeout) { + return CompletableFuture.supplyAsync(() -> rpc(topic, v, null, timeout)); + } + + public CompletableFuture> rpcAsync(String topic, T v, TypeToken typeToken, long timeout) { + return CompletableFuture.supplyAsync(() -> rpc(topic, v, typeToken, timeout)); + } + // RpcResult: {ruk:xxx-xxxx, retcode:0} @Comment("rpc call back consumer") private void rpcAccept(String value) {