diff --git a/conf/application.xml b/conf/application.xml deleted file mode 100644 index c519a89..0000000 --- a/conf/application.xml +++ /dev/null @@ -1,26 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/conf/kafak.properties b/conf/kafak.properties deleted file mode 100644 index 37bcd72..0000000 --- a/conf/kafak.properties +++ /dev/null @@ -1,19 +0,0 @@ -# Producer -#bootstrap.servers=47.111.150.118:6062 -#bootstrap.servers=121.196.17.55:6062 -bootstrap.servers=39.108.56.246:9092 -#bootstrap.servers=122.112.180.156:6062 -acks=all -retries=0 -batch.size=16384 -linger.ms=1 -buffer.memory=33554432 -key.serializer=org.apache.kafka.common.serialization.StringSerializer -value.serializer=org.apache.kafka.common.serialization.StringSerializer - -# Consumer -enable.auto.commit=true -auto.commit.interval.ms=1000 -group.id= -key.deserializer=org.apache.kafka.common.serialization.StringDeserializer -value.deserializer=org.apache.kafka.common.serialization.StringDeserializer \ No newline at end of file diff --git a/pom.xml b/pom.xml index efb7dd6..23c81c1 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ net.tccn zhub-client - 1.0-SNAPSHOT + 1.0 17 @@ -21,4 +21,18 @@ + + + + src + + + + + test + + + + + \ No newline at end of file diff --git a/src/com/zdemo/IType.java b/src/com/zdemo/IType.java index a8526f7..6cc3628 100644 --- a/src/com/zdemo/IType.java +++ b/src/com/zdemo/IType.java @@ -1,21 +1,21 @@ package com.zdemo; -import org.redkale.util.TypeToken; +import com.google.gson.reflect.TypeToken; import java.util.List; import java.util.Map; public interface IType { - TypeToken STRING = new TypeToken() { + TypeToken STRING = new TypeToken<>() { }; - TypeToken INT = new TypeToken() { + TypeToken INT = new TypeToken<>() { }; - TypeToken> MAP = new TypeToken>() { + TypeToken> MAP = new TypeToken<>() { }; - TypeToken>> LMAP = new TypeToken>>() { + TypeToken>> LMAP = new TypeToken<>() { }; } diff --git a/src/com/zdemo/zhub/ZHubClient.java b/src/com/zdemo/zhub/ZHubClient.java index 39f552f..e36b85b 100644 --- a/src/com/zdemo/zhub/ZHubClient.java +++ b/src/com/zdemo/zhub/ZHubClient.java @@ -6,9 +6,6 @@ import com.zdemo.Event; import com.zdemo.IConsumer; import com.zdemo.IProducer; import net.tccn.timer.Timers; -import org.redkale.util.AnyValue; -import org.redkale.util.Comment; -import org.redkale.util.Utility; import java.io.BufferedReader; import java.io.IOException; @@ -17,7 +14,10 @@ import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketException; -import java.util.*; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; @@ -35,8 +35,6 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer //private String password = ""; private String groupid = ""; - //private ReentrantLock lock = new ReentrantLock(); - private Socket client; private OutputStream writer; private BufferedReader reader; @@ -46,7 +44,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer private final LinkedBlockingQueue> rpcCallQueue = new LinkedBlockingQueue<>(); // RPC CALL MSG private final LinkedBlockingQueue sendMsgQueue = new LinkedBlockingQueue<>(); // SEND MSG - private BiConsumer threadBuilder = (r, n) -> { + private final BiConsumer threadBuilder = (r, n) -> { for (int i = 0; i < n; i++) { new Thread(() -> r.run()).start(); } @@ -54,7 +52,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer /*private static boolean isFirst = true; private boolean isMain = false;*/ - private static Map mainHub = new HashMap<>(); // 127.0.0.1:1216 - ZHubClient + private static final Map mainHub = new HashMap<>(); // 127.0.0.1:1216 - ZHubClient public ZHubClient(String addr, String groupid, String appname) { this.addr = addr; @@ -63,22 +61,16 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer init(null); } - public void init(AnyValue config) { + public void init(Map config) { if (!preInit()) { return; } // 自动注入 if (config != null) { - addr = config.getValue("addr", addr); - - // 合并 addr = host:port, 做历史兼容 - int port = config.getIntValue("port", 0); - if (port != 0 && !addr.contains(":")) { - addr = addr + ":" + port; - } - - groupid = config.getValue("groupid", groupid); + addr = config.getOrDefault("addr", addr); + groupid = config.getOrDefault("groupid", groupid); + APP_NAME = config.getOrDefault("appname", APP_NAME); } // 设置第一个启动的 实例为主实例 @@ -102,7 +94,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer continue; } - String type = ""; + String type; // +ping if ("+ping".equals(readLine)) { @@ -112,7 +104,8 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer // 主题订阅消息 if ("*3".equals(readLine)) { - readLine = reader.readLine(); // $7 len() + reader.readLine(); // $7 len() + type = reader.readLine(); // message if (!"message".equals(type)) { continue; @@ -165,7 +158,8 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer // timer 消息 if ("*2".equals(readLine)) { - readLine = reader.readLine(); // $7 len() + reader.readLine(); // $7 len() + type = reader.readLine(); // message if (!"timer".equals(type)) { continue; @@ -284,7 +278,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer if (data.length == 1) { sendMsgQueue.add(data[0] + "\r\n"); } else if (data.length > 1) { - StringBuffer buf = new StringBuffer(); + StringBuilder buf = new StringBuilder(); buf.append("*" + data.length + "\r\n"); for (String d : data) { buf.append("$" + strLength(d) + "\r\n"); @@ -339,7 +333,8 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer String host = hostPort[0]; int port = Integer.parseInt(hostPort[1]); - client = new Socket(); + //private ReentrantLock lock = new ReentrantLock(); + Socket client = new Socket(); client.connect(new InetSocketAddress(host, port)); client.setKeepAlive(true); @@ -356,7 +351,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer /*if (isMain) { }*/ if (mainHub.containsValue(this)) { - buf.append(" " + APP_NAME); + buf.append(" ").append(APP_NAME); } for (String topic : getTopics()) { buf.append(" ").append(topic); @@ -364,9 +359,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer send(buf.toString()); // 重连 timer 订阅 - timerMap.forEach((name, timer) -> { - send("timer", name); - }); + timerMap.forEach((name, timer) -> send("timer", name)); if (retry > 0) { logger.warning(String.format("ZHubClient[%s][%s] %s Succeed!", getGroupid(), i + 1, retry > 0 ? "reconnection" : "init")); } else { @@ -426,16 +419,12 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer } } - if ("M".equals(endchar)) { - delay *= (1000 * 60 * 60 * 24 * 30); - } else if ("d".equals(endchar)) { - delay *= (1000 * 60 * 60 * 24); - } else if ("H".equals(endchar)) { - delay *= (1000 * 60 * 60); - } else if ("m".equals(endchar)) { - delay *= (1000 * 60); - } else if ("s".equals(endchar)) { - delay *= 1000; + switch (endchar) { + case "M" -> delay *= (1000 * 60 * 60 * 24 * 30); + case "d" -> delay *= (1000 * 60 * 60 * 24); + case "H" -> delay *= (1000 * 60 * 60); + case "m" -> delay *= (1000 * 60); + case "s" -> delay *= 1000; } delay(topic, v, delay); @@ -452,7 +441,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer } // ================================================== lock ================================================== - private Map lockTag = new ConcurrentHashMap<>(); + private final Map lockTag = new ConcurrentHashMap<>(); public Lock tryLock(String key, int duration) { String uuid = UUID.randomUUID().toString().replaceAll("-", ""); @@ -472,7 +461,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer } // ================================================== timer ================================================== - private ConcurrentHashMap timerMap = new ConcurrentHashMap(); + private final ConcurrentHashMap timerMap = new ConcurrentHashMap(); class Timer { String name; @@ -506,8 +495,8 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer // ================================================== rpc ================================================== // -- 调用端 -- - private static Map rpcMap = new ConcurrentHashMap<>(); - private static Map rpcRetType = new ConcurrentHashMap<>(); + private static final Map rpcMap = new ConcurrentHashMap<>(); + private static final Map rpcRetType = new ConcurrentHashMap<>(); // rpc call public RpcResult rpc(String topic, Object v) { @@ -519,9 +508,9 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer return rpc(topic, v, typeToken, 0); } - @Comment("rpc call") + // rpc call public RpcResult rpc(String topic, T v, TypeToken typeToken, long timeout) { - Rpc rpc = new Rpc<>(APP_NAME, Utility.uuid(), topic, v); + Rpc rpc = new Rpc<>(APP_NAME, UUID.randomUUID().toString().replaceAll("-", ""), topic, v); String ruk = rpc.getRuk(); rpcMap.put(ruk, rpc); if (typeToken != null) { @@ -602,7 +591,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer } // -- 订阅端 -- - private Set rpcTopics = new HashSet(); + private final HashSet rpcTopics = new HashSet(); // rpc call consumer public void rpcSubscribe(String topic, TypeToken typeToken, Function, RpcResult> fun) { diff --git a/src/net/tccn/timer/Timers.java b/src/net/tccn/timer/Timers.java index 90f4857..b52cb45 100644 --- a/src/net/tccn/timer/Timers.java +++ b/src/net/tccn/timer/Timers.java @@ -1,23 +1,19 @@ package net.tccn.timer; import net.tccn.timer.scheduled.ScheduledCycle; -import org.redkale.util.Utility; +import java.util.UUID; import java.util.function.Supplier; public class Timers { - private static TimerExecutor timerExecutor = new TimerExecutor(1); + private static final TimerExecutor timerExecutor = new TimerExecutor(1); /** * 本地延时重试 - * - * @param supplier - * @param millis - * @param maxCount */ public static void tryDelay(Supplier supplier, long millis, int maxCount) { - timerExecutor.add(TimerTask.by("try-delay-task-" + Utility.uuid(), ScheduledCycle.of(0), task -> { + timerExecutor.add(TimerTask.by("try-delay-task-" + UUID.randomUUID().toString().replaceAll("-", ""), ScheduledCycle.of(0), task -> { if (supplier.get() || task.getExecCount() == maxCount) { task.setComplete(true); } @@ -32,12 +28,9 @@ public class Timers { /** * 本地延时:延时时间极短的场景下使用 (如:1分钟内) - * - * @param runnable - * @param millis */ public static void delay(Runnable runnable, long millis) { - timerExecutor.add(TimerTask.by("delay-task-" + Utility.uuid(), ScheduledCycle.of(millis), task -> { + timerExecutor.add(TimerTask.by("delay-task-" + UUID.randomUUID().toString().replaceAll("-", ""), ScheduledCycle.of(millis), task -> { runnable.run(); task.setComplete(true); })); diff --git a/test/HelloService.java b/test/HelloService.java index 90aae6b..efc1e40 100644 --- a/test/HelloService.java +++ b/test/HelloService.java @@ -58,6 +58,4 @@ public class HelloService { /*RpcResult x = zhub.rpc("rpc:file:up-token", Map.of(), new TypeToken<>() { });*/ - - }