From 27468d9f0ce5c8637cf449fcc11c34b04688a8f6 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Sat, 13 May 2017 18:44:32 +0800 Subject: [PATCH] --- src/org/redkale/net/http/WebSocketNode.java | 45 +++++++++++++-------- 1 file changed, 28 insertions(+), 17 deletions(-) diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index 00e40ea02..9ca488476 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -36,14 +36,17 @@ public abstract class WebSocketNode { @RpcRemote protected WebSocketNode remoteNode; - //存放所有用户分布在节点上的队列信息,Set 为 sncpnode 的集合 + //存放所有用户分布在节点上的队列信息,Set 为 sncpnode 的集合, key: groupid @Resource(name = "$") protected CacheSource sncpAddressNodes; - //存放本地节点上所有在线用户的队列信息,Set 为 engineid 的集合 - protected final ConcurrentHashMap> localEngines = new ConcurrentHashMap(); + //protected WebSocketEngine onlyoneEngine; + + //存放本地节点上所有WebSocketEngine + protected final ConcurrentHashMap localEngines = new ConcurrentHashMap(); - protected final ConcurrentHashMap engines = new ConcurrentHashMap(); + //存放本地节点上所有在线用户的队列信息,Set 为 engineid 的集合, key: groupid + protected final ConcurrentHashMap> localEngineids = new ConcurrentHashMap(); public void init(AnyValue conf) { @@ -54,10 +57,10 @@ public abstract class WebSocketNode { } public final void postDestroy(AnyValue conf) { - HashMap> nodes = new HashMap<>(localEngines); - nodes.forEach((k, v) -> { + HashMap> engines = new HashMap<>(localEngineids); + engines.forEach((k, v) -> { new HashSet<>(v).forEach(e -> { - if (engines.containsKey(e)) disconnect(k, e); + if (localEngines.containsKey(e)) disconnect(k, e); }); }); } @@ -71,30 +74,38 @@ public abstract class WebSocketNode { protected abstract CompletableFuture disconnect(Serializable groupid, InetSocketAddress addr); //-------------------------------------------------------------------------------- - final void connect(Serializable groupid, String engineid) { + final void connect(final Serializable groupid, final String engineid) { if (finest) logger.finest(localSncpAddress + " receive websocket connect event (" + groupid + " on " + engineid + ")."); - Set engineids = localEngines.get(groupid); + Set engineids = localEngineids.get(groupid); if (engineids == null) { engineids = new CopyOnWriteArraySet<>(); - localEngines.putIfAbsent(groupid, engineids); + localEngineids.putIfAbsent(groupid, engineids); } - if (localSncpAddress != null && engineids.isEmpty()) connect(groupid, localSncpAddress); - engineids.add(engineid); + final Set engineids0 = engineids; + if (localSncpAddress != null && engineids.isEmpty()) { + CompletableFuture future = connect(groupid, localSncpAddress); + if (future != null) { + future.whenComplete((u, e) -> { //成功才记录 + if (e != null) engineids0.add(engineid); + }); + } + } + } final void disconnect(Serializable groupid, String engineid) { if (finest) logger.finest(localSncpAddress + " receive websocket disconnect event (" + groupid + " on " + engineid + ")."); - Set engineids = localEngines.get(groupid); + Set engineids = localEngineids.get(groupid); if (engineids == null || engineids.isEmpty()) return; engineids.remove(engineid); if (engineids.isEmpty()) { - localEngines.remove(groupid); + localEngineids.remove(groupid); if (localSncpAddress != null) disconnect(groupid, localSncpAddress); } } final void putWebSocketEngine(WebSocketEngine engine) { - engines.put(engine.getEngineid(), engine); + localEngines.put(engine.getEngineid(), engine); } //-------------------------------------------------------------------------------- @@ -150,12 +161,12 @@ public abstract class WebSocketNode { //异步待优化 public final CompletableFuture sendMessage(final Serializable groupid, final boolean recent, final Object message, final boolean last) { return CompletableFuture.supplyAsync(() -> { - final Set engineids = localEngines.get(groupid); + final Set engineids = localEngineids.get(groupid); if (finest) logger.finest("websocket want send message {groupid:" + groupid + ", content:'" + message + "'} from locale node to " + engineids); int rscode = RETCODE_GROUP_EMPTY; if (engineids != null && !engineids.isEmpty()) { for (String engineid : engineids) { - final WebSocketEngine engine = engines.get(engineid); + final WebSocketEngine engine = localEngines.get(engineid); if (engine != null) { //在本地 final WebSocketGroup group = engine.getWebSocketGroup(groupid); if (group == null || group.isEmpty()) {