优化CacheSource
This commit is contained in:
@@ -5,20 +5,20 @@
|
|||||||
*/
|
*/
|
||||||
package org.redkale.net.http;
|
package org.redkale.net.http;
|
||||||
|
|
||||||
import org.redkale.net.http.WebSocketPacket.FrameType;
|
import java.io.Serializable;
|
||||||
import java.io.*;
|
|
||||||
import java.net.*;
|
import java.net.*;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.*;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.function.*;
|
import java.util.function.*;
|
||||||
import java.util.logging.*;
|
import java.util.logging.*;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
import java.util.zip.*;
|
import java.util.zip.*;
|
||||||
|
import org.redkale.annotation.Comment;
|
||||||
import org.redkale.convert.Convert;
|
import org.redkale.convert.Convert;
|
||||||
import org.redkale.net.AsyncConnection;
|
import org.redkale.net.AsyncConnection;
|
||||||
import org.redkale.annotation.Comment;
|
import org.redkale.net.http.WebSocketPacket.FrameType;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <blockquote><pre>
|
* <blockquote><pre>
|
||||||
@@ -507,7 +507,7 @@ public abstract class WebSocket<G extends Serializable, T> {
|
|||||||
*
|
*
|
||||||
* @return 地址列表
|
* @return 地址列表
|
||||||
*/
|
*/
|
||||||
public CompletableFuture<Collection<WebSocketAddress>> getRpcNodeAddresses(final Serializable userid) {
|
public CompletableFuture<Set<WebSocketAddress>> getRpcNodeAddresses(final Serializable userid) {
|
||||||
if (_engine.node == null) return CompletableFuture.completedFuture(null);
|
if (_engine.node == null) return CompletableFuture.completedFuture(null);
|
||||||
return _engine.node.getRpcNodeAddresses(userid);
|
return _engine.node.getRpcNodeAddresses(userid);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -86,7 +86,7 @@ public abstract class WebSocketNode {
|
|||||||
this.wsNodeAddress = new WebSocketAddress(mqtopic, new InetSocketAddress("127.0.0.1", 27));
|
this.wsNodeAddress = new WebSocketAddress(mqtopic, new InetSocketAddress("127.0.0.1", 27));
|
||||||
}
|
}
|
||||||
if (source != null && wsNodeAddress != null) {
|
if (source != null && wsNodeAddress != null) {
|
||||||
source.appendSetItem(WS_SOURCE_KEY_NODES, WebSocketAddress.class, this.wsNodeAddress);
|
source.sadd(WS_SOURCE_KEY_NODES, WebSocketAddress.class, this.wsNodeAddress);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -109,7 +109,7 @@ public abstract class WebSocketNode {
|
|||||||
//关掉所有本地本地WebSocket
|
//关掉所有本地本地WebSocket
|
||||||
this.localEngine.getLocalWebSockets().forEach(g -> g.close());
|
this.localEngine.getLocalWebSockets().forEach(g -> g.close());
|
||||||
if (source != null && wsNodeAddress != null) {
|
if (source != null && wsNodeAddress != null) {
|
||||||
source.removeSetItem(WS_SOURCE_KEY_NODES, WebSocketAddress.class, this.wsNodeAddress);
|
source.srem(WS_SOURCE_KEY_NODES, WebSocketAddress.class, this.wsNodeAddress);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -184,14 +184,14 @@ public abstract class WebSocketNode {
|
|||||||
*
|
*
|
||||||
* @return 地址列表
|
* @return 地址列表
|
||||||
*/
|
*/
|
||||||
public CompletableFuture<Collection<WebSocketAddress>> getRpcNodeAddresses(final Serializable userid) {
|
public CompletableFuture<Set<WebSocketAddress>> getRpcNodeAddresses(final Serializable userid) {
|
||||||
if (this.source != null) {
|
if (this.source != null) {
|
||||||
tryAcquireSemaphore();
|
tryAcquireSemaphore();
|
||||||
CompletableFuture<Collection<WebSocketAddress>> result = this.source.getCollectionAsync(WS_SOURCE_KEY_USERID_PREFIX + userid, WebSocketAddress.class);
|
CompletableFuture<Set<WebSocketAddress>> result = this.source.smembersAsync(WS_SOURCE_KEY_USERID_PREFIX + userid, WebSocketAddress.class);
|
||||||
if (semaphore != null) result.whenComplete((r, e) -> releaseSemaphore());
|
if (semaphore != null) result.whenComplete((r, e) -> releaseSemaphore());
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
List<WebSocketAddress> rs = new ArrayList<>();
|
Set<WebSocketAddress> rs = new LinkedHashSet<>();
|
||||||
rs.add(this.wsNodeAddress);
|
rs.add(this.wsNodeAddress);
|
||||||
return CompletableFuture.completedFuture(rs);
|
return CompletableFuture.completedFuture(rs);
|
||||||
}
|
}
|
||||||
@@ -206,7 +206,7 @@ public abstract class WebSocketNode {
|
|||||||
* @return 地址集合
|
* @return 地址集合
|
||||||
*/
|
*/
|
||||||
public CompletableFuture<Map<WebSocketAddress, List<String>>> getRpcNodeWebSocketAddresses(final Serializable userid) {
|
public CompletableFuture<Map<WebSocketAddress, List<String>>> getRpcNodeWebSocketAddresses(final Serializable userid) {
|
||||||
CompletableFuture<Collection<WebSocketAddress>> sncpFuture = getRpcNodeAddresses(userid);
|
CompletableFuture<Set<WebSocketAddress>> sncpFuture = getRpcNodeAddresses(userid);
|
||||||
return sncpFuture.thenCompose((Collection<WebSocketAddress> addrs) -> {
|
return sncpFuture.thenCompose((Collection<WebSocketAddress> addrs) -> {
|
||||||
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket found userid:" + userid + " on " + addrs);
|
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket found userid:" + userid + " on " + addrs);
|
||||||
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(new HashMap<>());
|
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(new HashMap<>());
|
||||||
@@ -243,9 +243,9 @@ public abstract class WebSocketNode {
|
|||||||
}
|
}
|
||||||
CompletableFuture<Integer> localFuture = this.localEngine == null ? null : CompletableFuture.completedFuture(this.localEngine.getLocalUserSize());
|
CompletableFuture<Integer> localFuture = this.localEngine == null ? null : CompletableFuture.completedFuture(this.localEngine.getLocalUserSize());
|
||||||
tryAcquireSemaphore();
|
tryAcquireSemaphore();
|
||||||
CompletableFuture<Collection<WebSocketAddress>> addrsFuture = source.getCollectionAsync(WS_SOURCE_KEY_NODES, WebSocketAddress.class);
|
CompletableFuture<Set<WebSocketAddress>> addrsFuture = source.smembersAsync(WS_SOURCE_KEY_NODES, WebSocketAddress.class);
|
||||||
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
|
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
|
||||||
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<WebSocketAddress> addrs) -> {
|
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose(addrs -> {
|
||||||
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket getUserSize on " + addrs);
|
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket getUserSize on " + addrs);
|
||||||
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0);
|
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0);
|
||||||
CompletableFuture<Integer> future = null;
|
CompletableFuture<Integer> future = null;
|
||||||
@@ -295,9 +295,9 @@ public abstract class WebSocketNode {
|
|||||||
}
|
}
|
||||||
//远程节点关闭
|
//远程节点关闭
|
||||||
tryAcquireSemaphore();
|
tryAcquireSemaphore();
|
||||||
CompletableFuture<Collection<WebSocketAddress>> addrsFuture = source.getCollectionAsync(WS_SOURCE_KEY_USERID_PREFIX + userid, WebSocketAddress.class);
|
CompletableFuture<Set<WebSocketAddress>> addrsFuture = source.smembersAsync(WS_SOURCE_KEY_USERID_PREFIX + userid, WebSocketAddress.class);
|
||||||
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
|
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
|
||||||
CompletableFuture<Boolean> remoteFuture = addrsFuture.thenCompose((Collection<WebSocketAddress> addrs) -> {
|
CompletableFuture<Boolean> remoteFuture = addrsFuture.thenCompose(addrs -> {
|
||||||
//if (logger.isLoggable(Level.FINEST)) logger.finest("websocket found userid:" + userid + " on " + addrs);
|
//if (logger.isLoggable(Level.FINEST)) logger.finest("websocket found userid:" + userid + " on " + addrs);
|
||||||
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(false);
|
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(false);
|
||||||
CompletableFuture<Boolean> future = null;
|
CompletableFuture<Boolean> future = null;
|
||||||
@@ -378,7 +378,7 @@ public abstract class WebSocketNode {
|
|||||||
CompletableFuture<Collection<WebSocketAddress>> addrsFuture;
|
CompletableFuture<Collection<WebSocketAddress>> addrsFuture;
|
||||||
if (userAddress == null) {
|
if (userAddress == null) {
|
||||||
tryAcquireSemaphore();
|
tryAcquireSemaphore();
|
||||||
addrsFuture = source.getCollectionAsync(WS_SOURCE_KEY_USERID_PREFIX + userid, WebSocketAddress.class);
|
addrsFuture = (CompletableFuture) source.smembersAsync(WS_SOURCE_KEY_USERID_PREFIX + userid, WebSocketAddress.class);
|
||||||
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
|
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
|
||||||
} else {
|
} else {
|
||||||
Collection<WebSocketAddress> addrs = userAddress.addresses();
|
Collection<WebSocketAddress> addrs = userAddress.addresses();
|
||||||
@@ -563,9 +563,9 @@ public abstract class WebSocketNode {
|
|||||||
keyuser.put(keys[i], userids[i]);
|
keyuser.put(keys[i], userids[i]);
|
||||||
}
|
}
|
||||||
tryAcquireSemaphore();
|
tryAcquireSemaphore();
|
||||||
CompletableFuture<Map<String, Collection<WebSocketAddress>>> addrsFuture = source.getCollectionMapAsync(true, WebSocketAddress.class, keys);
|
CompletableFuture<Map<String, Set<WebSocketAddress>>> addrsFuture = source.smembersAsync(WebSocketAddress.class, keys);
|
||||||
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
|
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
|
||||||
rsfuture = addrsFuture.thenCompose((Map<String, Collection<WebSocketAddress>> addrs) -> {
|
rsfuture = addrsFuture.thenCompose(addrs -> {
|
||||||
if (addrs == null || addrs.isEmpty()) {
|
if (addrs == null || addrs.isEmpty()) {
|
||||||
if (logger.isLoggable(Level.FINER)) logger.finer("websocket not found userids:" + JsonConvert.root().convertTo(userids) + " on any node ");
|
if (logger.isLoggable(Level.FINER)) logger.finer("websocket not found userids:" + JsonConvert.root().convertTo(userids) + " on any node ");
|
||||||
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
|
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
|
||||||
@@ -641,9 +641,9 @@ public abstract class WebSocketNode {
|
|||||||
//远程节点发送消息
|
//远程节点发送消息
|
||||||
final Object remoteMessage = formatRemoteMessage(message);
|
final Object remoteMessage = formatRemoteMessage(message);
|
||||||
tryAcquireSemaphore();
|
tryAcquireSemaphore();
|
||||||
CompletableFuture<Collection<WebSocketAddress>> addrsFuture = source.getCollectionAsync(WS_SOURCE_KEY_USERID_PREFIX + userid, WebSocketAddress.class);
|
CompletableFuture<Set<WebSocketAddress>> addrsFuture = source.smembersAsync(WS_SOURCE_KEY_USERID_PREFIX + userid, WebSocketAddress.class);
|
||||||
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
|
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
|
||||||
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<WebSocketAddress> addrs) -> {
|
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose(addrs -> {
|
||||||
if (addrs == null || addrs.isEmpty()) {
|
if (addrs == null || addrs.isEmpty()) {
|
||||||
if (logger.isLoggable(Level.FINER)) logger.finer("websocket not found userid:" + userid + " on any node ");
|
if (logger.isLoggable(Level.FINER)) logger.finer("websocket not found userid:" + userid + " on any node ");
|
||||||
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
|
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
|
||||||
@@ -816,9 +816,9 @@ public abstract class WebSocketNode {
|
|||||||
final Object remoteMessage = formatRemoteMessage(message);
|
final Object remoteMessage = formatRemoteMessage(message);
|
||||||
CompletableFuture<Integer> localFuture = this.localEngine == null ? null : this.localEngine.broadcastLocalMessage(wsrange, message, last);
|
CompletableFuture<Integer> localFuture = this.localEngine == null ? null : this.localEngine.broadcastLocalMessage(wsrange, message, last);
|
||||||
tryAcquireSemaphore();
|
tryAcquireSemaphore();
|
||||||
CompletableFuture<Collection<WebSocketAddress>> addrsFuture = source.getCollectionAsync(WS_SOURCE_KEY_NODES, WebSocketAddress.class);
|
CompletableFuture<Set<WebSocketAddress>> addrsFuture = source.smembersAsync(WS_SOURCE_KEY_NODES, WebSocketAddress.class);
|
||||||
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
|
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
|
||||||
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<WebSocketAddress> addrs) -> {
|
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose(addrs -> {
|
||||||
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket broadcast message (" + remoteMessage + ") on " + addrs);
|
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket broadcast message (" + remoteMessage + ") on " + addrs);
|
||||||
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0);
|
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0);
|
||||||
CompletableFuture<Integer> future = null;
|
CompletableFuture<Integer> future = null;
|
||||||
@@ -846,9 +846,9 @@ public abstract class WebSocketNode {
|
|||||||
}
|
}
|
||||||
CompletableFuture<Integer> localFuture = this.localEngine == null ? null : this.localEngine.broadcastLocalAction(action);
|
CompletableFuture<Integer> localFuture = this.localEngine == null ? null : this.localEngine.broadcastLocalAction(action);
|
||||||
tryAcquireSemaphore();
|
tryAcquireSemaphore();
|
||||||
CompletableFuture<Collection<WebSocketAddress>> addrsFuture = source.getCollectionAsync(WS_SOURCE_KEY_NODES, WebSocketAddress.class);
|
CompletableFuture<Set<WebSocketAddress>> addrsFuture = source.smembersAsync(WS_SOURCE_KEY_NODES, WebSocketAddress.class);
|
||||||
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
|
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
|
||||||
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<WebSocketAddress> addrs) -> {
|
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose(addrs -> {
|
||||||
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket broadcast action (" + action + ") on " + addrs);
|
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket broadcast action (" + action + ") on " + addrs);
|
||||||
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0);
|
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0);
|
||||||
CompletableFuture<Integer> future = null;
|
CompletableFuture<Integer> future = null;
|
||||||
@@ -895,9 +895,9 @@ public abstract class WebSocketNode {
|
|||||||
keyuser.put(keys[i], userids[i]);
|
keyuser.put(keys[i], userids[i]);
|
||||||
}
|
}
|
||||||
tryAcquireSemaphore();
|
tryAcquireSemaphore();
|
||||||
CompletableFuture<Map<String, Collection<WebSocketAddress>>> addrsFuture = source.getCollectionMapAsync(true, WebSocketAddress.class, keys);
|
CompletableFuture<Map<String, Set<WebSocketAddress>>> addrsFuture = source.smembersAsync(WebSocketAddress.class, keys);
|
||||||
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
|
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
|
||||||
rsfuture = addrsFuture.thenCompose((Map<String, Collection<WebSocketAddress>> addrs) -> {
|
rsfuture = addrsFuture.thenCompose(addrs -> {
|
||||||
if (addrs == null || addrs.isEmpty()) {
|
if (addrs == null || addrs.isEmpty()) {
|
||||||
if (logger.isLoggable(Level.FINER)) logger.finer("websocket not found userids:" + JsonConvert.root().convertTo(userids) + " on any node ");
|
if (logger.isLoggable(Level.FINER)) logger.finer("websocket not found userids:" + JsonConvert.root().convertTo(userids) + " on any node ");
|
||||||
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
|
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
|
||||||
@@ -965,9 +965,9 @@ public abstract class WebSocketNode {
|
|||||||
}
|
}
|
||||||
//远程节点发送操作
|
//远程节点发送操作
|
||||||
tryAcquireSemaphore();
|
tryAcquireSemaphore();
|
||||||
CompletableFuture<Collection<WebSocketAddress>> addrsFuture = source.getCollectionAsync(WS_SOURCE_KEY_USERID_PREFIX + userid, WebSocketAddress.class);
|
CompletableFuture<Set<WebSocketAddress>> addrsFuture = source.smembersAsync(WS_SOURCE_KEY_USERID_PREFIX + userid, WebSocketAddress.class);
|
||||||
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
|
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
|
||||||
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<WebSocketAddress> addrs) -> {
|
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose(addrs -> {
|
||||||
if (addrs == null || addrs.isEmpty()) {
|
if (addrs == null || addrs.isEmpty()) {
|
||||||
if (logger.isLoggable(Level.FINER)) logger.finer("websocket not found userid:" + userid + " on any node ");
|
if (logger.isLoggable(Level.FINER)) logger.finer("websocket not found userid:" + userid + " on any node ");
|
||||||
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
|
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
|
||||||
|
|||||||
@@ -88,7 +88,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
|
|||||||
@Override
|
@Override
|
||||||
public CompletableFuture<Void> connect(Serializable userid, WebSocketAddress wsaddr) {
|
public CompletableFuture<Void> connect(Serializable userid, WebSocketAddress wsaddr) {
|
||||||
tryAcquireSemaphore();
|
tryAcquireSemaphore();
|
||||||
CompletableFuture<Void> future = source.appendSetItemAsync(WS_SOURCE_KEY_USERID_PREFIX + userid, WebSocketAddress.class, wsaddr);
|
CompletableFuture<Void> future = source.saddAsync(WS_SOURCE_KEY_USERID_PREFIX + userid, WebSocketAddress.class, wsaddr);
|
||||||
if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore());
|
if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore());
|
||||||
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " connect from " + wsaddr);
|
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " connect from " + wsaddr);
|
||||||
return future;
|
return future;
|
||||||
@@ -105,7 +105,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
|
|||||||
@Override
|
@Override
|
||||||
public CompletableFuture<Void> disconnect(Serializable userid, WebSocketAddress wsaddr) {
|
public CompletableFuture<Void> disconnect(Serializable userid, WebSocketAddress wsaddr) {
|
||||||
tryAcquireSemaphore();
|
tryAcquireSemaphore();
|
||||||
CompletableFuture<Integer> future = source.removeSetItemAsync(WS_SOURCE_KEY_USERID_PREFIX + userid, WebSocketAddress.class, wsaddr);
|
CompletableFuture<Integer> future = source.sremAsync(WS_SOURCE_KEY_USERID_PREFIX + userid, WebSocketAddress.class, wsaddr);
|
||||||
if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore());
|
if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore());
|
||||||
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " disconnect from " + wsaddr);
|
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " disconnect from " + wsaddr);
|
||||||
return future.thenApply(v -> null);
|
return future.thenApply(v -> null);
|
||||||
@@ -123,8 +123,8 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
|
|||||||
@Override
|
@Override
|
||||||
public CompletableFuture<Void> changeUserid(Serializable olduserid, Serializable newuserid, WebSocketAddress wsaddr) {
|
public CompletableFuture<Void> changeUserid(Serializable olduserid, Serializable newuserid, WebSocketAddress wsaddr) {
|
||||||
tryAcquireSemaphore();
|
tryAcquireSemaphore();
|
||||||
CompletableFuture<Void> future = source.appendSetItemAsync(WS_SOURCE_KEY_USERID_PREFIX + newuserid, WebSocketAddress.class, wsaddr);
|
CompletableFuture<Void> future = source.saddAsync(WS_SOURCE_KEY_USERID_PREFIX + newuserid, WebSocketAddress.class, wsaddr);
|
||||||
future = future.thenAccept((a) -> source.removeSetItemAsync(WS_SOURCE_KEY_USERID_PREFIX + olduserid, WebSocketAddress.class, wsaddr));
|
future = future.thenAccept((a) -> source.sremAsync(WS_SOURCE_KEY_USERID_PREFIX + olduserid, WebSocketAddress.class, wsaddr));
|
||||||
if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore());
|
if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore());
|
||||||
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + olduserid + " changeUserid to " + newuserid + " from " + wsaddr);
|
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + olduserid + " changeUserid to " + newuserid + " from " + wsaddr);
|
||||||
return future;
|
return future;
|
||||||
|
|||||||
@@ -918,6 +918,21 @@ public final class CacheMemorySource extends AbstractCacheSource {
|
|||||||
return (Collection<T>) get(key, componentType);
|
return (Collection<T>) get(key, componentType);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> Set<T> smembers(final String key, final Type componentType) {
|
||||||
|
return (Set<T>) get(key, componentType);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> Map<String, Set<T>> smembers(final Type componentType, final String... keys) {
|
||||||
|
Map<String, Set<T>> map = new HashMap<>();
|
||||||
|
for (String key : keys) {
|
||||||
|
Set<T> s = (Set<T>) get(key, componentType);
|
||||||
|
if (s != null) map.put(key, s);
|
||||||
|
}
|
||||||
|
return map;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> Map<String, Collection<T>> getCollectionMap(final boolean set, final Type componentType, final String... keys) {
|
public <T> Map<String, Collection<T>> getCollectionMap(final boolean set, final Type componentType, final String... keys) {
|
||||||
Map<String, Collection<T>> map = new HashMap<>();
|
Map<String, Collection<T>> map = new HashMap<>();
|
||||||
@@ -1034,6 +1049,11 @@ public final class CacheMemorySource extends AbstractCacheSource {
|
|||||||
return map;
|
return map;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> CompletableFuture<Map<String, Set<T>>> smembersAsync(Type componentType, String... keys) {
|
||||||
|
return CompletableFuture.supplyAsync(() -> smembers(componentType, keys), getExecutor());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> CompletableFuture<Map<String, Collection<T>>> getCollectionMapAsync(boolean set, Type componentType, String... keys) {
|
public <T> CompletableFuture<Map<String, Collection<T>>> getCollectionMapAsync(boolean set, Type componentType, String... keys) {
|
||||||
return CompletableFuture.supplyAsync(() -> getCollectionMap(set, componentType, keys), getExecutor());
|
return CompletableFuture.supplyAsync(() -> getCollectionMap(set, componentType, keys), getExecutor());
|
||||||
@@ -1064,6 +1084,11 @@ public final class CacheMemorySource extends AbstractCacheSource {
|
|||||||
return CompletableFuture.supplyAsync(() -> getCollection(key, componentType), getExecutor());
|
return CompletableFuture.supplyAsync(() -> getCollection(key, componentType), getExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> CompletableFuture<Set<T>> smembersAsync(String key, Type componentType) {
|
||||||
|
return CompletableFuture.supplyAsync(() -> smembers(key, componentType), getExecutor());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getCollectionSize(final String key) {
|
public int getCollectionSize(final String key) {
|
||||||
Collection collection = (Collection) get(key, Object.class);
|
Collection collection = (Collection) get(key, Object.class);
|
||||||
@@ -1086,14 +1111,14 @@ public final class CacheMemorySource extends AbstractCacheSource {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> boolean existsSetItem(final String key, final Type type, final T value) {
|
public <T> boolean sismember(final String key, final Type type, final T value) {
|
||||||
Collection list = getCollection(key, type);
|
Collection list = getCollection(key, type);
|
||||||
return list != null && list.contains(value);
|
return list != null && list.contains(value);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> CompletableFuture<Boolean> existsSetItemAsync(final String key, final Type type, final T value) {
|
public <T> CompletableFuture<Boolean> sismemberAsync(final String key, final Type type, final T value) {
|
||||||
return CompletableFuture.supplyAsync(() -> existsSetItem(key, type, value), getExecutor());
|
return CompletableFuture.supplyAsync(() -> sismember(key, type, value), getExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -1223,26 +1248,26 @@ public final class CacheMemorySource extends AbstractCacheSource {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String spopStringSetItem(final String key) {
|
public String spopStringSetItem(final String key) {
|
||||||
return (String) spopSetItem(key, String.class);
|
return (String) spop(key, String.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Set<String> spopStringSetItem(final String key, int count) {
|
public Set<String> spopStringSetItem(final String key, int count) {
|
||||||
return spopSetItem(key, count, String.class);
|
return spop(key, count, String.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Long spopLongSetItem(final String key) {
|
public Long spopLongSetItem(final String key) {
|
||||||
return (Long) spopSetItem(key, long.class);
|
return (Long) spop(key, long.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Set<Long> spopLongSetItem(final String key, int count) {
|
public Set<Long> spopLongSetItem(final String key, int count) {
|
||||||
return spopSetItem(key, count, long.class);
|
return spop(key, count, long.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> T spopSetItem(final String key, final Type componentType) {
|
public <T> T spop(final String key, final Type componentType) {
|
||||||
if (key == null) return null;
|
if (key == null) return null;
|
||||||
CacheEntry entry = container.get(key);
|
CacheEntry entry = container.get(key);
|
||||||
if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) {
|
if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) {
|
||||||
@@ -1260,7 +1285,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> Set<T> spopSetItem(final String key, final int count, final Type componentType) {
|
public <T> Set<T> spop(final String key, final int count, final Type componentType) {
|
||||||
if (key == null) return new LinkedHashSet<>();
|
if (key == null) return new LinkedHashSet<>();
|
||||||
CacheEntry entry = container.get(key);
|
CacheEntry entry = container.get(key);
|
||||||
if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) {
|
if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) {
|
||||||
@@ -1295,7 +1320,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> void appendSetItem(String key, final Type componentType, T value) {
|
public <T> void sadd(String key, final Type componentType, T value) {
|
||||||
appendSetItem(CacheEntryType.OBJECT_SET, key, value);
|
appendSetItem(CacheEntryType.OBJECT_SET, key, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1310,8 +1335,8 @@ public final class CacheMemorySource extends AbstractCacheSource {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> CompletableFuture<Void> appendSetItemAsync(final String key, final Type componentType, T value) {
|
public <T> CompletableFuture<Void> saddAsync(final String key, final Type componentType, T value) {
|
||||||
return CompletableFuture.runAsync(() -> appendSetItem(key, componentType, value), getExecutor()).whenComplete(futureCompleteConsumer);
|
return CompletableFuture.runAsync(() -> sadd(key, componentType, value), getExecutor()).whenComplete(futureCompleteConsumer);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -1325,7 +1350,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> int removeSetItem(String key, Type type, T value) {
|
public <T> int srem(String key, Type type, T value) {
|
||||||
if (key == null) return 0;
|
if (key == null) return 0;
|
||||||
CacheEntry entry = container.get(key);
|
CacheEntry entry = container.get(key);
|
||||||
if (entry == null || entry.csetValue == null) return 0;
|
if (entry == null || entry.csetValue == null) return 0;
|
||||||
@@ -1349,8 +1374,8 @@ public final class CacheMemorySource extends AbstractCacheSource {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> CompletableFuture<Integer> removeSetItemAsync(final String key, final Type componentType, final T value) {
|
public <T> CompletableFuture<Integer> sremAsync(final String key, final Type componentType, final T value) {
|
||||||
return CompletableFuture.supplyAsync(() -> removeSetItem(key, componentType, value), getExecutor()).whenComplete(futureCompleteConsumer);
|
return CompletableFuture.supplyAsync(() -> srem(key, componentType, value), getExecutor()).whenComplete(futureCompleteConsumer);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -1490,13 +1515,13 @@ public final class CacheMemorySource extends AbstractCacheSource {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> CompletableFuture<T> spopSetItemAsync(String key, Type componentType) {
|
public <T> CompletableFuture<T> spopAsync(String key, Type componentType) {
|
||||||
return CompletableFuture.supplyAsync(() -> spopSetItem(key, componentType), getExecutor()).whenComplete(futureCompleteConsumer);
|
return CompletableFuture.supplyAsync(() -> spop(key, componentType), getExecutor()).whenComplete(futureCompleteConsumer);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> CompletableFuture<Set<T>> spopSetItemAsync(String key, int count, Type componentType) {
|
public <T> CompletableFuture<Set<T>> spopAsync(String key, int count, Type componentType) {
|
||||||
return CompletableFuture.supplyAsync(() -> spopSetItem(key, count, componentType), getExecutor()).whenComplete(futureCompleteConsumer);
|
return CompletableFuture.supplyAsync(() -> spop(key, count, componentType), getExecutor()).whenComplete(futureCompleteConsumer);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -188,16 +188,21 @@ public interface CacheSource extends Resourcable {
|
|||||||
public int removeLongListItem(final String key, final long value);
|
public int removeLongListItem(final String key, final long value);
|
||||||
|
|
||||||
//------------------------ set ------------------------
|
//------------------------ set ------------------------
|
||||||
public <T> boolean existsSetItem(final String key, final Type componentType, final T value);
|
public <T> Set<T> smembers(final String key, final Type componentType);
|
||||||
|
|
||||||
public <T> void appendSetItem(final String key, final Type componentType, final T value);
|
public <T> Map<String, Set<T>> smembers(final Type componentType, final String... keys);
|
||||||
|
|
||||||
public <T> int removeSetItem(final String key, final Type componentType, final T value);
|
public <T> boolean sismember(final String key, final Type componentType, final T value);
|
||||||
|
|
||||||
public <T> T spopSetItem(final String key, final Type componentType);
|
public <T> void sadd(final String key, final Type componentType, final T value);
|
||||||
|
|
||||||
public <T> Set<T> spopSetItem(final String key, final int count, final Type componentType);
|
public <T> int srem(final String key, final Type componentType, final T value);
|
||||||
|
|
||||||
|
public <T> T spop(final String key, final Type componentType);
|
||||||
|
|
||||||
|
public <T> Set<T> spop(final String key, final int count, final Type componentType);
|
||||||
|
|
||||||
|
//---------- set-string ----------
|
||||||
public boolean existsStringSetItem(final String key, final String value);
|
public boolean existsStringSetItem(final String key, final String value);
|
||||||
|
|
||||||
public void appendStringSetItem(final String key, final String value);
|
public void appendStringSetItem(final String key, final String value);
|
||||||
@@ -208,6 +213,7 @@ public interface CacheSource extends Resourcable {
|
|||||||
|
|
||||||
public Set<String> spopStringSetItem(final String key, final int count);
|
public Set<String> spopStringSetItem(final String key, final int count);
|
||||||
|
|
||||||
|
//---------- set-long ----------
|
||||||
public boolean existsLongSetItem(final String key, final long value);
|
public boolean existsLongSetItem(final String key, final long value);
|
||||||
|
|
||||||
public void appendLongSetItem(final String key, final long value);
|
public void appendLongSetItem(final String key, final long value);
|
||||||
@@ -427,16 +433,21 @@ public interface CacheSource extends Resourcable {
|
|||||||
public CompletableFuture<Integer> removeLongListItemAsync(final String key, final long value);
|
public CompletableFuture<Integer> removeLongListItemAsync(final String key, final long value);
|
||||||
|
|
||||||
//------------------------ setAsync ------------------------
|
//------------------------ setAsync ------------------------
|
||||||
public <T> CompletableFuture<Boolean> existsSetItemAsync(final String key, final Type componentType, final T value);
|
public <T> CompletableFuture<Set<T>> smembersAsync(final String key, final Type componentType);
|
||||||
|
|
||||||
public <T> CompletableFuture<Void> appendSetItemAsync(final String key, final Type componentType, final T value);
|
public <T> CompletableFuture<Map<String, Set<T>>> smembersAsync(final Type componentType, final String... keys);
|
||||||
|
|
||||||
public <T> CompletableFuture<Integer> removeSetItemAsync(final String key, final Type componentType, final T value);
|
public <T> CompletableFuture<Boolean> sismemberAsync(final String key, final Type componentType, final T value);
|
||||||
|
|
||||||
public <T> CompletableFuture<T> spopSetItemAsync(final String key, final Type componentType);
|
public <T> CompletableFuture<Void> saddAsync(final String key, final Type componentType, final T value);
|
||||||
|
|
||||||
public <T> CompletableFuture<Set<T>> spopSetItemAsync(final String key, final int count, final Type componentType);
|
public <T> CompletableFuture<Integer> sremAsync(final String key, final Type componentType, final T value);
|
||||||
|
|
||||||
|
public <T> CompletableFuture<T> spopAsync(final String key, final Type componentType);
|
||||||
|
|
||||||
|
public <T> CompletableFuture<Set<T>> spopAsync(final String key, final int count, final Type componentType);
|
||||||
|
|
||||||
|
//---------- set-string ----------
|
||||||
public CompletableFuture<Boolean> existsStringSetItemAsync(final String key, final String value);
|
public CompletableFuture<Boolean> existsStringSetItemAsync(final String key, final String value);
|
||||||
|
|
||||||
public CompletableFuture<Void> appendStringSetItemAsync(final String key, final String value);
|
public CompletableFuture<Void> appendStringSetItemAsync(final String key, final String value);
|
||||||
@@ -447,6 +458,7 @@ public interface CacheSource extends Resourcable {
|
|||||||
|
|
||||||
public CompletableFuture<Set<String>> spopStringSetItemAsync(final String key, final int count);
|
public CompletableFuture<Set<String>> spopStringSetItemAsync(final String key, final int count);
|
||||||
|
|
||||||
|
//---------- set-long ----------
|
||||||
public CompletableFuture<Boolean> existsLongSetItemAsync(final String key, final long value);
|
public CompletableFuture<Boolean> existsLongSetItemAsync(final String key, final long value);
|
||||||
|
|
||||||
public CompletableFuture<Void> appendLongSetItemAsync(final String key, final long value);
|
public CompletableFuture<Void> appendLongSetItemAsync(final String key, final long value);
|
||||||
@@ -642,4 +654,54 @@ public interface CacheSource extends Resourcable {
|
|||||||
default CompletableFuture<Integer> hsizeAsync(final String key) {
|
default CompletableFuture<Integer> hsizeAsync(final String key) {
|
||||||
return hlenAsync(key);
|
return hlenAsync(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
|
default <T> void appendSetItem(final String key, final Type componentType, final T value) {
|
||||||
|
sadd(key, componentType, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
|
default <T> int removeSetItem(final String key, final Type componentType, final T value) {
|
||||||
|
return srem(key, componentType, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
|
default <T> T spopSetItem(final String key, final Type componentType) {
|
||||||
|
return CacheSource.this.spop(key, componentType);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
|
default <T> Set<T> spopSetItem(final String key, final int count, final Type componentType) {
|
||||||
|
return spop(key, count, componentType);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
|
default <T> boolean existsSetItem(final String key, final Type componentType, final T value) {
|
||||||
|
return sismember(key, componentType, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
|
default <T> CompletableFuture<Boolean> existsSetItemAsync(final String key, final Type componentType, final T value) {
|
||||||
|
return sismemberAsync(key, componentType, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
|
default <T> CompletableFuture<Void> appendSetItemAsync(final String key, final Type componentType, final T value) {
|
||||||
|
return saddAsync(key, componentType, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
|
default <T> CompletableFuture<Integer> removeSetItemAsync(final String key, final Type componentType, final T value) {
|
||||||
|
return sremAsync(key, componentType, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
|
default <T> CompletableFuture<T> spopSetItemAsync(final String key, final Type componentType) {
|
||||||
|
return spopAsync(key, componentType);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
|
default <T> CompletableFuture<Set<T>> spopSetItemAsync(final String key, final int count, final Type componentType) {
|
||||||
|
return spopAsync(key, count, componentType);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user