新增:rpc 超时处理
This commit is contained in:
parent
0bb543d1a9
commit
257e840e7e
@ -2218,7 +2218,7 @@ abstract class ReplyCompletionHandler implements CompletionHandler<Integer, Byte
|
|||||||
out.removeLastByte();//读掉 \r
|
out.removeLastByte();//读掉 \r
|
||||||
//buffer.get();//读掉 \n
|
//buffer.get();//读掉 \n
|
||||||
//logger.info("打印buffer.get()---> " + buffer.get());
|
//logger.info("打印buffer.get()---> " + buffer.get());
|
||||||
logger.info("--- 打印buffer start --- ");
|
//logger.info("--- 打印buffer start --- ");
|
||||||
return;//传null则表示使用StandardCharsets.UTF_8
|
return;//传null则表示使用StandardCharsets.UTF_8
|
||||||
}
|
}
|
||||||
if (has) out.write(lasted);
|
if (has) out.write(lasted);
|
||||||
|
72
src/com/zdemo/zhub/Delays.java
Normal file
72
src/com/zdemo/zhub/Delays.java
Normal file
@ -0,0 +1,72 @@
|
|||||||
|
package com.zdemo.zhub;
|
||||||
|
|
||||||
|
import java.util.concurrent.DelayQueue;
|
||||||
|
import java.util.concurrent.Delayed;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
|
public class Delays implements Delayed, Runnable {
|
||||||
|
public Logger logger = Logger.getLogger(Delays.class.getSimpleName());
|
||||||
|
|
||||||
|
private long time; // 执行时间
|
||||||
|
private Runnable runnable; // 任务到时间执行 runnable
|
||||||
|
|
||||||
|
public Delays(long timeout, Runnable runnable) {
|
||||||
|
this.time = System.currentTimeMillis() + timeout;
|
||||||
|
this.runnable = runnable;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getDelay(TimeUnit unit) {
|
||||||
|
return unit.convert(time - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compareTo(Delayed other) {
|
||||||
|
if (other == this) { // compare zero ONLY if same object
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
if (other instanceof Delays) {
|
||||||
|
Delays x = (Delays) other;
|
||||||
|
long diff = time - x.time;
|
||||||
|
if (diff < 0) {
|
||||||
|
return -1;
|
||||||
|
} else if (diff > 0) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
long d = (getDelay(TimeUnit.NANOSECONDS) -
|
||||||
|
other.getDelay(TimeUnit.NANOSECONDS));
|
||||||
|
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
runnable.run();
|
||||||
|
}
|
||||||
|
|
||||||
|
// ===========
|
||||||
|
public static DelayQueue<Delays> delayQueue = new DelayQueue<>();
|
||||||
|
|
||||||
|
public static void addDelay(long timeout, Runnable runnable) {
|
||||||
|
delayQueue.add(new Delays(timeout, runnable));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void tryDelay(Supplier<Boolean> supplier, long delayMillis, int maxCount) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
static {
|
||||||
|
new Thread(() -> {
|
||||||
|
try {
|
||||||
|
while (true) {
|
||||||
|
Delays delay = delayQueue.take();
|
||||||
|
delay.run(); //异常会导致延时队列失败
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}).start();
|
||||||
|
}
|
||||||
|
}
|
@ -506,6 +506,11 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
|||||||
|
|
||||||
@Comment("rpc call")
|
@Comment("rpc call")
|
||||||
public <T, R> RpcResult<R> rpc(String topic, T v, TypeToken<R> typeToken) {
|
public <T, R> RpcResult<R> rpc(String topic, T v, TypeToken<R> typeToken) {
|
||||||
|
return rpc(topic, v, typeToken, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Comment("rpc call")
|
||||||
|
public <T, R> RpcResult<R> rpc(String topic, T v, TypeToken<R> typeToken, long timeout) {
|
||||||
Rpc rpc = new Rpc<>(APP_NAME, Utility.uuid(), topic, v);
|
Rpc rpc = new Rpc<>(APP_NAME, Utility.uuid(), topic, v);
|
||||||
String ruk = rpc.getRuk();
|
String ruk = rpc.getRuk();
|
||||||
rpcMap.put(ruk, rpc);
|
rpcMap.put(ruk, rpc);
|
||||||
@ -513,14 +518,29 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
|||||||
rpcRetType.put(ruk, typeToken);
|
rpcRetType.put(ruk, typeToken);
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
publish(topic, rpc);
|
publish(topic, rpc); // send("rpc", topic, toStr(rpc));
|
||||||
synchronized (rpc) {
|
synchronized (rpc) {
|
||||||
rpc.wait(); //todo: 调用超时处理
|
if (timeout <= 0) {
|
||||||
|
timeout = 1000 * 15;
|
||||||
|
}
|
||||||
|
// call timeout default: 15s
|
||||||
|
Delays.addDelay(timeout, () -> {
|
||||||
|
RpcResult rpcResult = rpc.buildResp(505, "请求超时");
|
||||||
|
rpc.setRpcResult(rpcResult);
|
||||||
|
synchronized (rpc) {
|
||||||
|
logger.warning("rpc timeout: " + convert.convertTo(rpc));
|
||||||
|
rpc.notify();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
rpc.wait();
|
||||||
rpcMap.remove(ruk);
|
rpcMap.remove(ruk);
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
// todo: 设置请求失败
|
// call error
|
||||||
|
RpcResult rpcResult = rpc.buildResp(501, "请求失败");
|
||||||
|
rpc.setRpcResult(rpcResult);
|
||||||
}
|
}
|
||||||
return rpc.getRpcResult();
|
return rpc.getRpcResult();
|
||||||
}
|
}
|
||||||
@ -533,6 +553,14 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
|||||||
return CompletableFuture.supplyAsync(() -> rpc(topic, v, typeToken));
|
return CompletableFuture.supplyAsync(() -> rpc(topic, v, typeToken));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public <T, R> CompletableFuture<RpcResult<R>> rpcAsync(String topic, T v, long timeout) {
|
||||||
|
return CompletableFuture.supplyAsync(() -> rpc(topic, v, null, timeout));
|
||||||
|
}
|
||||||
|
|
||||||
|
public <T, R> CompletableFuture<RpcResult<R>> rpcAsync(String topic, T v, TypeToken<R> typeToken, long timeout) {
|
||||||
|
return CompletableFuture.supplyAsync(() -> rpc(topic, v, typeToken, timeout));
|
||||||
|
}
|
||||||
|
|
||||||
// RpcResult: {ruk:xxx-xxxx, retcode:0}
|
// RpcResult: {ruk:xxx-xxxx, retcode:0}
|
||||||
@Comment("rpc call back consumer")
|
@Comment("rpc call back consumer")
|
||||||
private void rpcAccept(String value) {
|
private void rpcAccept(String value) {
|
||||||
|
Loading…
Reference in New Issue
Block a user