This commit is contained in:
@@ -38,10 +38,10 @@ public abstract class WebSocketNode {
|
|||||||
|
|
||||||
//存放所有用户分布在节点上的队列信息,Set<InetSocketAddress> 为 sncpnode 的集合
|
//存放所有用户分布在节点上的队列信息,Set<InetSocketAddress> 为 sncpnode 的集合
|
||||||
@Resource(name = "$")
|
@Resource(name = "$")
|
||||||
protected CacheSource<Serializable, InetSocketAddress> sncpNodes;
|
protected CacheSource<Serializable, InetSocketAddress> sncpAddressNodes;
|
||||||
|
|
||||||
//存放本地节点上所有在线用户的队列信息,Set<String> 为 engineid 的集合
|
//存放本地节点上所有在线用户的队列信息,Set<String> 为 engineid 的集合
|
||||||
protected final ConcurrentHashMap<Serializable, Set<String>> localNodes = new ConcurrentHashMap();
|
protected final ConcurrentHashMap<Serializable, Set<String>> localEngines = new ConcurrentHashMap();
|
||||||
|
|
||||||
protected final ConcurrentHashMap<String, WebSocketEngine> engines = new ConcurrentHashMap();
|
protected final ConcurrentHashMap<String, WebSocketEngine> engines = new ConcurrentHashMap();
|
||||||
|
|
||||||
@@ -54,7 +54,7 @@ public abstract class WebSocketNode {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public final void postDestroy(AnyValue conf) {
|
public final void postDestroy(AnyValue conf) {
|
||||||
HashMap<Serializable, Set<String>> nodes = new HashMap<>(localNodes);
|
HashMap<Serializable, Set<String>> nodes = new HashMap<>(localEngines);
|
||||||
nodes.forEach((k, v) -> {
|
nodes.forEach((k, v) -> {
|
||||||
new HashSet<>(v).forEach(e -> {
|
new HashSet<>(v).forEach(e -> {
|
||||||
if (engines.containsKey(e)) disconnect(k, e);
|
if (engines.containsKey(e)) disconnect(k, e);
|
||||||
@@ -89,7 +89,7 @@ public abstract class WebSocketNode {
|
|||||||
* @return 地址列表
|
* @return 地址列表
|
||||||
*/
|
*/
|
||||||
public Collection<InetSocketAddress> getOnlineNodes(final Serializable groupid) {
|
public Collection<InetSocketAddress> getOnlineNodes(final Serializable groupid) {
|
||||||
return sncpNodes == null ? null : sncpNodes.getCollection(groupid);
|
return sncpAddressNodes == null ? null : sncpAddressNodes.getCollection(groupid);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -113,10 +113,10 @@ public abstract class WebSocketNode {
|
|||||||
|
|
||||||
final void connect(Serializable groupid, String engineid) {
|
final void connect(Serializable groupid, String engineid) {
|
||||||
if (finest) logger.finest(localSncpAddress + " receive websocket connect event (" + groupid + " on " + engineid + ").");
|
if (finest) logger.finest(localSncpAddress + " receive websocket connect event (" + groupid + " on " + engineid + ").");
|
||||||
Set<String> engineids = localNodes.get(groupid);
|
Set<String> engineids = localEngines.get(groupid);
|
||||||
if (engineids == null) {
|
if (engineids == null) {
|
||||||
engineids = new CopyOnWriteArraySet<>();
|
engineids = new CopyOnWriteArraySet<>();
|
||||||
localNodes.putIfAbsent(groupid, engineids);
|
localEngines.putIfAbsent(groupid, engineids);
|
||||||
}
|
}
|
||||||
if (localSncpAddress != null && engineids.isEmpty()) connect(groupid, localSncpAddress);
|
if (localSncpAddress != null && engineids.isEmpty()) connect(groupid, localSncpAddress);
|
||||||
engineids.add(engineid);
|
engineids.add(engineid);
|
||||||
@@ -124,11 +124,11 @@ public abstract class WebSocketNode {
|
|||||||
|
|
||||||
final void disconnect(Serializable groupid, String engineid) {
|
final void disconnect(Serializable groupid, String engineid) {
|
||||||
if (finest) logger.finest(localSncpAddress + " receive websocket disconnect event (" + groupid + " on " + engineid + ").");
|
if (finest) logger.finest(localSncpAddress + " receive websocket disconnect event (" + groupid + " on " + engineid + ").");
|
||||||
Set<String> engineids = localNodes.get(groupid);
|
Set<String> engineids = localEngines.get(groupid);
|
||||||
if (engineids == null || engineids.isEmpty()) return;
|
if (engineids == null || engineids.isEmpty()) return;
|
||||||
engineids.remove(engineid);
|
engineids.remove(engineid);
|
||||||
if (engineids.isEmpty()) {
|
if (engineids.isEmpty()) {
|
||||||
localNodes.remove(groupid);
|
localEngines.remove(groupid);
|
||||||
if (localSncpAddress != null) disconnect(groupid, localSncpAddress);
|
if (localSncpAddress != null) disconnect(groupid, localSncpAddress);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -138,7 +138,7 @@ public abstract class WebSocketNode {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public final int sendMessage(Serializable groupid, boolean recent, Object message, boolean last) {
|
public final int sendMessage(Serializable groupid, boolean recent, Object message, boolean last) {
|
||||||
final Set<String> engineids = localNodes.get(groupid);
|
final Set<String> engineids = localEngines.get(groupid);
|
||||||
if (finest) logger.finest("websocket want send message {groupid:" + groupid + ", content:'" + message + "'} from locale node to " + engineids);
|
if (finest) logger.finest("websocket want send message {groupid:" + groupid + ", content:'" + message + "'} from locale node to " + engineids);
|
||||||
int rscode = RETCODE_GROUP_EMPTY;
|
int rscode = RETCODE_GROUP_EMPTY;
|
||||||
if (engineids != null && !engineids.isEmpty()) {
|
if (engineids != null && !engineids.isEmpty()) {
|
||||||
@@ -156,7 +156,7 @@ public abstract class WebSocketNode {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if ((recent && rscode == 0) || remoteNode == null || sncpNodes == null) {
|
if ((recent && rscode == 0) || remoteNode == null || sncpAddressNodes == null) {
|
||||||
if (finest) {
|
if (finest) {
|
||||||
if ((recent && rscode == 0)) {
|
if ((recent && rscode == 0)) {
|
||||||
logger.finest("websocket want send recent message success");
|
logger.finest("websocket want send recent message success");
|
||||||
@@ -167,7 +167,7 @@ public abstract class WebSocketNode {
|
|||||||
return rscode;
|
return rscode;
|
||||||
}
|
}
|
||||||
//-----------------------发送远程的-----------------------------
|
//-----------------------发送远程的-----------------------------
|
||||||
Collection<InetSocketAddress> addrs = sncpNodes.getCollection(groupid);
|
Collection<InetSocketAddress> addrs = sncpAddressNodes.getCollection(groupid);
|
||||||
if (finest) logger.finest("websocket found groupid:" + groupid + " on " + addrs);
|
if (finest) logger.finest("websocket found groupid:" + groupid + " on " + addrs);
|
||||||
if (addrs != null && !addrs.isEmpty()) { //对方连接在远程节点(包含本地节点),所以正常情况下addrs不会为空。
|
if (addrs != null && !addrs.isEmpty()) { //对方连接在远程节点(包含本地节点),所以正常情况下addrs不会为空。
|
||||||
if (recent) {
|
if (recent) {
|
||||||
|
|||||||
@@ -36,7 +36,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
|
|||||||
@Override
|
@Override
|
||||||
public List<String> getOnlineRemoteAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid) {
|
public List<String> getOnlineRemoteAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid) {
|
||||||
if (localSncpAddress == null || !localSncpAddress.equals(targetAddress)) return remoteOnlineRemoteAddresses(targetAddress, groupid);
|
if (localSncpAddress == null || !localSncpAddress.equals(targetAddress)) return remoteOnlineRemoteAddresses(targetAddress, groupid);
|
||||||
final Set<String> engineids = localNodes.get(groupid);
|
final Set<String> engineids = localEngines.get(groupid);
|
||||||
if (engineids == null || engineids.isEmpty()) return null;
|
if (engineids == null || engineids.isEmpty()) return null;
|
||||||
final List<String> rs = new ArrayList<>();
|
final List<String> rs = new ArrayList<>();
|
||||||
for (String engineid : engineids) {
|
for (String engineid : engineids) {
|
||||||
@@ -50,7 +50,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int sendMessage(@RpcTargetAddress InetSocketAddress addr, Serializable groupid, boolean recent, Object message, boolean last) {
|
public int sendMessage(@RpcTargetAddress InetSocketAddress addr, Serializable groupid, boolean recent, Object message, boolean last) {
|
||||||
final Set<String> engineids = localNodes.get(groupid);
|
final Set<String> engineids = localEngines.get(groupid);
|
||||||
if (engineids == null || engineids.isEmpty()) return RETCODE_GROUP_EMPTY;
|
if (engineids == null || engineids.isEmpty()) return RETCODE_GROUP_EMPTY;
|
||||||
int code = RETCODE_GROUP_EMPTY;
|
int code = RETCODE_GROUP_EMPTY;
|
||||||
for (String engineid : engineids) {
|
for (String engineid : engineids) {
|
||||||
@@ -70,13 +70,13 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void connect(Serializable groupid, InetSocketAddress addr) {
|
public void connect(Serializable groupid, InetSocketAddress addr) {
|
||||||
sncpNodes.appendSetItem(groupid, addr);
|
sncpAddressNodes.appendSetItem(groupid, addr);
|
||||||
if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " connect from " + addr);
|
if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " connect from " + addr);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void disconnect(Serializable groupid, InetSocketAddress addr) {
|
public void disconnect(Serializable groupid, InetSocketAddress addr) {
|
||||||
sncpNodes.removeSetItem(groupid, addr);
|
sncpAddressNodes.removeSetItem(groupid, addr);
|
||||||
if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " disconnect from " + addr);
|
if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " disconnect from " + addr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user