diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index 9889004ac..a168501c6 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -38,10 +38,10 @@ public abstract class WebSocketNode { //存放所有用户分布在节点上的队列信息,Set 为 sncpnode 的集合 @Resource(name = "$") - protected CacheSource sncpNodes; + protected CacheSource sncpAddressNodes; //存放本地节点上所有在线用户的队列信息,Set 为 engineid 的集合 - protected final ConcurrentHashMap> localNodes = new ConcurrentHashMap(); + protected final ConcurrentHashMap> localEngines = new ConcurrentHashMap(); protected final ConcurrentHashMap engines = new ConcurrentHashMap(); @@ -54,7 +54,7 @@ public abstract class WebSocketNode { } public final void postDestroy(AnyValue conf) { - HashMap> nodes = new HashMap<>(localNodes); + HashMap> nodes = new HashMap<>(localEngines); nodes.forEach((k, v) -> { new HashSet<>(v).forEach(e -> { if (engines.containsKey(e)) disconnect(k, e); @@ -89,7 +89,7 @@ public abstract class WebSocketNode { * @return 地址列表 */ public Collection getOnlineNodes(final Serializable groupid) { - return sncpNodes == null ? null : sncpNodes.getCollection(groupid); + return sncpAddressNodes == null ? null : sncpAddressNodes.getCollection(groupid); } /** @@ -113,10 +113,10 @@ public abstract class WebSocketNode { final void connect(Serializable groupid, String engineid) { if (finest) logger.finest(localSncpAddress + " receive websocket connect event (" + groupid + " on " + engineid + ")."); - Set engineids = localNodes.get(groupid); + Set engineids = localEngines.get(groupid); if (engineids == null) { engineids = new CopyOnWriteArraySet<>(); - localNodes.putIfAbsent(groupid, engineids); + localEngines.putIfAbsent(groupid, engineids); } if (localSncpAddress != null && engineids.isEmpty()) connect(groupid, localSncpAddress); engineids.add(engineid); @@ -124,11 +124,11 @@ public abstract class WebSocketNode { final void disconnect(Serializable groupid, String engineid) { if (finest) logger.finest(localSncpAddress + " receive websocket disconnect event (" + groupid + " on " + engineid + ")."); - Set engineids = localNodes.get(groupid); + Set engineids = localEngines.get(groupid); if (engineids == null || engineids.isEmpty()) return; engineids.remove(engineid); if (engineids.isEmpty()) { - localNodes.remove(groupid); + localEngines.remove(groupid); if (localSncpAddress != null) disconnect(groupid, localSncpAddress); } } @@ -138,7 +138,7 @@ public abstract class WebSocketNode { } public final int sendMessage(Serializable groupid, boolean recent, Object message, boolean last) { - final Set engineids = localNodes.get(groupid); + final Set engineids = localEngines.get(groupid); if (finest) logger.finest("websocket want send message {groupid:" + groupid + ", content:'" + message + "'} from locale node to " + engineids); int rscode = RETCODE_GROUP_EMPTY; if (engineids != null && !engineids.isEmpty()) { @@ -156,7 +156,7 @@ public abstract class WebSocketNode { } } } - if ((recent && rscode == 0) || remoteNode == null || sncpNodes == null) { + if ((recent && rscode == 0) || remoteNode == null || sncpAddressNodes == null) { if (finest) { if ((recent && rscode == 0)) { logger.finest("websocket want send recent message success"); @@ -167,7 +167,7 @@ public abstract class WebSocketNode { return rscode; } //-----------------------发送远程的----------------------------- - Collection addrs = sncpNodes.getCollection(groupid); + Collection addrs = sncpAddressNodes.getCollection(groupid); if (finest) logger.finest("websocket found groupid:" + groupid + " on " + addrs); if (addrs != null && !addrs.isEmpty()) { //对方连接在远程节点(包含本地节点),所以正常情况下addrs不会为空。 if (recent) { diff --git a/src/org/redkale/service/WebSocketNodeService.java b/src/org/redkale/service/WebSocketNodeService.java index 412057ba3..02d1ed424 100644 --- a/src/org/redkale/service/WebSocketNodeService.java +++ b/src/org/redkale/service/WebSocketNodeService.java @@ -36,7 +36,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service { @Override public List getOnlineRemoteAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid) { if (localSncpAddress == null || !localSncpAddress.equals(targetAddress)) return remoteOnlineRemoteAddresses(targetAddress, groupid); - final Set engineids = localNodes.get(groupid); + final Set engineids = localEngines.get(groupid); if (engineids == null || engineids.isEmpty()) return null; final List rs = new ArrayList<>(); for (String engineid : engineids) { @@ -50,7 +50,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service { @Override public int sendMessage(@RpcTargetAddress InetSocketAddress addr, Serializable groupid, boolean recent, Object message, boolean last) { - final Set engineids = localNodes.get(groupid); + final Set engineids = localEngines.get(groupid); if (engineids == null || engineids.isEmpty()) return RETCODE_GROUP_EMPTY; int code = RETCODE_GROUP_EMPTY; for (String engineid : engineids) { @@ -70,13 +70,13 @@ public class WebSocketNodeService extends WebSocketNode implements Service { @Override public void connect(Serializable groupid, InetSocketAddress addr) { - sncpNodes.appendSetItem(groupid, addr); + sncpAddressNodes.appendSetItem(groupid, addr); if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " connect from " + addr); } @Override public void disconnect(Serializable groupid, InetSocketAddress addr) { - sncpNodes.removeSetItem(groupid, addr); + sncpAddressNodes.removeSetItem(groupid, addr); if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " disconnect from " + addr); } }