From ba928b389b92fcfaa6452f6436ea27a5ec1bdaca Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Thu, 18 May 2017 14:22:44 +0800 Subject: [PATCH] --- src/org/redkale/net/http/WebSocket.java | 30 ++++--------------- src/org/redkale/net/http/WebSocketNode.java | 15 +++++++--- src/org/redkale/net/http/WebSocketRunner.java | 1 - .../redkale/net/http/WebSocketServlet.java | 4 ++- 4 files changed, 19 insertions(+), 31 deletions(-) diff --git a/src/org/redkale/net/http/WebSocket.java b/src/org/redkale/net/http/WebSocket.java index c5ffc5875..620bf7432 100644 --- a/src/org/redkale/net/http/WebSocket.java +++ b/src/org/redkale/net/http/WebSocket.java @@ -81,7 +81,9 @@ public abstract class WebSocket { private final long createtime = System.currentTimeMillis(); - private Map attributes = new HashMap<>(); //非线程安全 + private Map attributes = new HashMap<>(); //非线程安全 + + protected final long websocketid = System.nanoTime(); //唯一ID protected WebSocket() { } @@ -363,28 +365,6 @@ public abstract class WebSocket { return rs; } - /** - * 获取指定groupid在线用户的节点地址列表 - * - * @param groupid groupid - * - * @return 地址列表 - */ - protected final CompletableFuture> getOnlineNodes(Serializable groupid) { - return _engine.node.getOnlineNodes(groupid); - } - - /** - * 获取指定groupid在线用户的详细连接信息 - * - * @param groupid groupid - * - * @return 地址集合 - */ - protected final CompletableFuture>> getOnlineRemoteAddress(Serializable groupid) { - return _engine.node.getOnlineRemoteAddress(groupid); - } - /** * 获取当前WebSocket下的属性,非线程安全 * @@ -407,7 +387,7 @@ public abstract class WebSocket { * @return 属性值 */ public final T removeAttribute(String name) { - return attributes == null ? null : (T) attributes.remove(name); + return attributes == null ? null : (T) attributes.remove(name); } /** @@ -417,7 +397,7 @@ public abstract class WebSocket { * @param value 属性值 */ public final void setAttribute(String name, Object value) { - if(attributes == null) attributes = new HashMap<>(); + if (attributes == null) attributes = new HashMap<>(); attributes.put(name, value); } diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index 93ed19dff..45d4161c0 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -38,6 +38,7 @@ public abstract class WebSocketNode { protected WebSocketNode remoteNode; //存放所有用户分布在节点上的队列信息,Set 为 sncpnode 的集合, key: groupid + //包含 localSncpAddress //如果不是分布式(没有SNCP),sncpAddressNodes 将不会被用到 @Resource(name = "$") protected CacheSource sncpAddressNodes; @@ -96,18 +97,24 @@ public abstract class WebSocketNode { } /** - * 获取在线用户的节点地址列表 + * 获取用户在线的SNCP节点地址列表,不是分布式则返回空列表
+ * InetSocketAddress 为 SNCP节点地址 * * @param groupid groupid * * @return 地址列表 */ public CompletableFuture> getOnlineNodes(final Serializable groupid) { - return sncpAddressNodes == null ? CompletableFuture.completedFuture(null) : sncpAddressNodes.getCollectionAsync(groupid); + if (this.sncpAddressNodes != null) return this.sncpAddressNodes.getCollectionAsync(groupid); + List rs = new ArrayList<>(); + if (this.localSncpAddress != null) rs.add(this.localSncpAddress); + return CompletableFuture.completedFuture(rs); } /** - * 获取在线用户的详细连接信息 + * 获取在线用户的详细连接信息
+ * Map.key 为 SNCP节点地址 + * Map.value 为 用户客户端的IP * * @param groupid groupid * @@ -116,7 +123,7 @@ public abstract class WebSocketNode { //异步待优化 public CompletableFuture>> getOnlineRemoteAddress(final Serializable groupid) { final CompletableFuture>> rs = new CompletableFuture<>(); - CompletableFuture< Collection> nodesFuture = getOnlineNodes(groupid); + CompletableFuture> nodesFuture = getOnlineNodes(groupid); if (nodesFuture == null) return CompletableFuture.completedFuture(null); nodesFuture.whenComplete((nodes, e) -> { if (e != null) { diff --git a/src/org/redkale/net/http/WebSocketRunner.java b/src/org/redkale/net/http/WebSocketRunner.java index 143846d90..e32cdf32e 100644 --- a/src/org/redkale/net/http/WebSocketRunner.java +++ b/src/org/redkale/net/http/WebSocketRunner.java @@ -55,7 +55,6 @@ public class WebSocketRunner implements Runnable { this.webSocket = webSocket; this.channel = channel; this.wsbinary = wsbinary; - webSocket._runner = this; this.coder.logger = context.getLogger(); this.coder.debugable = false;//context.getLogger().isLoggable(Level.FINEST); this.readBuffer = context.pollBuffer(); diff --git a/src/org/redkale/net/http/WebSocketServlet.java b/src/org/redkale/net/http/WebSocketServlet.java index a7dcaa734..736bd6871 100644 --- a/src/org/redkale/net/http/WebSocketServlet.java +++ b/src/org/redkale/net/http/WebSocketServlet.java @@ -144,7 +144,9 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl } webSocket._groupid = groupid; WebSocketServlet.this.node.localEngine.add(webSocket); - context.runAsync(new WebSocketRunner(context, webSocket, response.removeChannel(), wsbinary)); + WebSocketRunner runner = new WebSocketRunner(context, webSocket, response.removeChannel(), wsbinary); + webSocket._runner = runner; + context.runAsync(runner); response.finish(true); }