修改:rpc 方法

This commit is contained in:
lxy 2021-08-29 19:44:04 +08:00
parent 074e5a0685
commit 5201cb6088

View File

@ -8,7 +8,10 @@ import org.redkale.service.Service;
import org.redkale.util.*;
import javax.annotation.Resource;
import java.io.*;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
@ -128,7 +131,6 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
} while (clen > 0 && clen > strLength(value));
// lock msg
if ("lock".equals(topic)) {
Lock lock = lockTag.get(value);
@ -498,31 +500,37 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
private static Map<String, TypeToken> rpcRetType = new ConcurrentHashMap<>();
@Comment("rpc call")
public CompletableFuture<RpcResult<Void>> rpc(String topic, Object v) {
public RpcResult<Void> rpc(String topic, Object v) {
return rpc(topic, v, null);
}
@Comment("rpc call")
public <T, R> CompletableFuture<RpcResult<R>> rpc(String topic, T v, TypeToken<R> typeToken) {
return CompletableFuture.supplyAsync(() -> {
Rpc rpc = new Rpc<>(APP_NAME, Utility.uuid(), topic, v);
String ruk = rpc.getRuk();
rpcMap.put(ruk, rpc);
if (typeToken != null) {
rpcRetType.put(ruk, typeToken);
public <T, R> RpcResult<R> rpc(String topic, T v, TypeToken<R> typeToken) {
Rpc rpc = new Rpc<>(APP_NAME, Utility.uuid(), topic, v);
String ruk = rpc.getRuk();
rpcMap.put(ruk, rpc);
if (typeToken != null) {
rpcRetType.put(ruk, typeToken);
}
try {
publish(topic, rpc);
synchronized (rpc) {
rpc.wait(); //todo: 调用超时处理
rpcMap.remove(ruk);
}
try {
publish(topic, rpc);
synchronized (rpc) {
rpc.wait(); //todo: 调用超时处理
rpcMap.remove(ruk);
}
} catch (InterruptedException e) {
e.printStackTrace();
// todo: 设置请求失败
}
return rpc.getRpcResult();
});
} catch (InterruptedException e) {
e.printStackTrace();
// todo: 设置请求失败
}
return rpc.getRpcResult();
}
public <T, R> CompletableFuture<RpcResult<R>> rpcAsync(String topic, T v) {
return CompletableFuture.supplyAsync(() -> rpc(topic, v, null));
}
public <T, R> CompletableFuture<RpcResult<R>> rpcAsync(String topic, T v, TypeToken<R> typeToken) {
return CompletableFuture.supplyAsync(() -> rpc(topic, v, typeToken));
}
// RpcResult: {ruk:xxx-xxxx, retcode:0}