diff --git a/pom.xml b/pom.xml index 9c15025..bf36328 100644 --- a/pom.xml +++ b/pom.xml @@ -14,11 +14,26 @@ UTF-8 + + + maven-release + maven-nexus + https://nexus.1216.top/repository/maven-public/ + + + + + mvn-release + mvn-release + https://nexus.1216.top/repository/maven-releases/ + + + org.redkale redkale - 2.8.0-dev + 2.8.0.dev compile @@ -28,22 +43,4 @@ compile - - - - - maven-nexus - maven-nexus - http://47.106.237.198:8081/repository/maven-public/ - - - - - - - mvn-release - mvn-release - http://47.106.237.198:8081/repository/maven-releases/ - - \ No newline at end of file diff --git a/src/main/java/net/tccn/AbstractConsumer.java b/src/main/java/net/tccn/AbstractConsumer.java index f126411..c2dbb9c 100644 --- a/src/main/java/net/tccn/AbstractConsumer.java +++ b/src/main/java/net/tccn/AbstractConsumer.java @@ -19,7 +19,7 @@ public abstract class AbstractConsumer extends ZhubAgentProvider implements ICon protected static String APP_NAME = ""; - private Map eventMap = new ConcurrentHashMap<>(); + protected Map> eventMap = new ConcurrentHashMap<>(); protected abstract String getGroupid(); @@ -31,6 +31,7 @@ public abstract class AbstractConsumer extends ZhubAgentProvider implements ICon return Set.of("-"); } + // topic 消息消费前处理 protected void accept(String topic, String value) { EventType eventType = eventMap.get(topic); @@ -44,6 +45,12 @@ public abstract class AbstractConsumer extends ZhubAgentProvider implements ICon eventType.accept(data); } + // rpc 被调用端 + protected void rpcAccept(String topic, T value) { + EventType eventType = eventMap.get(topic); + eventType.accept(value); + } + protected final void removeEventType(String topic) { eventMap.remove(topic); } @@ -77,4 +84,13 @@ public abstract class AbstractConsumer extends ZhubAgentProvider implements ICon public String resourceName() { return super.getName(); } + + protected String toStr(Object v) { + if (v instanceof String) { + return (String) v; + } else if (v == null) { + return null; + } + return convert.convertTo(v); + } } diff --git a/src/main/java/net/tccn/Event.java b/src/main/java/net/tccn/Event.java index cc957c6..fcc8b65 100644 --- a/src/main/java/net/tccn/Event.java +++ b/src/main/java/net/tccn/Event.java @@ -15,8 +15,8 @@ public class Event { this.value = value; } - public static Event of(String topic, V value) { - return new Event(topic, value); + public static Event of(String topic, V value) { + return new Event<>(topic, value); } diff --git a/src/main/java/net/tccn/timer/TimerExecutor.java b/src/main/java/net/tccn/timer/TimerExecutor.java index 720ddf5..355788e 100644 --- a/src/main/java/net/tccn/timer/TimerExecutor.java +++ b/src/main/java/net/tccn/timer/TimerExecutor.java @@ -24,7 +24,7 @@ public class TimerExecutor { for (Task t : task) { t.setTimerExecutor(this); queue.push(t); - logger.finest("add new task : " + t.getName()); + // logger.finest("add new task : " + t.getName()); } } diff --git a/src/main/java/net/tccn/timer/TimerTask.java b/src/main/java/net/tccn/timer/TimerTask.java index e1993b4..95865aa 100644 --- a/src/main/java/net/tccn/timer/TimerTask.java +++ b/src/main/java/net/tccn/timer/TimerTask.java @@ -93,10 +93,10 @@ public class TimerTask implements Task { if (!isComplete) { int count = execCount.incrementAndGet(); // 执行次数+1 - long start = System.currentTimeMillis(); + // long start = System.currentTimeMillis(); job.execute(this); - long end = System.currentTimeMillis(); - logger.finest(String.format("task [%s] : not complete -> %s, time: %s ms, exec count: %s.", getName(), isComplete ? "had complete" : "not complete", end - start, count)); + // long end = System.currentTimeMillis(); + // logger.finest(String.format("task [%s] : not complete -> %s, time: %s ms, exec count: %s.", getName(), isComplete ? "had complete" : "not complete", end - start, count)); if (!isComplete) { timerExecutor.add(this, true); diff --git a/src/main/java/net/tccn/zhub/Rpc.java b/src/main/java/net/tccn/zhub/Rpc.java index 5d1aaeb..b2a47a4 100644 --- a/src/main/java/net/tccn/zhub/Rpc.java +++ b/src/main/java/net/tccn/zhub/Rpc.java @@ -1,7 +1,6 @@ package net.tccn.zhub; import org.redkale.convert.ConvertColumn; -import org.redkale.convert.json.JsonConvert; import org.redkale.service.RetResult; public class Rpc { @@ -14,10 +13,10 @@ public class Rpc { public Rpc() { } - protected Rpc(String appname, String ruk, String topic, Object value) { + protected Rpc(String appname, String ruk, String topic, T value) { this.ruk = appname + "::" + ruk; this.topic = topic; - this.value = (T) JsonConvert.root().convertTo(value); + this.value = value; } public String getRuk() { diff --git a/src/main/java/net/tccn/zhub/ZHubClient.java b/src/main/java/net/tccn/zhub/ZHubClient.java index 3ce64a2..539e7a9 100644 --- a/src/main/java/net/tccn/zhub/ZHubClient.java +++ b/src/main/java/net/tccn/zhub/ZHubClient.java @@ -41,17 +41,11 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer private BufferedReader reader; private final LinkedBlockingQueue timerQueue = new LinkedBlockingQueue<>(); - private final LinkedBlockingQueue> topicQueue = new LinkedBlockingQueue<>(); - private final LinkedBlockingQueue> rpcBackQueue = new LinkedBlockingQueue<>(); // RPC BACK MSG - private final LinkedBlockingQueue> rpcCallQueue = new LinkedBlockingQueue<>(); // RPC CALL MSG + private final LinkedBlockingQueue> topicQueue = new LinkedBlockingQueue<>(); // [=> Object] + private final LinkedBlockingQueue> rpcBackQueue = new LinkedBlockingQueue<>(); // RPC BACK MSG [=> Object] + private final LinkedBlockingQueue> rpcCallQueue = new LinkedBlockingQueue<>(); // RPC CALL MSG [=> Object] private final LinkedBlockingQueue sendMsgQueue = new LinkedBlockingQueue<>(); // SEND MSG - /*private BiConsumer threadBuilder = (r, n) -> { - for (int i = 0; i < n; i++) { - new Thread(() -> r.run()).start(); - } - };*/ - private static Map mainHub = new HashMap<>(); // 127.0.0.1:1216 - ZHubClient public ZHubClient() { @@ -145,7 +139,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer String value = ""; do { - if (value.length() > 0) { + if (!value.isEmpty()) { value += "\r\n"; } String s = reader.readLine(); @@ -225,9 +219,8 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer while (true) { Timer timer = null; try { - if ((timer = timerQueue.take()) == null) { - return; - } + timer = timerQueue.take(); + long start = System.currentTimeMillis(); pool.submit(timer.runnable).get(5, TimeUnit.SECONDS); long end = System.currentTimeMillis(); @@ -249,9 +242,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer while (true) { Event event = null; try { - if ((event = topicQueue.take()) == null) { - continue; - } + event = topicQueue.take(); String topic = event.topic; String value = event.value; @@ -259,10 +250,10 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer } catch (InterruptedException e) { e.printStackTrace(); } catch (TimeoutException e) { - logger.log(Level.SEVERE, "topic[" + event.topic + "] event deal time out: " + 5 + " S, value: " + event.value, e); + logger.log(Level.SEVERE, "topic[" + event.topic + "] event deal time out: " + 5 + " S, value: " + toStr(event.value), e); pool = Executors.newFixedThreadPool(1); } catch (Exception e) { - logger.log(Level.WARNING, "topic[" + event.topic + "] event accept error :" + event.value, e); + logger.log(Level.WARNING, "topic[" + event.topic + "] event accept error :" + toStr(event.value), e); } } }).start(); @@ -270,18 +261,16 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer // rpc back ,仅做数据解析,暂无耗时监控 new Thread(() -> { while (true) { - Event event = null; + Event event = null; try { - if ((event = rpcBackQueue.take()) == null) { - continue; - } + event = rpcBackQueue.take(); //if (event) - logger.finest(String.format("rpc-back:[%s]: %s", event.topic, event.value)); + logger.finest(String.format("rpc-back:[%s]: %s", event.topic, toStr(event.value))); rpcAccept(event.value); } catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e) { - logger.log(Level.WARNING, "rpc-back[" + event.topic + "] event accept error :" + event.value, e); + logger.log(Level.WARNING, "rpc-back[" + event.topic + "] event accept error :" + toStr(event.value), e); } } }).start(); @@ -290,22 +279,21 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer new Thread(() -> { ExecutorService pool = Executors.newFixedThreadPool(1); while (true) { - Event event = null; + Event event = null; try { - if ((event = rpcCallQueue.take()) == null) { - continue; - } - logger.finest(String.format("rpc-call:[%s] %s", event.topic, event.value)); + event = rpcCallQueue.take(); + + logger.finest(String.format("rpc-call:[%s] %s", event.topic, toStr(event.value))); String topic = event.topic; - String value = event.value; - pool.submit(() -> accept(topic, value)).get(5, TimeUnit.SECONDS); + Object value = event.value; + pool.submit(() -> rpcAccept(topic, value)).get(5, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } catch (TimeoutException e) { - logger.log(Level.SEVERE, "topic[" + event.topic + "] event deal time out: " + 5 + " S, value: " + event.value, e); + logger.log(Level.SEVERE, "topic[" + event.topic + "] event deal time out: " + 5 + " S, value: " + toStr(event.value), e); pool = Executors.newFixedThreadPool(1); } catch (Exception e) { - logger.log(Level.WARNING, "rpc-call[" + event.topic + "] event accept error :" + event.value, e); + logger.log(Level.WARNING, "rpc-call[" + event.topic + "] event accept error :" + toStr(event.value), e); } } }).start(); @@ -315,9 +303,8 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer while (true) { String msg = null; try { - if ((msg = sendMsgQueue.take()) == null) { - continue; - } + msg = sendMsgQueue.take(); + // logger.log(Level.FINEST, "send-msg: [" + msg + "]"); writer.write(msg.getBytes()); writer.flush(); @@ -402,15 +389,6 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer return str.length(); } - private String toStr(Object v) { - if (v instanceof String) { - return (String) v; - } else if (v == null) { - return null; - } - return convert.convertTo(v); - } - protected boolean initSocket(int retry) { for (int i = 0; i <= retry; i++) { try { @@ -479,11 +457,15 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer } public boolean publish(String topic, Object v) { + /*if (eventMap.containsKey(topic)) { // 本地调用 + topicQueue.add(Event.of(topic, v)); + return true; + }*/ return send("publish", topic, toStr(v)); } public void broadcast(String topic, Object v) { - send("broadcast", topic, toStr(v)); + send("broadcast", topic, toStr(v)); // 广播必须走远端模式 } // 发送 publish 主题消息,若多次发送的 topic + "-" + value 相同,将会做延时重置 @@ -633,7 +615,12 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer rpcRetType.put(ruk, typeToken); } try { - publish(topic, rpc); // send("rpc", topic, toStr(rpc)); + if (eventMap.containsKey(topic)) { // 本地调用 + rpcCallQueue.add(Event.of(topic, rpc)); + } else { + rpc.setValue(toStr(rpc.getValue())); + publish(topic, rpc); // send("rpc", topic, toStr(rpc)); + } synchronized (rpc) { if (timeout <= 0) { timeout = 1000 * 15; @@ -683,9 +670,20 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer // RpcResult: {ruk:xxx-xxxx, retcode:0} @Comment("rpc call back consumer") - private void rpcAccept(String value) { + private void rpcAccept(T value) { + // 接收到 本地调用返回的 RpcResult TODO, 本地模式下返回的数据对象类型需要和处理端一致,不然会出现类型转换异常 - 解决办法,当出现不一致的情况取数据做转换 + if (value instanceof RpcResult) { + String ruk = ((RpcResult) value).getRuk(); + Rpc rpc = rpcMap.remove(ruk); + rpc.setRpcResult((RpcResult) value); + synchronized (rpc) { + rpc.notify(); + } + return; + } + RpcResult resp = convert.convertFrom(new TypeToken>() { - }.getType(), value); + }.getType(), (String) value); String ruk = resp.getRuk(); Rpc rpc = rpcMap.remove(ruk); @@ -716,18 +714,28 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer @Comment("rpc call consumer") public void rpcSubscribe(String topic, TypeToken typeToken, Function, RpcResult> fun) { - Consumer consumer = v -> { + Consumer consumer = v -> { Rpc rpc = null; try { - rpc = convert.convertFrom(new TypeToken>() { - }.getType(), v); + if (v instanceof String) { + rpc = convert.convertFrom(IType.STRING.getType(), (String) v); + } else { + rpc = (Rpc) v; + } // 参数转换 - T paras = convert.convertFrom(typeToken.getType(), (String) rpc.getValue()); - rpc.setValue(paras); + if (rpc.getValue() instanceof String && !"java.lang.String".equals(typeToken.getType().getTypeName())) { + T paras = convert.convertFrom(typeToken.getType(), (String) rpc.getValue()); + rpc.setValue(paras); + } + RpcResult result = fun.apply(rpc); - result.setResult(toStr(result.getResult())); - publish(rpc.getBackTopic(), result); + if (APP_NAME.equals(rpc.getBackTopic())) { + rpcBackQueue.add(Event.of(topic, result)); + } else { + result.setResult(toStr(result.getResult())); // 远程模式 结果转换 + publish(rpc.getBackTopic(), result); + } } catch (Exception e) { logger.log(Level.WARNING, "rpc call consumer error: " + v, e); publish(rpc.getBackTopic(), rpc.retError("服务调用失败!")); @@ -736,6 +744,6 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer }; rpcTopics.add(topic); - subscribe(topic, consumer); + subscribe(topic, typeToken, consumer); } }