Compare commits

...

2 Commits

6 changed files with 112 additions and 20 deletions

10
pom.xml
View File

@ -14,6 +14,12 @@
</properties> </properties>
<dependencies> <dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.5.4</version>
</dependency>
<dependency> <dependency>
<groupId>com.google.code.gson</groupId> <groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId> <artifactId>gson</artifactId>
@ -27,7 +33,7 @@
</dependency> </dependency>
</dependencies> </dependencies>
<repositories> <!--<repositories>
<repository> <repository>
<id>maven-nexus</id> <id>maven-nexus</id>
<name>maven-nexus</name> <name>maven-nexus</name>
@ -42,6 +48,6 @@
<name>mvn-release</name> <name>mvn-release</name>
<url>http://47.106.237.198:8081/repository/maven-releases/</url> <url>http://47.106.237.198:8081/repository/maven-releases/</url>
</repository> </repository>
</distributionManagement> </distributionManagement>-->
</project> </project>

View File

@ -18,9 +18,6 @@ public abstract class AbstractConsumer implements IConsumer {
public Gson gson = Rpc.gson; public Gson gson = Rpc.gson;
// @Resource(name = "APP_NAME")
protected String APP_ID = "";
private Map<String, EventType> eventMap = new HashMap<>(); private Map<String, EventType> eventMap = new HashMap<>();
protected abstract String getGroupid(); protected abstract String getGroupid();

View File

@ -2,10 +2,11 @@ package tccn.zhub;
// ================================================== lock ================================================== // ================================================== lock ==================================================
public class Lock { public class Lock {
private String name; protected String name;
private String uuid; protected String uuid;
private int duration; protected int duration;
private ZHubClient hubClient; protected boolean success;
protected ZHubClient hubClient;
protected Lock(String name, String uuid, int duration, ZHubClient hubClient) { protected Lock(String name, String uuid, int duration, ZHubClient hubClient) {
this.name = name; this.name = name;
@ -17,4 +18,8 @@ public class Lock {
public void unLock() { public void unLock() {
hubClient.send("unlock", name, uuid); hubClient.send("unlock", name, uuid);
} }
public boolean success() {
return success;
}
} }

View File

@ -1,12 +1,15 @@
package tccn.zhub; package tccn.zhub;
import com.google.gson.reflect.TypeToken; import com.google.gson.reflect.TypeToken;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import tccn.AbstractConsumer; import tccn.AbstractConsumer;
import tccn.Event; import tccn.Event;
import tccn.IConsumer; import tccn.IConsumer;
import tccn.IProducer; import tccn.IProducer;
import tccn.timer.Timers; import tccn.timer.Timers;
import javax.annotation.PostConstruct;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
@ -27,15 +30,27 @@ import java.util.function.Function;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@Component
public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer { public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer {
public Logger logger = Logger.getLogger(ZHubClient.class.getSimpleName()); public Logger logger = Logger.getLogger(ZHubClient.class.getSimpleName());
@Value("${zhub.addr}")
private String addr = "127.0.0.1:1216"; private String addr = "127.0.0.1:1216";
//private String password = "";
@Value("${zhub.groupid}")
private String groupid = ""; private String groupid = "";
@Value("${zhub.auth}")
private String auth = ""; private String auth = "";
@Value("${zhub.appid}")
protected String appid = "";
@PostConstruct
public void init() {
init(null);
}
private OutputStream writer; private OutputStream writer;
private BufferedReader reader; private BufferedReader reader;
@ -58,7 +73,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
public ZHubClient(String addr, String groupid, String appid, String auth) { public ZHubClient(String addr, String groupid, String appid, String auth) {
this.addr = addr; this.addr = addr;
this.groupid = groupid; this.groupid = groupid;
this.APP_ID = appid; this.appid = appid;
this.auth = auth; this.auth = auth;
init(null); init(null);
} }
@ -72,7 +87,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
if (config != null) { if (config != null) {
addr = config.getOrDefault("addr", addr); addr = config.getOrDefault("addr", addr);
groupid = config.getOrDefault("groupid", groupid); groupid = config.getOrDefault("groupid", groupid);
APP_ID = config.getOrDefault("appname", APP_ID); appid = config.getOrDefault("appname", appid);
} }
// 设置第一个启动的 实例为主实例 // 设置第一个启动的 实例为主实例
@ -136,13 +151,26 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
Lock lock = lockTag.get(value); Lock lock = lockTag.get(value);
if (lock != null) { if (lock != null) {
synchronized (lock) { synchronized (lock) {
lock.success = true;
lock.notifyAll(); lock.notifyAll();
} }
} }
continue; continue;
} }
// trylock msg
if ("trylock".equals(topic)) {
Lock lock = lockTag.get(value);
if (lock != null) {
synchronized (lock) {
lock.success = false;
lock.notifyAll();
}
}
continue;
}
// rpc back msg // rpc back msg
if (APP_ID.equals(topic)) { if (appid.equals(topic)) {
rpcBackQueue.add(Event.of(topic, value)); rpcBackQueue.add(Event.of(topic, value));
continue; continue;
} }
@ -355,11 +383,11 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
send("auth", auth); send("auth", auth);
send("groupid " + groupid); send("groupid " + groupid);
StringBuffer buf = new StringBuffer("subscribe lock"); StringBuffer buf = new StringBuffer("subscribe lock trylock");
/*if (isMain) { /*if (isMain) {
}*/ }*/
if (mainHub.containsValue(this)) { if (mainHub.containsValue(this)) {
buf.append(" ").append(APP_ID); buf.append(" ").append(appid);
} }
for (String topic : getTopics()) { for (String topic : getTopics()) {
buf.append(" ").append(topic); buf.append(" ").append(topic);
@ -466,14 +494,35 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
// ================================================== lock ================================================== // ================================================== lock ==================================================
private final Map<String, Lock> lockTag = new ConcurrentHashMap<>(); private final Map<String, Lock> lockTag = new ConcurrentHashMap<>();
/**
* 尝试加锁立即返回
*
* @param key
* @param duration
* @return Lock: lock.success 锁定是否成功标识
*/
public Lock tryLock(String key, int duration) { public Lock tryLock(String key, int duration) {
return lock("trylock", key, duration);
}
public Lock lock(String key, int duration) {
return lock("lock", key, duration);
}
/**
* @param cmd lock|trylock
* @param key 加锁 key
* @param duration 锁定时长
* @return
*/
private Lock lock(String cmd, String key, int duration) {
String uuid = UUID.randomUUID().toString().replaceAll("-", ""); String uuid = UUID.randomUUID().toString().replaceAll("-", "");
Lock lock = new Lock(key, uuid, duration, this); Lock lock = new Lock(key, uuid, duration, this);
lockTag.put(uuid, lock); lockTag.put(uuid, lock);
try { try {
// c.send("lock", key, uuid, strconv.Itoa(duration)) // c.send("lock", key, uuid, strconv.Itoa(duration))
send("lock", key, uuid, String.valueOf(duration)); send(cmd, key, uuid, String.valueOf(duration));
synchronized (lock) { synchronized (lock) {
lock.wait(); lock.wait();
} }
@ -533,7 +582,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
// rpc call // rpc call
public <T, R> RpcResult<R> rpc(String topic, T v, TypeToken<R> typeToken, long timeout) { public <T, R> RpcResult<R> rpc(String topic, T v, TypeToken<R> typeToken, long timeout) {
Rpc rpc = new Rpc<>(APP_ID, UUID.randomUUID().toString().replaceAll("-", ""), topic, v); Rpc rpc = new Rpc<>(appid, UUID.randomUUID().toString().replaceAll("-", ""), topic, v);
String ruk = rpc.getRuk(); String ruk = rpc.getRuk();
rpcMap.put(ruk, rpc); rpcMap.put(ruk, rpc);
if (typeToken != null) { if (typeToken != null) {

View File

@ -0,0 +1,7 @@
# zhub 配置
zhub:
appid: local_api
addr: 127.0.0.1:1216
groupid: hub-api
auth: token-12345

View File

@ -1,6 +1,7 @@
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import tccn.IType; import tccn.IType;
import tccn.zhub.Lock;
import tccn.zhub.ZHubClient; import tccn.zhub.ZHubClient;
// @RestService(automapping = true) // @RestService(automapping = true)
@ -15,11 +16,15 @@ public class HelloService {
//zhub = new ZHubClient("127.0.0.1:1216", "g-dev", "DEV-LOCAL", "zchd@123456"); //zhub = new ZHubClient("127.0.0.1:1216", "g-dev", "DEV-LOCAL", "zchd@123456");
zhub = new ZHubClient("47.111.150.118:6066", "g-dev", "DEV-LOCAL", "zchd@123456"); zhub = new ZHubClient("127.0.0.1:1216", "g-dev", "DEV-LOCAL", "token-12345");
zhub.subscribe("tv:test", x -> { zhub.subscribe("tv:test", x -> {
System.out.println(x); System.out.println(x);
}); });
Lock lock = zhub.tryLock("lock-a", 5);
System.out.println("lock-1: " + lock.success());
//zhub.init(Kv.of("host", "47.111.150.118", "port", "6066", "groupid", "g-dev", "appname", "DEV-LOCAL")); //zhub.init(Kv.of("host", "47.111.150.118", "port", "6066", "groupid", "g-dev", "appname", "DEV-LOCAL"));
// Function<Rpc<T>, RpcResult<R>> fun // Function<Rpc<T>, RpcResult<R>> fun
@ -65,9 +70,28 @@ public class HelloService {
}); });
zhub.rpcSubscribe("rpc-x", IType.STRING, x -> { zhub.rpcSubscribe("rpc-x", IType.STRING, x -> {
return x.buildResp(x.getValue().toUpperCase()); return x.render(x.getValue().toUpperCase());
}); });
Lock lock = zhub.tryLock("lock-a", 5);
System.out.println("lock-2: " + lock.success());
try {
Thread.sleep(5 * 1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Lock lock2 = zhub.tryLock("lock-a", 5);
System.out.println("lock-3: " + lock2.success());
/*try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}*/
Lock lock3 = zhub.tryLock("lock-a", 5);
System.out.println("lock-4: " + lock3.success());
try { try {
Thread.sleep(3000 * 30000); Thread.sleep(3000 * 30000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -75,6 +99,10 @@ public class HelloService {
} }
} }
public void lockTest() {
}
/*RpcResult<FileToken> x = zhub.rpc("rpc:file:up-token", Map.of(), new TypeToken<>() { /*RpcResult<FileToken> x = zhub.rpc("rpc:file:up-token", Map.of(), new TypeToken<>() {
});*/ });*/
} }