增加判断用户是否在线和获取在线用户总数接口

This commit is contained in:
Redkale
2017-07-05 15:57:22 +08:00
parent f4abfafea2
commit 12fa033e15
3 changed files with 73 additions and 17 deletions

View File

@@ -24,18 +24,18 @@ import org.redkale.util.*;
* *
* @author zhangjx * @author zhangjx
*/ */
public final class WebSocketEngine { public class WebSocketEngine {
//全局自增长ID @Comment("全局自增长ID, 为了确保在一个进程里多个WebSocketEngine定时发送ping时不会同时进行")
private static final AtomicInteger sequence = new AtomicInteger(); private static final AtomicInteger sequence = new AtomicInteger();
//Engine自增长序号ID @Comment("Engine自增长序号ID")
private final int index; private final int index;
//当前WebSocket对应的Engine @Comment("当前WebSocket对应的Engine")
private final String engineid; private final String engineid;
//当前WebSocket对应的Node @Comment("当前WebSocket对应的Node")
protected final WebSocketNode node; protected final WebSocketNode node;
//HttpContext //HttpContext
@@ -44,23 +44,25 @@ public final class WebSocketEngine {
//Convert //Convert
protected final Convert sendConvert; protected final Convert sendConvert;
protected final boolean single; //是否单用户单连接 @Comment("是否单用户单连接")
protected final boolean single;
//在线用户ID对应的WebSocket组用于单用户单连接模式 @Comment("在线用户ID对应的WebSocket组用于单用户单连接模式")
private final Map<Serializable, WebSocket> websockets = new ConcurrentHashMap<>(); private final Map<Serializable, WebSocket> websockets = new ConcurrentHashMap<>();
//在线用户ID对应的WebSocket组用于单用户多连接模式 @Comment("在线用户ID对应的WebSocket组用于单用户多连接模式")
private final Map<Serializable, List<WebSocket>> websockets2 = new ConcurrentHashMap<>(); private final Map<Serializable, List<WebSocket>> websockets2 = new ConcurrentHashMap<>();
//用于PING的定时器 @Comment("用于PING的定时器")
private ScheduledThreadPoolExecutor scheduler; private ScheduledThreadPoolExecutor scheduler;
//日志 @Comment("日志")
protected final Logger logger; protected final Logger logger;
//FINEST日志级别 @Comment("日志级别")
protected final boolean finest; protected final boolean finest;
@Comment("PING的间隔秒数")
private int liveinterval; private int liveinterval;
protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveinterval, WebSocketNode node, Convert sendConvert, Logger logger) { protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveinterval, WebSocketNode node, Convert sendConvert, Logger logger) {
@@ -71,8 +73,8 @@ public final class WebSocketEngine {
this.node = node; this.node = node;
this.liveinterval = liveinterval; this.liveinterval = liveinterval;
this.logger = logger; this.logger = logger;
this.index = sequence.getAndIncrement();
this.finest = logger.isLoggable(Level.FINEST); this.finest = logger.isLoggable(Level.FINEST);
this.index = sequence.getAndIncrement();
} }
void init(AnyValue conf) { void init(AnyValue conf) {
@@ -97,6 +99,7 @@ public final class WebSocketEngine {
if (scheduler != null) scheduler.shutdownNow(); if (scheduler != null) scheduler.shutdownNow();
} }
@Comment("添加WebSocket")
void add(WebSocket socket) { void add(WebSocket socket) {
if (single) { if (single) {
websockets.put(socket._userid, socket); websockets.put(socket._userid, socket);
@@ -111,6 +114,7 @@ public final class WebSocketEngine {
if (node != null) node.connect(socket._userid); if (node != null) node.connect(socket._userid);
} }
@Comment("从WebSocketEngine删除指定WebSocket")
void remove(WebSocket socket) { void remove(WebSocket socket) {
Serializable userid = socket._userid; Serializable userid = socket._userid;
if (single) { if (single) {
@@ -128,10 +132,12 @@ public final class WebSocketEngine {
} }
} }
@Comment("给所有连接用户发送消息")
public CompletableFuture<Integer> broadcastMessage(final Object message, final boolean last) { public CompletableFuture<Integer> broadcastMessage(final Object message, final boolean last) {
return broadcastMessage(null, message, last); return broadcastMessage(null, message, last);
} }
@Comment("给指定WebSocket连接用户发送消息")
public CompletableFuture<Integer> broadcastMessage(final Predicate<WebSocket> predicate, final Object message, final boolean last) { public CompletableFuture<Integer> broadcastMessage(final Predicate<WebSocket> predicate, final Object message, final boolean last) {
if (message instanceof CompletableFuture) { if (message instanceof CompletableFuture) {
return ((CompletableFuture) message).thenCompose((json) -> broadcastMessage(predicate, json, last)); return ((CompletableFuture) message).thenCompose((json) -> broadcastMessage(predicate, json, last));
@@ -177,6 +183,7 @@ public final class WebSocketEngine {
} }
} }
@Comment("给指定用户组发送消息")
public CompletableFuture<Integer> sendMessage(final Object message, final boolean last, final Serializable... userids) { public CompletableFuture<Integer> sendMessage(final Object message, final boolean last, final Serializable... userids) {
if (message instanceof CompletableFuture) { if (message instanceof CompletableFuture) {
return ((CompletableFuture) message).thenCompose((json) -> sendMessage(json, last, userids)); return ((CompletableFuture) message).thenCompose((json) -> sendMessage(json, last, userids));
@@ -226,21 +233,33 @@ public final class WebSocketEngine {
} }
} }
Collection<WebSocket> getLocalWebSockets() { @Comment("获取所有连接")
public Collection<WebSocket> getLocalWebSockets() {
if (single) return websockets.values(); if (single) return websockets.values();
List<WebSocket> list = new ArrayList<>(); List<WebSocket> list = new ArrayList<>();
websockets2.values().forEach(x -> list.addAll(x)); websockets2.values().forEach(x -> list.addAll(x));
return list; return list;
} }
//适用于单用户单连接模式 @Comment("获取当前连接总数")
public int getLocalWebSocketSize() {
if (single) return websockets.size();
return (int) websockets2.values().stream().mapToInt(sublist -> sublist.size()).count();
}
@Comment("获取当前用户总数")
public int getLocalUserSize() {
return single ? websockets.size() : websockets2.size();
}
@Comment("适用于单用户单连接模式")
public WebSocket findLocalWebSocket(Serializable userid) { public WebSocket findLocalWebSocket(Serializable userid) {
if (single) return websockets.get(userid); if (single) return websockets.get(userid);
List<WebSocket> list = websockets2.get(userid); List<WebSocket> list = websockets2.get(userid);
return (list == null || list.isEmpty()) ? null : list.get(list.size() - 1); return (list == null || list.isEmpty()) ? null : list.get(list.size() - 1);
} }
//适用于单用户多连接模式 @Comment("适用于单用户多连接模式")
public Stream<WebSocket> getLocalWebSockets(Serializable userid) { public Stream<WebSocket> getLocalWebSockets(Serializable userid) {
if (single) { if (single) {
WebSocket websocket = websockets.get(userid); WebSocket websocket = websockets.get(userid);

View File

@@ -26,6 +26,12 @@ import org.redkale.util.*;
*/ */
public abstract class WebSocketNode { public abstract class WebSocketNode {
@Comment("存储当前SNCP节点列表的key")
public static final String SOURCE_SNCP_NODES_KEY = "redkale_sncpnodes";
@Comment("存储当前用户数量的key")
public static final String SOURCE_USER_COUNT_KEY = "redkale_usercount";
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
protected final boolean finest = logger.isLoggable(Level.FINEST); protected final boolean finest = logger.isLoggable(Level.FINEST);
@@ -57,7 +63,9 @@ public abstract class WebSocketNode {
if (this.localEngine == null) return; if (this.localEngine == null) return;
//关掉所有本地本地WebSocket //关掉所有本地本地WebSocket
this.localEngine.getLocalWebSockets().forEach(g -> disconnect(g.getUserid())); this.localEngine.getLocalWebSockets().forEach(g -> disconnect(g.getUserid()));
if (sncpNodeAddresses != null && localSncpAddress != null) sncpNodeAddresses.removeSetItem("redkale_sncpnodes", localSncpAddress); if (sncpNodeAddresses != null && localSncpAddress != null) {
sncpNodeAddresses.removeSetItem(SOURCE_SNCP_NODES_KEY, localSncpAddress);
}
} }
protected abstract CompletableFuture<List<String>> getWebSocketAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable userid); protected abstract CompletableFuture<List<String>> getWebSocketAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable userid);
@@ -140,6 +148,35 @@ public abstract class WebSocketNode {
}); });
} }
/**
* 判断指定用户是否WebSocket在线
*
* @param userid
*
* @return boolean
*/
public CompletableFuture<Boolean> existsWebSocket(final Serializable userid) {
if (this.localEngine != null && this.sncpNodeAddresses == null) {
return CompletableFuture.completedFuture(this.localEngine.existsLocalWebSocket(userid));
}
return this.sncpNodeAddresses.existsAsync(userid);
}
/**
* 获取在线用户总数
*
*
* @return boolean
*/
public CompletableFuture<Integer> getUserSize() {
if (this.localEngine != null && this.sncpNodeAddresses == null) {
return CompletableFuture.completedFuture(this.localEngine.getLocalUserSize());
}
return this.sncpNodeAddresses.getKeySizeAsync().thenCompose(count -> {
return sncpNodeAddresses.existsAsync(SOURCE_SNCP_NODES_KEY).thenApply(exists -> exists ? (count - 1) : count);
});
}
//-------------------------------------------------------------------------------- //--------------------------------------------------------------------------------
/** /**
* 获取本地的WebSocketEngine没有则返回null * 获取本地的WebSocketEngine没有则返回null

View File

@@ -68,7 +68,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
@Override @Override
public CompletableFuture<Void> connect(Serializable userid, InetSocketAddress sncpAddr) { public CompletableFuture<Void> connect(Serializable userid, InetSocketAddress sncpAddr) {
CompletableFuture<Void> future = sncpNodeAddresses.appendSetItemAsync(userid, sncpAddr); CompletableFuture<Void> future = sncpNodeAddresses.appendSetItemAsync(userid, sncpAddr);
future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync("redkale_sncpnodes", sncpAddr)); future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_NODES_KEY, sncpAddr));
if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " connect from " + sncpAddr); if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " connect from " + sncpAddr);
return future; return future;
} }