Compare commits
7 Commits
094d3fc5a2
...
dev/spring
Author | SHA1 | Date | |
---|---|---|---|
c2bcc00741 | |||
d28a8e4d7b | |||
e2c2b62665 | |||
e8b9140103 | |||
1786724c88 | |||
f4771aadf2 | |||
8e2779f2d8 |
18
pom.xml
18
pom.xml
@@ -5,8 +5,8 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>net.tccn</groupId>
|
||||
<artifactId>zhub-cli</artifactId>
|
||||
<version>1.0</version>
|
||||
<artifactId>zhub-client-spring</artifactId>
|
||||
<version>0.1.2</version>
|
||||
|
||||
<properties>
|
||||
<maven.compiler.source>1.8</maven.compiler.source>
|
||||
@@ -14,10 +14,18 @@
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-web</artifactId>
|
||||
<version>3.2.1</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.google.code.gson</groupId>
|
||||
<artifactId>gson</artifactId>
|
||||
<version>2.8.8</version>
|
||||
<version>2.10.1</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
@@ -31,7 +39,7 @@
|
||||
<repository>
|
||||
<id>maven-nexus</id>
|
||||
<name>maven-nexus</name>
|
||||
<url>http://47.106.237.198:8081/repository/maven-public/</url>
|
||||
<url>https://nexus.1216.top/repository/maven-public/</url>
|
||||
</repository>
|
||||
</repositories>
|
||||
|
||||
@@ -40,7 +48,7 @@
|
||||
<repository>
|
||||
<id>mvn-release</id>
|
||||
<name>mvn-release</name>
|
||||
<url>http://47.106.237.198:8081/repository/maven-releases/</url>
|
||||
<url>https://nexus.1216.top/repository/maven-releases/</url>
|
||||
</repository>
|
||||
</distributionManagement>
|
||||
|
||||
|
@@ -18,9 +18,6 @@ public abstract class AbstractConsumer implements IConsumer {
|
||||
|
||||
public Gson gson = Rpc.gson;
|
||||
|
||||
// @Resource(name = "APP_NAME")
|
||||
protected String APP_ID = "";
|
||||
|
||||
private Map<String, EventType> eventMap = new HashMap<>();
|
||||
|
||||
protected abstract String getGroupid();
|
||||
|
@@ -15,8 +15,8 @@ public class Event<V> {
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
public static <V> Event of(String topic, V value) {
|
||||
return new Event<V>(topic, value);
|
||||
public static <V> Event<V> of(String topic, V value) {
|
||||
return new Event<>(topic, value);
|
||||
}
|
||||
|
||||
|
||||
|
@@ -2,10 +2,11 @@ package tccn.zhub;
|
||||
|
||||
// ================================================== lock ==================================================
|
||||
public class Lock {
|
||||
private String name;
|
||||
private String uuid;
|
||||
private int duration;
|
||||
private ZHubClient hubClient;
|
||||
protected String name;
|
||||
protected String uuid;
|
||||
protected int duration;
|
||||
protected boolean success;
|
||||
protected ZHubClient hubClient;
|
||||
|
||||
protected Lock(String name, String uuid, int duration, ZHubClient hubClient) {
|
||||
this.name = name;
|
||||
@@ -17,4 +18,8 @@ public class Lock {
|
||||
public void unLock() {
|
||||
hubClient.send("unlock", name, uuid);
|
||||
}
|
||||
|
||||
public boolean success() {
|
||||
return success;
|
||||
}
|
||||
}
|
||||
|
@@ -1,12 +1,15 @@
|
||||
package tccn.zhub;
|
||||
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
import tccn.AbstractConsumer;
|
||||
import tccn.Event;
|
||||
import tccn.IConsumer;
|
||||
import tccn.IProducer;
|
||||
import tccn.timer.Timers;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
@@ -18,24 +21,35 @@ import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
|
||||
@Component
|
||||
public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer {
|
||||
|
||||
public Logger logger = Logger.getLogger(ZHubClient.class.getSimpleName());
|
||||
@Value("${zhub.addr}")
|
||||
private String addr = "127.0.0.1:1216";
|
||||
//private String password = "";
|
||||
|
||||
@Value("${zhub.groupid}")
|
||||
private String groupid = "";
|
||||
|
||||
@Value("${zhub.auth}")
|
||||
private String auth = "";
|
||||
|
||||
@Value("${zhub.appid}")
|
||||
protected String appid = "";
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
init(null);
|
||||
}
|
||||
|
||||
private OutputStream writer;
|
||||
private BufferedReader reader;
|
||||
|
||||
@@ -45,12 +59,6 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
||||
private final LinkedBlockingQueue<Event<String>> rpcCallQueue = new LinkedBlockingQueue<>(); // RPC CALL MSG
|
||||
private final LinkedBlockingQueue<String> sendMsgQueue = new LinkedBlockingQueue<>(); // SEND MSG
|
||||
|
||||
private final BiConsumer<Runnable, Integer> threadBuilder = (r, n) -> {
|
||||
for (int i = 0; i < n; i++) {
|
||||
new Thread(() -> r.run()).start();
|
||||
}
|
||||
};
|
||||
|
||||
/*private static boolean isFirst = true;
|
||||
private boolean isMain = false;*/
|
||||
private static final Map<String, ZHubClient> mainHub = new HashMap<>(); // 127.0.0.1:1216 - ZHubClient
|
||||
@@ -58,7 +66,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
||||
public ZHubClient(String addr, String groupid, String appid, String auth) {
|
||||
this.addr = addr;
|
||||
this.groupid = groupid;
|
||||
this.APP_ID = appid;
|
||||
this.appid = appid;
|
||||
this.auth = auth;
|
||||
init(null);
|
||||
}
|
||||
@@ -72,7 +80,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
||||
if (config != null) {
|
||||
addr = config.getOrDefault("addr", addr);
|
||||
groupid = config.getOrDefault("groupid", groupid);
|
||||
APP_ID = config.getOrDefault("appname", APP_ID);
|
||||
appid = config.getOrDefault("appname", appid);
|
||||
}
|
||||
|
||||
// 设置第一个启动的 实例为主实例
|
||||
@@ -136,13 +144,26 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
||||
Lock lock = lockTag.get(value);
|
||||
if (lock != null) {
|
||||
synchronized (lock) {
|
||||
lock.success = true;
|
||||
lock.notifyAll();
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
// trylock msg
|
||||
if ("trylock".equals(topic)) {
|
||||
Lock lock = lockTag.get(value);
|
||||
if (lock != null) {
|
||||
synchronized (lock) {
|
||||
lock.success = false;
|
||||
lock.notifyAll();
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
// rpc back msg
|
||||
if (APP_ID.equals(topic)) {
|
||||
if (appid.equals(topic)) {
|
||||
rpcBackQueue.add(Event.of(topic, value));
|
||||
continue;
|
||||
}
|
||||
@@ -184,98 +205,119 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
||||
}).start();
|
||||
|
||||
// 定时调度事件
|
||||
threadBuilder.accept(() -> {
|
||||
new Thread(() -> {
|
||||
ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||
while (true) {
|
||||
Timer timer = null;
|
||||
try {
|
||||
if ((timer = timerQueue.take()) == null) {
|
||||
return;
|
||||
}
|
||||
timer = timerQueue.take();
|
||||
long start = System.currentTimeMillis();
|
||||
timer.runnable.run();
|
||||
executor.submit(timer.runnable).get(5, TimeUnit.SECONDS);
|
||||
long end = System.currentTimeMillis();
|
||||
logger.finest(String.format("timer [%s] : elapsed time %s ms", timer.name, end - start));
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
} catch (Exception e) {
|
||||
logger.log(Level.WARNING, "timer [" + timer.name + "]", e);
|
||||
} catch (InterruptedException | ExecutionException | TimeoutException e) {
|
||||
if (e instanceof TimeoutException) {
|
||||
executor = Executors.newSingleThreadExecutor();
|
||||
logger.log(Level.WARNING, "TimeoutException [" + timer.name + "]", e);
|
||||
} else {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}, 1);
|
||||
}).start();
|
||||
|
||||
// topic msg
|
||||
threadBuilder.accept(() -> {
|
||||
new Thread(() -> {
|
||||
ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||
while (true) {
|
||||
Event<String> event = null;
|
||||
try {
|
||||
if ((event = topicQueue.take()) == null) {
|
||||
continue;
|
||||
}
|
||||
event = topicQueue.take();
|
||||
logger.log(Level.FINE, "topic[" + event.topic + "] :" + event.value);
|
||||
accept(event.topic, event.value);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
} catch (Exception e) {
|
||||
logger.log(Level.WARNING, "topic[" + event.topic + "] event accept error :" + event.value, e);
|
||||
|
||||
String topic = event.topic;
|
||||
String value = event.value;
|
||||
executor.submit(() -> accept(topic, value)).get(5, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException | ExecutionException | TimeoutException e) {
|
||||
if (e instanceof TimeoutException) {
|
||||
executor = Executors.newSingleThreadExecutor();
|
||||
logger.log(Level.WARNING, "TimeoutException, topic[" + event.topic + "], value[" + event.value + "]", e);
|
||||
} else if (event != null) {
|
||||
logger.log(Level.WARNING, "topic[" + event.topic + "] event accept error :" + event.value, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}, 1);
|
||||
|
||||
}, "ZHub-topic-accept").start();
|
||||
// rpc back
|
||||
threadBuilder.accept(() -> {
|
||||
new Thread(() -> {
|
||||
ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||
while (true) {
|
||||
Event<String> event = null;
|
||||
try {
|
||||
if ((event = rpcBackQueue.take()) == null) {
|
||||
continue;
|
||||
}
|
||||
//if (event)
|
||||
event = rpcBackQueue.take();
|
||||
logger.info(String.format("rpc-back:[%s]: %s", event.topic, event.value));
|
||||
rpcAccept(event.value);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
} catch (Exception e) {
|
||||
logger.log(Level.WARNING, "rpc-back[" + event.topic + "] event accept error :" + event.value, e);
|
||||
|
||||
String value = event.value;
|
||||
executor.submit(() -> rpcAccept(value)).get(5, TimeUnit.SECONDS);
|
||||
|
||||
} catch (InterruptedException | ExecutionException | TimeoutException e) {
|
||||
if (e instanceof TimeoutException) {
|
||||
executor = Executors.newSingleThreadExecutor();
|
||||
logger.log(Level.WARNING, "rpc-back TimeoutException, topic[" + event.topic + "], value[" + event.value + "]", e);
|
||||
} else if (event != null) {
|
||||
logger.log(Level.WARNING, "rpc-back[" + event.topic + "] event accept error :" + event.value, e);
|
||||
}
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}, 1);
|
||||
}).start();
|
||||
|
||||
// rpc call
|
||||
threadBuilder.accept(() -> {
|
||||
new Thread(() -> {
|
||||
ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||
while (true) {
|
||||
Event<String> event = null;
|
||||
try {
|
||||
if ((event = rpcCallQueue.take()) == null) {
|
||||
continue;
|
||||
}
|
||||
event = rpcCallQueue.take();
|
||||
logger.info(String.format("rpc-call:[%s] %s", event.topic, event.value));
|
||||
accept(event.topic, event.value);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
} catch (Exception e) {
|
||||
logger.log(Level.WARNING, "rpc-call[" + event.topic + "] event accept error :" + event.value, e);
|
||||
|
||||
String topic = event.topic;
|
||||
String value = event.value;
|
||||
executor.submit(() -> accept(topic, value)).get(5, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException | ExecutionException | TimeoutException e) {
|
||||
if (e instanceof TimeoutException) {
|
||||
executor = Executors.newSingleThreadExecutor();
|
||||
logger.log(Level.WARNING, "rpc-call TimeoutException, topic[" + event.topic + "], value[" + event.value + "]", e);
|
||||
} else if (event != null) {
|
||||
logger.log(Level.WARNING, "rpc-call[" + event.topic + "] event accept error :" + event.value, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}, 1);
|
||||
}, "ZHub-rpc-call").start();
|
||||
|
||||
// send msg
|
||||
threadBuilder.accept(() -> {
|
||||
new Thread(() -> {
|
||||
while (true) {
|
||||
String msg = null;
|
||||
try {
|
||||
if ((msg = sendMsgQueue.take()) == null) {
|
||||
continue;
|
||||
}
|
||||
// logger.log(Level.FINEST, "send-msg: [" + msg + "]");
|
||||
writer.write(msg.getBytes());
|
||||
msg = sendMsgQueue.take();
|
||||
writer.write(msg.getBytes(UTF_8));
|
||||
writer.flush();
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
} catch (Exception e) {
|
||||
} catch (InterruptedException | IOException e) {
|
||||
logger.log(Level.WARNING, "send-msg[" + msg + "] event accept error :", e);
|
||||
|
||||
try {
|
||||
Thread.sleep(3000);
|
||||
assert msg != null;
|
||||
writer.write(msg.getBytes(UTF_8));
|
||||
writer.flush();
|
||||
} catch (IOException | InterruptedException | NullPointerException ex) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}, 1);
|
||||
}).start();
|
||||
|
||||
}
|
||||
|
||||
@@ -355,11 +397,11 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
||||
send("auth", auth);
|
||||
send("groupid " + groupid);
|
||||
|
||||
StringBuffer buf = new StringBuffer("subscribe lock");
|
||||
StringBuffer buf = new StringBuffer("subscribe lock trylock");
|
||||
/*if (isMain) {
|
||||
}*/
|
||||
if (mainHub.containsValue(this)) {
|
||||
buf.append(" ").append(APP_ID);
|
||||
buf.append(" ").append(appid);
|
||||
}
|
||||
for (String topic : getTopics()) {
|
||||
buf.append(" ").append(topic);
|
||||
@@ -466,14 +508,35 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
||||
// ================================================== lock ==================================================
|
||||
private final Map<String, Lock> lockTag = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 尝试加锁,立即返回,
|
||||
*
|
||||
* @param key
|
||||
* @param duration
|
||||
* @return Lock: lock.success 锁定是否成功标识
|
||||
*/
|
||||
public Lock tryLock(String key, int duration) {
|
||||
return lock("trylock", key, duration);
|
||||
}
|
||||
|
||||
public Lock lock(String key, int duration) {
|
||||
return lock("lock", key, duration);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param cmd lock|trylock
|
||||
* @param key 加锁 key
|
||||
* @param duration 锁定时长
|
||||
* @return
|
||||
*/
|
||||
private Lock lock(String cmd, String key, int duration) {
|
||||
String uuid = UUID.randomUUID().toString().replaceAll("-", "");
|
||||
Lock lock = new Lock(key, uuid, duration, this);
|
||||
lockTag.put(uuid, lock);
|
||||
|
||||
try {
|
||||
// c.send("lock", key, uuid, strconv.Itoa(duration))
|
||||
send("lock", key, uuid, String.valueOf(duration));
|
||||
send(cmd, key, uuid, String.valueOf(duration));
|
||||
synchronized (lock) {
|
||||
lock.wait();
|
||||
}
|
||||
@@ -533,7 +596,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
||||
|
||||
// rpc call
|
||||
public <T, R> RpcResult<R> rpc(String topic, T v, TypeToken<R> typeToken, long timeout) {
|
||||
Rpc rpc = new Rpc<>(APP_ID, UUID.randomUUID().toString().replaceAll("-", ""), topic, v);
|
||||
Rpc rpc = new Rpc<>(appid, UUID.randomUUID().toString().replaceAll("-", ""), topic, v);
|
||||
String ruk = rpc.getRuk();
|
||||
rpcMap.put(ruk, rpc);
|
||||
if (typeToken != null) {
|
||||
|
7
src/main/resources/application.yml
Normal file
7
src/main/resources/application.yml
Normal file
@@ -0,0 +1,7 @@
|
||||
|
||||
# zhub 配置
|
||||
zhub:
|
||||
appid: local_api
|
||||
addr: 127.0.0.1:1216
|
||||
groupid: hub-api
|
||||
auth: token-12345
|
@@ -1,6 +1,7 @@
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import tccn.IType;
|
||||
import tccn.zhub.Lock;
|
||||
import tccn.zhub.ZHubClient;
|
||||
|
||||
// @RestService(automapping = true)
|
||||
@@ -15,11 +16,15 @@ public class HelloService {
|
||||
|
||||
|
||||
//zhub = new ZHubClient("127.0.0.1:1216", "g-dev", "DEV-LOCAL", "zchd@123456");
|
||||
zhub = new ZHubClient("47.111.150.118:6066", "g-dev", "DEV-LOCAL", "zchd@123456");
|
||||
zhub = new ZHubClient("127.0.0.1:1216", "g-dev", "DEV-LOCAL", "token-12345");
|
||||
|
||||
zhub.subscribe("tv:test", x -> {
|
||||
System.out.println(x);
|
||||
});
|
||||
|
||||
Lock lock = zhub.tryLock("lock-a", 5);
|
||||
System.out.println("lock-1: " + lock.success());
|
||||
|
||||
//zhub.init(Kv.of("host", "47.111.150.118", "port", "6066", "groupid", "g-dev", "appname", "DEV-LOCAL"));
|
||||
|
||||
// Function<Rpc<T>, RpcResult<R>> fun
|
||||
@@ -65,9 +70,28 @@ public class HelloService {
|
||||
});
|
||||
|
||||
zhub.rpcSubscribe("rpc-x", IType.STRING, x -> {
|
||||
return x.buildResp(x.getValue().toUpperCase());
|
||||
return x.render(x.getValue().toUpperCase());
|
||||
});
|
||||
|
||||
Lock lock = zhub.tryLock("lock-a", 5);
|
||||
System.out.println("lock-2: " + lock.success());
|
||||
|
||||
try {
|
||||
Thread.sleep(5 * 1000);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
Lock lock2 = zhub.tryLock("lock-a", 5);
|
||||
System.out.println("lock-3: " + lock2.success());
|
||||
/*try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}*/
|
||||
Lock lock3 = zhub.tryLock("lock-a", 5);
|
||||
System.out.println("lock-4: " + lock3.success());
|
||||
|
||||
try {
|
||||
Thread.sleep(3000 * 30000);
|
||||
} catch (InterruptedException e) {
|
||||
@@ -75,6 +99,10 @@ public class HelloService {
|
||||
}
|
||||
}
|
||||
|
||||
public void lockTest() {
|
||||
|
||||
}
|
||||
|
||||
/*RpcResult<FileToken> x = zhub.rpc("rpc:file:up-token", Map.of(), new TypeToken<>() {
|
||||
});*/
|
||||
}
|
||||
|
Reference in New Issue
Block a user