From 241a507ebca30d2839f29fbd2d71f9d5acca0e46 Mon Sep 17 00:00:00 2001 From: lxy <237809796@qq.com> Date: Wed, 7 Apr 2021 16:27:19 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=EF=BC=9A1=E3=80=81rpc=20?= =?UTF-8?q?=E6=B6=88=E8=B4=B9=E7=AB=AF=E5=BC=82=E5=B8=B8=E6=8D=95=E8=8E=B7?= =?UTF-8?q?=202=E3=80=81=E4=BB=A3=E7=A0=81=E9=A3=8E=E6=A0=BC=E4=BF=AE?= =?UTF-8?q?=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/com/zdemo/ZhubListener.java | 8 +-- src/com/zdemo/zhub/Rpc.java | 45 ++++-------- .../zhub/{RpcResponse.java => RpcResult.java} | 2 +- src/com/zdemo/zhub/ZHubClient.java | 71 ++++++++++++------- 4 files changed, 66 insertions(+), 60 deletions(-) rename src/com/zdemo/zhub/{RpcResponse.java => RpcResult.java} (95%) diff --git a/src/com/zdemo/ZhubListener.java b/src/com/zdemo/ZhubListener.java index 8fc79bf..7db44f7 100644 --- a/src/com/zdemo/ZhubListener.java +++ b/src/com/zdemo/ZhubListener.java @@ -28,13 +28,13 @@ public class ZhubListener implements ApplicationListener { AnyValue zhubs = appConfig.getAnyValue("zhubs"); AnyValue[] values = zhubs.getAnyValues("zhub"); for (AnyValue zhub : values) { - String clazz = zhub.getValue("value", "com.zdemo.zhub.ZHubClient"); + String className = zhub.getValue("value", "com.zdemo.zhub.ZHubClient"); try { - Class aClass = classLoader.loadClass(clazz); - Service obj = (Service) aClass.getDeclaredConstructor().newInstance(); + Class clazz = classLoader.loadClass(className); + Service obj = (Service) clazz.getDeclaredConstructor().newInstance(); application.getResourceFactory().inject(obj); obj.init(zhub); - resourceFactory.register(zhub.get("name"), aClass, obj); + resourceFactory.register(zhub.get("name"), clazz, obj); } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException | ClassNotFoundException e) { e.printStackTrace(); } diff --git a/src/com/zdemo/zhub/Rpc.java b/src/com/zdemo/zhub/Rpc.java index f17f4cf..0eff756 100644 --- a/src/com/zdemo/zhub/Rpc.java +++ b/src/com/zdemo/zhub/Rpc.java @@ -8,7 +8,7 @@ public class Rpc { private String topic; // call topic private T value; // call paras - private RpcResponse response; + private RpcResult rpcResult; public Rpc() { } @@ -43,12 +43,13 @@ public class Rpc { this.value = value; } - public RpcResponse getResponse() { - return response; + @ConvertColumn(ignore = true) + public RpcResult getRpcResult() { + return rpcResult; } - public void setResponse(RpcResponse response) { - this.response = response; + public void setRpcResult(RpcResult rpcResult) { + this.rpcResult = rpcResult; } @ConvertColumn(ignore = true) @@ -56,50 +57,32 @@ public class Rpc { return ruk.split("::")[0]; } - public RpcResponse buildResp() { - RpcResponse response = new RpcResponse<>(); + public RpcResult buildResp() { + RpcResult response = new RpcResult<>(); response.setRuk(ruk); return response; } - public RpcResponse buildResp(int retcode, String retinfo) { - RpcResponse response = new RpcResponse<>(); + public RpcResult buildResp(int retcode, String retinfo) { + RpcResult response = new RpcResult<>(); response.setRuk(ruk); response.setRetcode(retcode); response.setRetinfo(retinfo); return response; } - public RpcResponse buildError(String retinfo) { - RpcResponse response = new RpcResponse<>(); + public RpcResult buildError(String retinfo) { + RpcResult response = new RpcResult<>(); response.setRuk(ruk); response.setRetcode(100); response.setRetinfo(retinfo); return response; } - public RpcResponse buildResp(R result) { - RpcResponse response = new RpcResponse<>(); + public RpcResult buildResp(R result) { + RpcResult response = new RpcResult<>(); response.setRuk(ruk); response.setResult(result); return response; } - - @ConvertColumn(ignore = true) - public int getRetcode() { - if (this.response == null) { - return -1; - } - - return response.getRetcode(); - } - - @ConvertColumn(ignore = true) - public String getRetinfo() { - if (this.response == null) { - return ""; - } - - return response.getRetinfo(); - } } diff --git a/src/com/zdemo/zhub/RpcResponse.java b/src/com/zdemo/zhub/RpcResult.java similarity index 95% rename from src/com/zdemo/zhub/RpcResponse.java rename to src/com/zdemo/zhub/RpcResult.java index 6cc5bb7..069b185 100644 --- a/src/com/zdemo/zhub/RpcResponse.java +++ b/src/com/zdemo/zhub/RpcResult.java @@ -1,6 +1,6 @@ package com.zdemo.zhub; -public class RpcResponse { +public class RpcResult { private String ruk; private int retcode; private String retinfo; diff --git a/src/com/zdemo/zhub/ZHubClient.java b/src/com/zdemo/zhub/ZHubClient.java index fda403a..5170e9f 100644 --- a/src/com/zdemo/zhub/ZHubClient.java +++ b/src/com/zdemo/zhub/ZHubClient.java @@ -58,6 +58,9 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer } }; + private static boolean isFirst = true; + private boolean isMain = false; + @Override public void init(AnyValue config) { if (!preInit()) { @@ -71,6 +74,12 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer groupid = config.getValue("groupid", groupid); } + // 设置第一个启动的 实例为主实例 + if (isFirst) { + isMain = true; + isFirst = false; + } + if (!initSocket(0)) { return; } @@ -121,6 +130,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer // oth msg topicQueue.add(Event.of(topic, value)); + continue; } // timer 消息 @@ -189,12 +199,12 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer continue; } //if (event) + logger.info(String.format("rpc-back:[%s]: %s", event.topic, event.value)); rpcAccept(event.value); - logger.info(String.format("rpc-back:[%s] => %s", event.topic, event.value)); } catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e) { - logger.log(Level.WARNING, "topic[" + event.topic + "] event accept error :" + event.value, e); + logger.log(Level.WARNING, "rpc-back[" + event.topic + "] event accept error :" + event.value, e); } } }, 1); @@ -207,7 +217,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer if ((event = rpcCallQueue.take()) == null) { continue; } - logger.info(String.format("rpc-call:[%s] => %s", event.topic, event.value)); + logger.info(String.format("rpc-call:[%s] %s", event.topic, event.value)); accept(event.topic, event.value); } catch (InterruptedException e) { e.printStackTrace(); @@ -299,7 +309,10 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer } send("groupid " + groupid); - StringBuffer buf = new StringBuffer("subscribe lock " + APP_NAME); + StringBuffer buf = new StringBuffer("subscribe lock"); + if (isMain) { // TODO: + buf.append(" " + APP_NAME); + } for (String topic : getTopics()) { buf.append(" ").append(topic); } @@ -448,43 +461,48 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer // ================================================== rpc ================================================== // -- 调用端 -- - private Map rpcMap = new ConcurrentHashMap<>(); - private Map rpcRetType = new ConcurrentHashMap<>(); + private static Map rpcMap = new ConcurrentHashMap<>(); + private static Map rpcRetType = new ConcurrentHashMap<>(); @Comment("rpc call") - public CompletableFuture rpc(String topic, Object v) { - return (CompletableFuture) rpc(topic, v, null); + public CompletableFuture> rpc(String topic, Object v) { + return rpc(topic, v, null); } @Comment("rpc call") - public CompletableFuture> rpc(String topic, T v, TypeToken typeToken) { + public CompletableFuture> rpc(String topic, T v, TypeToken typeToken) { return CompletableFuture.supplyAsync(() -> { Rpc rpc = new Rpc<>(APP_NAME, Utility.uuid(), topic, v); - rpcMap.put(rpc.getRuk(), rpc); + String ruk = rpc.getRuk(); + rpcMap.put(ruk, rpc); if (typeToken != null) { - rpcRetType.put(rpc.getRuk(), typeToken); + rpcRetType.put(ruk, typeToken); } try { publish(topic, rpc); synchronized (rpc) { rpc.wait(); + rpcMap.remove(ruk); } } catch (InterruptedException e) { e.printStackTrace(); // todo: 设置请求失败 } - return rpc; + return rpc.getRpcResult(); }); } - // RpcResponse: {ruk:xxx-xxxx, retcode:0} + // RpcResult: {ruk:xxx-xxxx, retcode:0} @Comment("rpc call back consumer") private void rpcAccept(String value) { - RpcResponse resp = convert.convertFrom(new TypeToken>() { + RpcResult resp = convert.convertFrom(new TypeToken>() { }.getType(), value); String ruk = resp.getRuk(); Rpc rpc = rpcMap.get(ruk); + if (rpc == null) { + return; + } TypeToken typeToken = rpcRetType.get(ruk); Object result = resp.getResult(); @@ -493,7 +511,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer } resp.setResult(result); - rpc.setResponse(resp); + rpc.setRpcResult(resp); synchronized (rpc) { rpc.notify(); } @@ -503,18 +521,23 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer private Set rpcTopics = new HashSet(); @Comment("rpc call consumer") - public void rpcSubscribe(String topic, TypeToken typeToken, Function, RpcResponse> fun) { + public void rpcSubscribe(String topic, TypeToken typeToken, Function, RpcResult> fun) { Consumer consumer = v -> { - Rpc rpc = convert.convertFrom(new TypeToken>() { - }.getType(), v); - - // 参数转换 - T paras = convert.convertFrom(typeToken.getType(), (String) rpc.getValue()); - rpc.setValue(paras); - RpcResponse response = fun.apply(rpc); + Rpc rpc = null; + try { + rpc = convert.convertFrom(new TypeToken>() { + }.getType(), v); + // 参数转换 + T paras = convert.convertFrom(typeToken.getType(), (String) rpc.getValue()); + rpc.setValue(paras); + RpcResult result = fun.apply(rpc); + publish(rpc.getBackTopic(), result); + } catch (Exception e) { + logger.log(Level.WARNING, "rpc call consumer error: " + v, e); + publish(rpc.getBackTopic(), rpc.buildError("服务调用失败!")); + } // back - publish(rpc.getBackTopic(), response); }; rpcTopics.add(topic);