新增:rpc 超时处理

This commit is contained in:
lxy 2021-10-12 19:00:06 +08:00
parent 5201cb6088
commit f1e1974aa2
3 changed files with 104 additions and 4 deletions

View File

@ -2218,7 +2218,7 @@ abstract class ReplyCompletionHandler implements CompletionHandler<Integer, Byte
out.removeLastByte();//读掉 \r
//buffer.get();//读掉 \n
//logger.info("打印buffer.get()---> " + buffer.get());
logger.info("--- 打印buffer start --- ");
//logger.info("--- 打印buffer start --- ");
return;//传null则表示使用StandardCharsets.UTF_8
}
if (has) out.write(lasted);

View File

@ -0,0 +1,72 @@
package com.zdemo.zhub;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.logging.Logger;
public class Delays implements Delayed, Runnable {
public Logger logger = Logger.getLogger(Delays.class.getSimpleName());
private long time; // 执行时间
private Runnable runnable; // 任务到时间执行 runnable
public Delays(long timeout, Runnable runnable) {
this.time = System.currentTimeMillis() + timeout;
this.runnable = runnable;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(time - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed other) {
if (other == this) { // compare zero ONLY if same object
return 0;
}
if (other instanceof Delays) {
Delays x = (Delays) other;
long diff = time - x.time;
if (diff < 0) {
return -1;
} else if (diff > 0) {
return 1;
}
}
long d = (getDelay(TimeUnit.NANOSECONDS) -
other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}
@Override
public void run() {
runnable.run();
}
// ===========
public static DelayQueue<Delays> delayQueue = new DelayQueue<>();
public static void addDelay(long timeout, Runnable runnable) {
delayQueue.add(new Delays(timeout, runnable));
}
public static void tryDelay(Supplier<Boolean> supplier, long delayMillis, int maxCount) {
}
static {
new Thread(() -> {
try {
while (true) {
Delays delay = delayQueue.take();
delay.run(); //异常会导致延时队列失败
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}

View File

@ -506,6 +506,11 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
@Comment("rpc call")
public <T, R> RpcResult<R> rpc(String topic, T v, TypeToken<R> typeToken) {
return rpc(topic, v, typeToken, 0);
}
@Comment("rpc call")
public <T, R> RpcResult<R> rpc(String topic, T v, TypeToken<R> typeToken, long timeout) {
Rpc rpc = new Rpc<>(APP_NAME, Utility.uuid(), topic, v);
String ruk = rpc.getRuk();
rpcMap.put(ruk, rpc);
@ -513,14 +518,29 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
rpcRetType.put(ruk, typeToken);
}
try {
publish(topic, rpc);
publish(topic, rpc); // send("rpc", topic, toStr(rpc));
synchronized (rpc) {
rpc.wait(); //todo: 调用超时处理
if (timeout <= 0) {
timeout = 1000 * 15;
}
// call timeout default: 15s
Delays.addDelay(timeout, () -> {
RpcResult rpcResult = rpc.buildResp(505, "请求超时");
rpc.setRpcResult(rpcResult);
synchronized (rpc) {
logger.warning("rpc timeout: " + convert.convertTo(rpc));
rpc.notify();
}
});
rpc.wait();
rpcMap.remove(ruk);
}
} catch (InterruptedException e) {
e.printStackTrace();
// todo: 设置请求失败
// call error
RpcResult rpcResult = rpc.buildResp(501, "请求失败");
rpc.setRpcResult(rpcResult);
}
return rpc.getRpcResult();
}
@ -533,6 +553,14 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
return CompletableFuture.supplyAsync(() -> rpc(topic, v, typeToken));
}
public <T, R> CompletableFuture<RpcResult<R>> rpcAsync(String topic, T v, long timeout) {
return CompletableFuture.supplyAsync(() -> rpc(topic, v, null, timeout));
}
public <T, R> CompletableFuture<RpcResult<R>> rpcAsync(String topic, T v, TypeToken<R> typeToken, long timeout) {
return CompletableFuture.supplyAsync(() -> rpc(topic, v, typeToken, timeout));
}
// RpcResult: {ruk:xxx-xxxx, retcode:0}
@Comment("rpc call back consumer")
private void rpcAccept(String value) {