From 12fa033e1577b629c8d7db5a5ff19baafe7e3edb Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Wed, 5 Jul 2017 15:57:22 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=88=A4=E6=96=AD=E7=94=A8?= =?UTF-8?q?=E6=88=B7=E6=98=AF=E5=90=A6=E5=9C=A8=E7=BA=BF=E5=92=8C=E8=8E=B7?= =?UTF-8?q?=E5=8F=96=E5=9C=A8=E7=BA=BF=E7=94=A8=E6=88=B7=E6=80=BB=E6=95=B0?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/org/redkale/net/http/WebSocketEngine.java | 49 +++++++++++++------ src/org/redkale/net/http/WebSocketNode.java | 39 ++++++++++++++- .../redkale/service/WebSocketNodeService.java | 2 +- 3 files changed, 73 insertions(+), 17 deletions(-) diff --git a/src/org/redkale/net/http/WebSocketEngine.java b/src/org/redkale/net/http/WebSocketEngine.java index a253987da..b4a88d219 100644 --- a/src/org/redkale/net/http/WebSocketEngine.java +++ b/src/org/redkale/net/http/WebSocketEngine.java @@ -24,18 +24,18 @@ import org.redkale.util.*; * * @author zhangjx */ -public final class WebSocketEngine { +public class WebSocketEngine { - //全局自增长ID + @Comment("全局自增长ID, 为了确保在一个进程里多个WebSocketEngine定时发送ping时不会同时进行") private static final AtomicInteger sequence = new AtomicInteger(); - //Engine自增长序号ID + @Comment("Engine自增长序号ID") private final int index; - //当前WebSocket对应的Engine + @Comment("当前WebSocket对应的Engine") private final String engineid; - //当前WebSocket对应的Node + @Comment("当前WebSocket对应的Node") protected final WebSocketNode node; //HttpContext @@ -44,23 +44,25 @@ public final class WebSocketEngine { //Convert protected final Convert sendConvert; - protected final boolean single; //是否单用户单连接 + @Comment("是否单用户单连接") + protected final boolean single; - //在线用户ID对应的WebSocket组,用于单用户单连接模式 + @Comment("在线用户ID对应的WebSocket组,用于单用户单连接模式") private final Map websockets = new ConcurrentHashMap<>(); - //在线用户ID对应的WebSocket组,用于单用户多连接模式 + @Comment("在线用户ID对应的WebSocket组,用于单用户多连接模式") private final Map> websockets2 = new ConcurrentHashMap<>(); - //用于PING的定时器 + @Comment("用于PING的定时器") private ScheduledThreadPoolExecutor scheduler; - //日志 + @Comment("日志") protected final Logger logger; - //FINEST日志级别 + @Comment("日志级别") protected final boolean finest; + @Comment("PING的间隔秒数") private int liveinterval; protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveinterval, WebSocketNode node, Convert sendConvert, Logger logger) { @@ -71,8 +73,8 @@ public final class WebSocketEngine { this.node = node; this.liveinterval = liveinterval; this.logger = logger; - this.index = sequence.getAndIncrement(); this.finest = logger.isLoggable(Level.FINEST); + this.index = sequence.getAndIncrement(); } void init(AnyValue conf) { @@ -97,6 +99,7 @@ public final class WebSocketEngine { if (scheduler != null) scheduler.shutdownNow(); } + @Comment("添加WebSocket") void add(WebSocket socket) { if (single) { websockets.put(socket._userid, socket); @@ -111,6 +114,7 @@ public final class WebSocketEngine { if (node != null) node.connect(socket._userid); } + @Comment("从WebSocketEngine删除指定WebSocket") void remove(WebSocket socket) { Serializable userid = socket._userid; if (single) { @@ -128,10 +132,12 @@ public final class WebSocketEngine { } } + @Comment("给所有连接用户发送消息") public CompletableFuture broadcastMessage(final Object message, final boolean last) { return broadcastMessage(null, message, last); } + @Comment("给指定WebSocket连接用户发送消息") public CompletableFuture broadcastMessage(final Predicate predicate, final Object message, final boolean last) { if (message instanceof CompletableFuture) { return ((CompletableFuture) message).thenCompose((json) -> broadcastMessage(predicate, json, last)); @@ -177,6 +183,7 @@ public final class WebSocketEngine { } } + @Comment("给指定用户组发送消息") public CompletableFuture sendMessage(final Object message, final boolean last, final Serializable... userids) { if (message instanceof CompletableFuture) { return ((CompletableFuture) message).thenCompose((json) -> sendMessage(json, last, userids)); @@ -226,21 +233,33 @@ public final class WebSocketEngine { } } - Collection getLocalWebSockets() { + @Comment("获取所有连接") + public Collection getLocalWebSockets() { if (single) return websockets.values(); List list = new ArrayList<>(); websockets2.values().forEach(x -> list.addAll(x)); return list; } - //适用于单用户单连接模式 + @Comment("获取当前连接总数") + public int getLocalWebSocketSize() { + if (single) return websockets.size(); + return (int) websockets2.values().stream().mapToInt(sublist -> sublist.size()).count(); + } + + @Comment("获取当前用户总数") + public int getLocalUserSize() { + return single ? websockets.size() : websockets2.size(); + } + + @Comment("适用于单用户单连接模式") public WebSocket findLocalWebSocket(Serializable userid) { if (single) return websockets.get(userid); List list = websockets2.get(userid); return (list == null || list.isEmpty()) ? null : list.get(list.size() - 1); } - //适用于单用户多连接模式 + @Comment("适用于单用户多连接模式") public Stream getLocalWebSockets(Serializable userid) { if (single) { WebSocket websocket = websockets.get(userid); diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index ac7f91db7..30eba5eb5 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -26,6 +26,12 @@ import org.redkale.util.*; */ public abstract class WebSocketNode { + @Comment("存储当前SNCP节点列表的key") + public static final String SOURCE_SNCP_NODES_KEY = "redkale_sncpnodes"; + + @Comment("存储当前用户数量的key") + public static final String SOURCE_USER_COUNT_KEY = "redkale_usercount"; + protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); protected final boolean finest = logger.isLoggable(Level.FINEST); @@ -57,7 +63,9 @@ public abstract class WebSocketNode { if (this.localEngine == null) return; //关掉所有本地本地WebSocket this.localEngine.getLocalWebSockets().forEach(g -> disconnect(g.getUserid())); - if (sncpNodeAddresses != null && localSncpAddress != null) sncpNodeAddresses.removeSetItem("redkale_sncpnodes", localSncpAddress); + if (sncpNodeAddresses != null && localSncpAddress != null) { + sncpNodeAddresses.removeSetItem(SOURCE_SNCP_NODES_KEY, localSncpAddress); + } } protected abstract CompletableFuture> getWebSocketAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable userid); @@ -140,6 +148,35 @@ public abstract class WebSocketNode { }); } + /** + * 判断指定用户是否WebSocket在线 + * + * @param userid + * + * @return boolean + */ + public CompletableFuture existsWebSocket(final Serializable userid) { + if (this.localEngine != null && this.sncpNodeAddresses == null) { + return CompletableFuture.completedFuture(this.localEngine.existsLocalWebSocket(userid)); + } + return this.sncpNodeAddresses.existsAsync(userid); + } + + /** + * 获取在线用户总数 + * + * + * @return boolean + */ + public CompletableFuture getUserSize() { + if (this.localEngine != null && this.sncpNodeAddresses == null) { + return CompletableFuture.completedFuture(this.localEngine.getLocalUserSize()); + } + return this.sncpNodeAddresses.getKeySizeAsync().thenCompose(count -> { + return sncpNodeAddresses.existsAsync(SOURCE_SNCP_NODES_KEY).thenApply(exists -> exists ? (count - 1) : count); + }); + } + //-------------------------------------------------------------------------------- /** * 获取本地的WebSocketEngine,没有则返回null diff --git a/src/org/redkale/service/WebSocketNodeService.java b/src/org/redkale/service/WebSocketNodeService.java index 902b2c2c9..74a742c6e 100644 --- a/src/org/redkale/service/WebSocketNodeService.java +++ b/src/org/redkale/service/WebSocketNodeService.java @@ -68,7 +68,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service { @Override public CompletableFuture connect(Serializable userid, InetSocketAddress sncpAddr) { CompletableFuture future = sncpNodeAddresses.appendSetItemAsync(userid, sncpAddr); - future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync("redkale_sncpnodes", sncpAddr)); + future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_NODES_KEY, sncpAddr)); if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " connect from " + sncpAddr); return future; }