This commit is contained in:
@@ -32,9 +32,6 @@ public abstract class WebSocketNode {
|
|||||||
@Comment("存储用户ID的key前缀")
|
@Comment("存储用户ID的key前缀")
|
||||||
public static final String SOURCE_SNCP_USERID_PREFIX = "sncpws_uid:";
|
public static final String SOURCE_SNCP_USERID_PREFIX = "sncpws_uid:";
|
||||||
|
|
||||||
@Comment("存储用户数的key")
|
|
||||||
public static final String SOURCE_SNCP_USERCOUNT_KEY = "sncpws_usercount";
|
|
||||||
|
|
||||||
@Comment("存储当前SNCP节点列表的key")
|
@Comment("存储当前SNCP节点列表的key")
|
||||||
public static final String SOURCE_SNCP_ADDRS_KEY = "sncpws_addrs";
|
public static final String SOURCE_SNCP_ADDRS_KEY = "sncpws_addrs";
|
||||||
|
|
||||||
@@ -211,7 +208,7 @@ public abstract class WebSocketNode {
|
|||||||
return CompletableFuture.completedFuture(this.localEngine.getLocalUserSize());
|
return CompletableFuture.completedFuture(this.localEngine.getLocalUserSize());
|
||||||
}
|
}
|
||||||
tryAcquireSemaphore();
|
tryAcquireSemaphore();
|
||||||
CompletableFuture<Integer> rs = this.sncpNodeAddresses.getLongAsync(SOURCE_SNCP_USERCOUNT_KEY, 0L).thenApply(v -> v.intValue());
|
CompletableFuture<Integer> rs = this.sncpNodeAddresses.queryKeysStartsWithAsync(SOURCE_SNCP_USERID_PREFIX).thenApply(v -> v.size());
|
||||||
if (semaphore != null) rs.whenComplete((r, e) -> releaseSemaphore());
|
if (semaphore != null) rs.whenComplete((r, e) -> releaseSemaphore());
|
||||||
return rs;
|
return rs;
|
||||||
}
|
}
|
||||||
@@ -574,7 +571,7 @@ public abstract class WebSocketNode {
|
|||||||
protected boolean tryAcquireSemaphore() {
|
protected boolean tryAcquireSemaphore() {
|
||||||
if (this.semaphore == null) return true;
|
if (this.semaphore == null) return true;
|
||||||
try {
|
try {
|
||||||
System.out.println("---------this.semaphore.tryAcquire" );
|
System.out.println("---------this.semaphore.tryAcquire");
|
||||||
return this.semaphore.tryAcquire(6, TimeUnit.SECONDS);
|
return this.semaphore.tryAcquire(6, TimeUnit.SECONDS);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
return false;
|
return false;
|
||||||
@@ -583,6 +580,6 @@ public abstract class WebSocketNode {
|
|||||||
|
|
||||||
protected void releaseSemaphore() {
|
protected void releaseSemaphore() {
|
||||||
if (this.semaphore != null) this.semaphore.release();
|
if (this.semaphore != null) this.semaphore.release();
|
||||||
System.out.println("---------this.semaphore.release: " + this.semaphore );
|
System.out.println("---------this.semaphore.release: " + this.semaphore);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -79,7 +79,6 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
|
|||||||
public CompletableFuture<Void> connect(Serializable userid, InetSocketAddress sncpAddr) {
|
public CompletableFuture<Void> connect(Serializable userid, InetSocketAddress sncpAddr) {
|
||||||
tryAcquireSemaphore();
|
tryAcquireSemaphore();
|
||||||
CompletableFuture<Void> future = sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, sncpAddr);
|
CompletableFuture<Void> future = sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, sncpAddr);
|
||||||
future = future.thenAccept((a) -> sncpNodeAddresses.incr(SOURCE_SNCP_USERCOUNT_KEY));
|
|
||||||
future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_ADDRS_KEY, sncpAddr));
|
future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_ADDRS_KEY, sncpAddr));
|
||||||
if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore());
|
if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore());
|
||||||
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " connect from " + sncpAddr);
|
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " connect from " + sncpAddr);
|
||||||
@@ -97,9 +96,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
|
|||||||
@Override
|
@Override
|
||||||
public CompletableFuture<Void> disconnect(Serializable userid, InetSocketAddress sncpAddr) {
|
public CompletableFuture<Void> disconnect(Serializable userid, InetSocketAddress sncpAddr) {
|
||||||
tryAcquireSemaphore();
|
tryAcquireSemaphore();
|
||||||
CompletableFuture<Void> future = CompletableFuture.allOf(
|
CompletableFuture<Void> future = sncpNodeAddresses.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, sncpAddr);
|
||||||
sncpNodeAddresses.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, sncpAddr),
|
|
||||||
sncpNodeAddresses.decrAsync(SOURCE_SNCP_USERCOUNT_KEY));
|
|
||||||
if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore());
|
if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore());
|
||||||
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " disconnect from " + sncpAddr);
|
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " disconnect from " + sncpAddr);
|
||||||
return future;
|
return future;
|
||||||
|
|||||||
Reference in New Issue
Block a user