修改:1、rpc 消费端异常捕获 2、代码风格修改

This commit is contained in:
lxy 2021-04-07 16:27:19 +08:00
parent 3760f01b51
commit 241a507ebc
4 changed files with 66 additions and 60 deletions

View File

@ -28,13 +28,13 @@ public class ZhubListener implements ApplicationListener {
AnyValue zhubs = appConfig.getAnyValue("zhubs");
AnyValue[] values = zhubs.getAnyValues("zhub");
for (AnyValue zhub : values) {
String clazz = zhub.getValue("value", "com.zdemo.zhub.ZHubClient");
String className = zhub.getValue("value", "com.zdemo.zhub.ZHubClient");
try {
Class<?> aClass = classLoader.loadClass(clazz);
Service obj = (Service) aClass.getDeclaredConstructor().newInstance();
Class<?> clazz = classLoader.loadClass(className);
Service obj = (Service) clazz.getDeclaredConstructor().newInstance();
application.getResourceFactory().inject(obj);
obj.init(zhub);
resourceFactory.register(zhub.get("name"), aClass, obj);
resourceFactory.register(zhub.get("name"), clazz, obj);
} catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException | ClassNotFoundException e) {
e.printStackTrace();
}

View File

@ -8,7 +8,7 @@ public class Rpc<T> {
private String topic; // call topic
private T value; // call paras
private RpcResponse response;
private RpcResult rpcResult;
public Rpc() {
}
@ -43,12 +43,13 @@ public class Rpc<T> {
this.value = value;
}
public <R> RpcResponse<R> getResponse() {
return response;
@ConvertColumn(ignore = true)
public RpcResult getRpcResult() {
return rpcResult;
}
public void setResponse(RpcResponse response) {
this.response = response;
public void setRpcResult(RpcResult rpcResult) {
this.rpcResult = rpcResult;
}
@ConvertColumn(ignore = true)
@ -56,50 +57,32 @@ public class Rpc<T> {
return ruk.split("::")[0];
}
public <R> RpcResponse<R> buildResp() {
RpcResponse<R> response = new RpcResponse<>();
public <R> RpcResult<R> buildResp() {
RpcResult<R> response = new RpcResult<>();
response.setRuk(ruk);
return response;
}
public <R> RpcResponse<R> buildResp(int retcode, String retinfo) {
RpcResponse<R> response = new RpcResponse<>();
public <R> RpcResult<R> buildResp(int retcode, String retinfo) {
RpcResult<R> response = new RpcResult<>();
response.setRuk(ruk);
response.setRetcode(retcode);
response.setRetinfo(retinfo);
return response;
}
public <R> RpcResponse<R> buildError(String retinfo) {
RpcResponse<R> response = new RpcResponse<>();
public <R> RpcResult<R> buildError(String retinfo) {
RpcResult<R> response = new RpcResult<>();
response.setRuk(ruk);
response.setRetcode(100);
response.setRetinfo(retinfo);
return response;
}
public <R> RpcResponse<R> buildResp(R result) {
RpcResponse<R> response = new RpcResponse<>();
public <R> RpcResult<R> buildResp(R result) {
RpcResult<R> response = new RpcResult<>();
response.setRuk(ruk);
response.setResult(result);
return response;
}
@ConvertColumn(ignore = true)
public int getRetcode() {
if (this.response == null) {
return -1;
}
return response.getRetcode();
}
@ConvertColumn(ignore = true)
public String getRetinfo() {
if (this.response == null) {
return "";
}
return response.getRetinfo();
}
}

View File

@ -1,6 +1,6 @@
package com.zdemo.zhub;
public class RpcResponse<R> {
public class RpcResult<R> {
private String ruk;
private int retcode;
private String retinfo;

View File

@ -58,6 +58,9 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
}
};
private static boolean isFirst = true;
private boolean isMain = false;
@Override
public void init(AnyValue config) {
if (!preInit()) {
@ -71,6 +74,12 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
groupid = config.getValue("groupid", groupid);
}
// 设置第一个启动的 实例为主实例
if (isFirst) {
isMain = true;
isFirst = false;
}
if (!initSocket(0)) {
return;
}
@ -121,6 +130,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
// oth msg
topicQueue.add(Event.of(topic, value));
continue;
}
// timer 消息
@ -189,12 +199,12 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
continue;
}
//if (event)
logger.info(String.format("rpc-back:[%s]: %s", event.topic, event.value));
rpcAccept(event.value);
logger.info(String.format("rpc-back:[%s] => %s", event.topic, event.value));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
logger.log(Level.WARNING, "topic[" + event.topic + "] event accept error :" + event.value, e);
logger.log(Level.WARNING, "rpc-back[" + event.topic + "] event accept error :" + event.value, e);
}
}
}, 1);
@ -207,7 +217,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
if ((event = rpcCallQueue.take()) == null) {
continue;
}
logger.info(String.format("rpc-call:[%s] => %s", event.topic, event.value));
logger.info(String.format("rpc-call:[%s] %s", event.topic, event.value));
accept(event.topic, event.value);
} catch (InterruptedException e) {
e.printStackTrace();
@ -299,7 +309,10 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
}
send("groupid " + groupid);
StringBuffer buf = new StringBuffer("subscribe lock " + APP_NAME);
StringBuffer buf = new StringBuffer("subscribe lock");
if (isMain) { // TODO:
buf.append(" " + APP_NAME);
}
for (String topic : getTopics()) {
buf.append(" ").append(topic);
}
@ -448,43 +461,48 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
// ================================================== rpc ==================================================
// -- 调用端 --
private Map<String, Rpc> rpcMap = new ConcurrentHashMap<>();
private Map<String, TypeToken> rpcRetType = new ConcurrentHashMap<>();
private static Map<String, Rpc> rpcMap = new ConcurrentHashMap<>();
private static Map<String, TypeToken> rpcRetType = new ConcurrentHashMap<>();
@Comment("rpc call")
public CompletableFuture<Rpc> rpc(String topic, Object v) {
return (CompletableFuture) rpc(topic, v, null);
public CompletableFuture<RpcResult<Void>> rpc(String topic, Object v) {
return rpc(topic, v, null);
}
@Comment("rpc call")
public <T, R> CompletableFuture<Rpc<T>> rpc(String topic, T v, TypeToken<R> typeToken) {
public <T, R> CompletableFuture<RpcResult<R>> rpc(String topic, T v, TypeToken<R> typeToken) {
return CompletableFuture.supplyAsync(() -> {
Rpc rpc = new Rpc<>(APP_NAME, Utility.uuid(), topic, v);
rpcMap.put(rpc.getRuk(), rpc);
String ruk = rpc.getRuk();
rpcMap.put(ruk, rpc);
if (typeToken != null) {
rpcRetType.put(rpc.getRuk(), typeToken);
rpcRetType.put(ruk, typeToken);
}
try {
publish(topic, rpc);
synchronized (rpc) {
rpc.wait();
rpcMap.remove(ruk);
}
} catch (InterruptedException e) {
e.printStackTrace();
// todo: 设置请求失败
}
return rpc;
return rpc.getRpcResult();
});
}
// RpcResponse: {ruk:xxx-xxxx, retcode:0}
// RpcResult: {ruk:xxx-xxxx, retcode:0}
@Comment("rpc call back consumer")
private void rpcAccept(String value) {
RpcResponse resp = convert.convertFrom(new TypeToken<RpcResponse<String>>() {
RpcResult resp = convert.convertFrom(new TypeToken<RpcResult<String>>() {
}.getType(), value);
String ruk = resp.getRuk();
Rpc rpc = rpcMap.get(ruk);
if (rpc == null) {
return;
}
TypeToken typeToken = rpcRetType.get(ruk);
Object result = resp.getResult();
@ -493,7 +511,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
}
resp.setResult(result);
rpc.setResponse(resp);
rpc.setRpcResult(resp);
synchronized (rpc) {
rpc.notify();
}
@ -503,18 +521,23 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
private Set<String> rpcTopics = new HashSet();
@Comment("rpc call consumer")
public <T, R> void rpcSubscribe(String topic, TypeToken<T> typeToken, Function<Rpc<T>, RpcResponse<R>> fun) {
public <T, R> void rpcSubscribe(String topic, TypeToken<T> typeToken, Function<Rpc<T>, RpcResult<R>> fun) {
Consumer<String> consumer = v -> {
Rpc<T> rpc = convert.convertFrom(new TypeToken<Rpc<String>>() {
}.getType(), v);
// 参数转换
T paras = convert.convertFrom(typeToken.getType(), (String) rpc.getValue());
rpc.setValue(paras);
RpcResponse<R> response = fun.apply(rpc);
Rpc<T> rpc = null;
try {
rpc = convert.convertFrom(new TypeToken<Rpc<String>>() {
}.getType(), v);
// 参数转换
T paras = convert.convertFrom(typeToken.getType(), (String) rpc.getValue());
rpc.setValue(paras);
RpcResult<R> result = fun.apply(rpc);
publish(rpc.getBackTopic(), result);
} catch (Exception e) {
logger.log(Level.WARNING, "rpc call consumer error: " + v, e);
publish(rpc.getBackTopic(), rpc.buildError("服务调用失败!"));
}
// back
publish(rpc.getBackTopic(), response);
};
rpcTopics.add(topic);