修改:1、rpc 消费端异常捕获 2、代码风格修改
This commit is contained in:
parent
ee7154990b
commit
c6940cef2b
@ -28,13 +28,13 @@ public class ZhubListener implements ApplicationListener {
|
|||||||
AnyValue zhubs = appConfig.getAnyValue("zhubs");
|
AnyValue zhubs = appConfig.getAnyValue("zhubs");
|
||||||
AnyValue[] values = zhubs.getAnyValues("zhub");
|
AnyValue[] values = zhubs.getAnyValues("zhub");
|
||||||
for (AnyValue zhub : values) {
|
for (AnyValue zhub : values) {
|
||||||
String clazz = zhub.getValue("value", "com.zdemo.zhub.ZHubClient");
|
String className = zhub.getValue("value", "com.zdemo.zhub.ZHubClient");
|
||||||
try {
|
try {
|
||||||
Class<?> aClass = classLoader.loadClass(clazz);
|
Class<?> clazz = classLoader.loadClass(className);
|
||||||
Service obj = (Service) aClass.getDeclaredConstructor().newInstance();
|
Service obj = (Service) clazz.getDeclaredConstructor().newInstance();
|
||||||
application.getResourceFactory().inject(obj);
|
application.getResourceFactory().inject(obj);
|
||||||
obj.init(zhub);
|
obj.init(zhub);
|
||||||
resourceFactory.register(zhub.get("name"), aClass, obj);
|
resourceFactory.register(zhub.get("name"), clazz, obj);
|
||||||
} catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException | ClassNotFoundException e) {
|
} catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException | ClassNotFoundException e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
|
@ -8,7 +8,7 @@ public class Rpc<T> {
|
|||||||
private String topic; // call topic
|
private String topic; // call topic
|
||||||
private T value; // call paras
|
private T value; // call paras
|
||||||
|
|
||||||
private RpcResponse response;
|
private RpcResult rpcResult;
|
||||||
|
|
||||||
public Rpc() {
|
public Rpc() {
|
||||||
}
|
}
|
||||||
@ -43,12 +43,13 @@ public class Rpc<T> {
|
|||||||
this.value = value;
|
this.value = value;
|
||||||
}
|
}
|
||||||
|
|
||||||
public <R> RpcResponse<R> getResponse() {
|
@ConvertColumn(ignore = true)
|
||||||
return response;
|
public RpcResult getRpcResult() {
|
||||||
|
return rpcResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setResponse(RpcResponse response) {
|
public void setRpcResult(RpcResult rpcResult) {
|
||||||
this.response = response;
|
this.rpcResult = rpcResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ConvertColumn(ignore = true)
|
@ConvertColumn(ignore = true)
|
||||||
@ -56,50 +57,32 @@ public class Rpc<T> {
|
|||||||
return ruk.split("::")[0];
|
return ruk.split("::")[0];
|
||||||
}
|
}
|
||||||
|
|
||||||
public <R> RpcResponse<R> buildResp() {
|
public <R> RpcResult<R> buildResp() {
|
||||||
RpcResponse<R> response = new RpcResponse<>();
|
RpcResult<R> response = new RpcResult<>();
|
||||||
response.setRuk(ruk);
|
response.setRuk(ruk);
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
public <R> RpcResponse<R> buildResp(int retcode, String retinfo) {
|
public <R> RpcResult<R> buildResp(int retcode, String retinfo) {
|
||||||
RpcResponse<R> response = new RpcResponse<>();
|
RpcResult<R> response = new RpcResult<>();
|
||||||
response.setRuk(ruk);
|
response.setRuk(ruk);
|
||||||
response.setRetcode(retcode);
|
response.setRetcode(retcode);
|
||||||
response.setRetinfo(retinfo);
|
response.setRetinfo(retinfo);
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
public <R> RpcResponse<R> buildError(String retinfo) {
|
public <R> RpcResult<R> buildError(String retinfo) {
|
||||||
RpcResponse<R> response = new RpcResponse<>();
|
RpcResult<R> response = new RpcResult<>();
|
||||||
response.setRuk(ruk);
|
response.setRuk(ruk);
|
||||||
response.setRetcode(100);
|
response.setRetcode(100);
|
||||||
response.setRetinfo(retinfo);
|
response.setRetinfo(retinfo);
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
public <R> RpcResponse<R> buildResp(R result) {
|
public <R> RpcResult<R> buildResp(R result) {
|
||||||
RpcResponse<R> response = new RpcResponse<>();
|
RpcResult<R> response = new RpcResult<>();
|
||||||
response.setRuk(ruk);
|
response.setRuk(ruk);
|
||||||
response.setResult(result);
|
response.setResult(result);
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ConvertColumn(ignore = true)
|
|
||||||
public int getRetcode() {
|
|
||||||
if (this.response == null) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return response.getRetcode();
|
|
||||||
}
|
|
||||||
|
|
||||||
@ConvertColumn(ignore = true)
|
|
||||||
public String getRetinfo() {
|
|
||||||
if (this.response == null) {
|
|
||||||
return "";
|
|
||||||
}
|
|
||||||
|
|
||||||
return response.getRetinfo();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
package com.zdemo.zhub;
|
package com.zdemo.zhub;
|
||||||
|
|
||||||
public class RpcResponse<R> {
|
public class RpcResult<R> {
|
||||||
private String ruk;
|
private String ruk;
|
||||||
private int retcode;
|
private int retcode;
|
||||||
private String retinfo;
|
private String retinfo;
|
@ -58,6 +58,9 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
private static boolean isFirst = true;
|
||||||
|
private boolean isMain = false;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(AnyValue config) {
|
public void init(AnyValue config) {
|
||||||
if (!preInit()) {
|
if (!preInit()) {
|
||||||
@ -71,6 +74,12 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
|||||||
groupid = config.getValue("groupid", groupid);
|
groupid = config.getValue("groupid", groupid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 设置第一个启动的 实例为主实例
|
||||||
|
if (isFirst) {
|
||||||
|
isMain = true;
|
||||||
|
isFirst = false;
|
||||||
|
}
|
||||||
|
|
||||||
if (!initSocket(0)) {
|
if (!initSocket(0)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -121,6 +130,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
|||||||
|
|
||||||
// oth msg
|
// oth msg
|
||||||
topicQueue.add(Event.of(topic, value));
|
topicQueue.add(Event.of(topic, value));
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// timer 消息
|
// timer 消息
|
||||||
@ -189,12 +199,12 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
//if (event)
|
//if (event)
|
||||||
|
logger.info(String.format("rpc-back:[%s]: %s", event.topic, event.value));
|
||||||
rpcAccept(event.value);
|
rpcAccept(event.value);
|
||||||
logger.info(String.format("rpc-back:[%s] => %s", event.topic, event.value));
|
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.log(Level.WARNING, "topic[" + event.topic + "] event accept error :" + event.value, e);
|
logger.log(Level.WARNING, "rpc-back[" + event.topic + "] event accept error :" + event.value, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}, 1);
|
}, 1);
|
||||||
@ -207,7 +217,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
|||||||
if ((event = rpcCallQueue.take()) == null) {
|
if ((event = rpcCallQueue.take()) == null) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
logger.info(String.format("rpc-call:[%s] => %s", event.topic, event.value));
|
logger.info(String.format("rpc-call:[%s] %s", event.topic, event.value));
|
||||||
accept(event.topic, event.value);
|
accept(event.topic, event.value);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
@ -299,7 +309,10 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
|||||||
}
|
}
|
||||||
send("groupid " + groupid);
|
send("groupid " + groupid);
|
||||||
|
|
||||||
StringBuffer buf = new StringBuffer("subscribe lock " + APP_NAME);
|
StringBuffer buf = new StringBuffer("subscribe lock");
|
||||||
|
if (isMain) { // TODO:
|
||||||
|
buf.append(" " + APP_NAME);
|
||||||
|
}
|
||||||
for (String topic : getTopics()) {
|
for (String topic : getTopics()) {
|
||||||
buf.append(" ").append(topic);
|
buf.append(" ").append(topic);
|
||||||
}
|
}
|
||||||
@ -448,43 +461,48 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
|||||||
|
|
||||||
// ================================================== rpc ==================================================
|
// ================================================== rpc ==================================================
|
||||||
// -- 调用端 --
|
// -- 调用端 --
|
||||||
private Map<String, Rpc> rpcMap = new ConcurrentHashMap<>();
|
private static Map<String, Rpc> rpcMap = new ConcurrentHashMap<>();
|
||||||
private Map<String, TypeToken> rpcRetType = new ConcurrentHashMap<>();
|
private static Map<String, TypeToken> rpcRetType = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
@Comment("rpc call")
|
@Comment("rpc call")
|
||||||
public CompletableFuture<Rpc> rpc(String topic, Object v) {
|
public CompletableFuture<RpcResult<Void>> rpc(String topic, Object v) {
|
||||||
return (CompletableFuture) rpc(topic, v, null);
|
return rpc(topic, v, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Comment("rpc call")
|
@Comment("rpc call")
|
||||||
public <T, R> CompletableFuture<Rpc<T>> rpc(String topic, T v, TypeToken<R> typeToken) {
|
public <T, R> CompletableFuture<RpcResult<R>> rpc(String topic, T v, TypeToken<R> typeToken) {
|
||||||
return CompletableFuture.supplyAsync(() -> {
|
return CompletableFuture.supplyAsync(() -> {
|
||||||
Rpc rpc = new Rpc<>(APP_NAME, Utility.uuid(), topic, v);
|
Rpc rpc = new Rpc<>(APP_NAME, Utility.uuid(), topic, v);
|
||||||
rpcMap.put(rpc.getRuk(), rpc);
|
String ruk = rpc.getRuk();
|
||||||
|
rpcMap.put(ruk, rpc);
|
||||||
if (typeToken != null) {
|
if (typeToken != null) {
|
||||||
rpcRetType.put(rpc.getRuk(), typeToken);
|
rpcRetType.put(ruk, typeToken);
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
publish(topic, rpc);
|
publish(topic, rpc);
|
||||||
synchronized (rpc) {
|
synchronized (rpc) {
|
||||||
rpc.wait();
|
rpc.wait();
|
||||||
|
rpcMap.remove(ruk);
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
// todo: 设置请求失败
|
// todo: 设置请求失败
|
||||||
}
|
}
|
||||||
return rpc;
|
return rpc.getRpcResult();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// RpcResponse: {ruk:xxx-xxxx, retcode:0}
|
// RpcResult: {ruk:xxx-xxxx, retcode:0}
|
||||||
@Comment("rpc call back consumer")
|
@Comment("rpc call back consumer")
|
||||||
private void rpcAccept(String value) {
|
private void rpcAccept(String value) {
|
||||||
RpcResponse resp = convert.convertFrom(new TypeToken<RpcResponse<String>>() {
|
RpcResult resp = convert.convertFrom(new TypeToken<RpcResult<String>>() {
|
||||||
}.getType(), value);
|
}.getType(), value);
|
||||||
|
|
||||||
String ruk = resp.getRuk();
|
String ruk = resp.getRuk();
|
||||||
Rpc rpc = rpcMap.get(ruk);
|
Rpc rpc = rpcMap.get(ruk);
|
||||||
|
if (rpc == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
TypeToken typeToken = rpcRetType.get(ruk);
|
TypeToken typeToken = rpcRetType.get(ruk);
|
||||||
|
|
||||||
Object result = resp.getResult();
|
Object result = resp.getResult();
|
||||||
@ -493,7 +511,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
|||||||
}
|
}
|
||||||
|
|
||||||
resp.setResult(result);
|
resp.setResult(result);
|
||||||
rpc.setResponse(resp);
|
rpc.setRpcResult(resp);
|
||||||
synchronized (rpc) {
|
synchronized (rpc) {
|
||||||
rpc.notify();
|
rpc.notify();
|
||||||
}
|
}
|
||||||
@ -503,18 +521,23 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
|||||||
private Set<String> rpcTopics = new HashSet();
|
private Set<String> rpcTopics = new HashSet();
|
||||||
|
|
||||||
@Comment("rpc call consumer")
|
@Comment("rpc call consumer")
|
||||||
public <T, R> void rpcSubscribe(String topic, TypeToken<T> typeToken, Function<Rpc<T>, RpcResponse<R>> fun) {
|
public <T, R> void rpcSubscribe(String topic, TypeToken<T> typeToken, Function<Rpc<T>, RpcResult<R>> fun) {
|
||||||
Consumer<String> consumer = v -> {
|
Consumer<String> consumer = v -> {
|
||||||
Rpc<T> rpc = convert.convertFrom(new TypeToken<Rpc<String>>() {
|
Rpc<T> rpc = null;
|
||||||
}.getType(), v);
|
try {
|
||||||
|
rpc = convert.convertFrom(new TypeToken<Rpc<String>>() {
|
||||||
// 参数转换
|
}.getType(), v);
|
||||||
T paras = convert.convertFrom(typeToken.getType(), (String) rpc.getValue());
|
|
||||||
rpc.setValue(paras);
|
|
||||||
RpcResponse<R> response = fun.apply(rpc);
|
|
||||||
|
|
||||||
|
// 参数转换
|
||||||
|
T paras = convert.convertFrom(typeToken.getType(), (String) rpc.getValue());
|
||||||
|
rpc.setValue(paras);
|
||||||
|
RpcResult<R> result = fun.apply(rpc);
|
||||||
|
publish(rpc.getBackTopic(), result);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.log(Level.WARNING, "rpc call consumer error: " + v, e);
|
||||||
|
publish(rpc.getBackTopic(), rpc.buildError("服务调用失败!"));
|
||||||
|
}
|
||||||
// back
|
// back
|
||||||
publish(rpc.getBackTopic(), response);
|
|
||||||
};
|
};
|
||||||
|
|
||||||
rpcTopics.add(topic);
|
rpcTopics.add(topic);
|
||||||
|
Loading…
Reference in New Issue
Block a user