新增:trylock 尝试获取锁,并立即返回加锁结果

This commit is contained in:
绝尘 2023-10-21 13:00:55 +08:00
parent 8e2779f2d8
commit f4771aadf2
3 changed files with 75 additions and 8 deletions

View File

@ -2,10 +2,11 @@ package tccn.zhub;
// ================================================== lock ==================================================
public class Lock {
private String name;
private String uuid;
private int duration;
private ZHubClient hubClient;
protected String name;
protected String uuid;
protected int duration;
protected boolean success;
protected ZHubClient hubClient;
protected Lock(String name, String uuid, int duration, ZHubClient hubClient) {
this.name = name;
@ -17,4 +18,8 @@ public class Lock {
public void unLock() {
hubClient.send("unlock", name, uuid);
}
public boolean success() {
return success;
}
}

View File

@ -151,11 +151,24 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
Lock lock = lockTag.get(value);
if (lock != null) {
synchronized (lock) {
lock.success = true;
lock.notifyAll();
}
}
continue;
}
// trylock msg
if ("trylock".equals(topic)) {
Lock lock = lockTag.get(value);
if (lock != null) {
synchronized (lock) {
lock.success = false;
lock.notifyAll();
}
}
continue;
}
// rpc back msg
if (appid.equals(topic)) {
rpcBackQueue.add(Event.of(topic, value));
@ -370,7 +383,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
send("auth", auth);
send("groupid " + groupid);
StringBuffer buf = new StringBuffer("subscribe lock");
StringBuffer buf = new StringBuffer("subscribe lock trylock");
/*if (isMain) {
}*/
if (mainHub.containsValue(this)) {
@ -481,14 +494,35 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
// ================================================== lock ==================================================
private final Map<String, Lock> lockTag = new ConcurrentHashMap<>();
/**
* 尝试加锁立即返回
*
* @param key
* @param duration
* @return Lock: lock.success 锁定是否成功标识
*/
public Lock tryLock(String key, int duration) {
return lock("trylock", key, duration);
}
public Lock lock(String key, int duration) {
return lock("lock", key, duration);
}
/**
* @param cmd lock|trylock
* @param key 加锁 key
* @param duration 锁定时长
* @return
*/
private Lock lock(String cmd, String key, int duration) {
String uuid = UUID.randomUUID().toString().replaceAll("-", "");
Lock lock = new Lock(key, uuid, duration, this);
lockTag.put(uuid, lock);
try {
// c.send("lock", key, uuid, strconv.Itoa(duration))
send("lock", key, uuid, String.valueOf(duration));
send(cmd, key, uuid, String.valueOf(duration));
synchronized (lock) {
lock.wait();
}

View File

@ -1,6 +1,7 @@
import org.junit.Before;
import org.junit.Test;
import tccn.IType;
import tccn.zhub.Lock;
import tccn.zhub.ZHubClient;
// @RestService(automapping = true)
@ -15,11 +16,15 @@ public class HelloService {
//zhub = new ZHubClient("127.0.0.1:1216", "g-dev", "DEV-LOCAL", "zchd@123456");
zhub = new ZHubClient("47.111.150.118:6066", "g-dev", "DEV-LOCAL", "zchd@123456");
zhub = new ZHubClient("127.0.0.1:1216", "g-dev", "DEV-LOCAL", "token-12345");
zhub.subscribe("tv:test", x -> {
System.out.println(x);
});
Lock lock = zhub.tryLock("lock-a", 5);
System.out.println("lock-1: " + lock.success());
//zhub.init(Kv.of("host", "47.111.150.118", "port", "6066", "groupid", "g-dev", "appname", "DEV-LOCAL"));
// Function<Rpc<T>, RpcResult<R>> fun
@ -65,9 +70,28 @@ public class HelloService {
});
zhub.rpcSubscribe("rpc-x", IType.STRING, x -> {
return x.buildResp(x.getValue().toUpperCase());
return x.render(x.getValue().toUpperCase());
});
Lock lock = zhub.tryLock("lock-a", 5);
System.out.println("lock-2: " + lock.success());
try {
Thread.sleep(5 * 1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Lock lock2 = zhub.tryLock("lock-a", 5);
System.out.println("lock-3: " + lock2.success());
/*try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}*/
Lock lock3 = zhub.tryLock("lock-a", 5);
System.out.println("lock-4: " + lock3.success());
try {
Thread.sleep(3000 * 30000);
} catch (InterruptedException e) {
@ -75,6 +99,10 @@ public class HelloService {
}
}
public void lockTest() {
}
/*RpcResult<FileToken> x = zhub.rpc("rpc:file:up-token", Map.of(), new TypeToken<>() {
});*/
}