This commit is contained in:
Redkale
2017-05-13 18:44:32 +08:00
parent bb8462af2a
commit 27468d9f0c

View File

@@ -36,14 +36,17 @@ public abstract class WebSocketNode {
@RpcRemote @RpcRemote
protected WebSocketNode remoteNode; protected WebSocketNode remoteNode;
//存放所有用户分布在节点上的队列信息,Set<InetSocketAddress> 为 sncpnode 的集合 //存放所有用户分布在节点上的队列信息,Set<InetSocketAddress> 为 sncpnode 的集合 key: groupid
@Resource(name = "$") @Resource(name = "$")
protected CacheSource<Serializable, InetSocketAddress> sncpAddressNodes; protected CacheSource<Serializable, InetSocketAddress> sncpAddressNodes;
//存放本地节点上所有在线用户的队列信息,Set<String> 为 engineid 的集合 //protected WebSocketEngine onlyoneEngine;
protected final ConcurrentHashMap<Serializable, Set<String>> localEngines = new ConcurrentHashMap();
protected final ConcurrentHashMap<String, WebSocketEngine> engines = new ConcurrentHashMap(); //存放本地节点上所有WebSocketEngine
protected final ConcurrentHashMap<String, WebSocketEngine> localEngines = new ConcurrentHashMap();
//存放本地节点上所有在线用户的队列信息,Set<String> 为 engineid 的集合, key: groupid
protected final ConcurrentHashMap<Serializable, Set<String>> localEngineids = new ConcurrentHashMap();
public void init(AnyValue conf) { public void init(AnyValue conf) {
@@ -54,10 +57,10 @@ public abstract class WebSocketNode {
} }
public final void postDestroy(AnyValue conf) { public final void postDestroy(AnyValue conf) {
HashMap<Serializable, Set<String>> nodes = new HashMap<>(localEngines); HashMap<Serializable, Set<String>> engines = new HashMap<>(localEngineids);
nodes.forEach((k, v) -> { engines.forEach((k, v) -> {
new HashSet<>(v).forEach(e -> { new HashSet<>(v).forEach(e -> {
if (engines.containsKey(e)) disconnect(k, e); if (localEngines.containsKey(e)) disconnect(k, e);
}); });
}); });
} }
@@ -71,30 +74,38 @@ public abstract class WebSocketNode {
protected abstract CompletableFuture<Void> disconnect(Serializable groupid, InetSocketAddress addr); protected abstract CompletableFuture<Void> disconnect(Serializable groupid, InetSocketAddress addr);
//-------------------------------------------------------------------------------- //--------------------------------------------------------------------------------
final void connect(Serializable groupid, String engineid) { final void connect(final Serializable groupid, final String engineid) {
if (finest) logger.finest(localSncpAddress + " receive websocket connect event (" + groupid + " on " + engineid + ")."); if (finest) logger.finest(localSncpAddress + " receive websocket connect event (" + groupid + " on " + engineid + ").");
Set<String> engineids = localEngines.get(groupid); Set<String> engineids = localEngineids.get(groupid);
if (engineids == null) { if (engineids == null) {
engineids = new CopyOnWriteArraySet<>(); engineids = new CopyOnWriteArraySet<>();
localEngines.putIfAbsent(groupid, engineids); localEngineids.putIfAbsent(groupid, engineids);
} }
if (localSncpAddress != null && engineids.isEmpty()) connect(groupid, localSncpAddress); final Set<String> engineids0 = engineids;
engineids.add(engineid); if (localSncpAddress != null && engineids.isEmpty()) {
CompletableFuture<Void> future = connect(groupid, localSncpAddress);
if (future != null) {
future.whenComplete((u, e) -> { //成功才记录
if (e != null) engineids0.add(engineid);
});
}
}
} }
final void disconnect(Serializable groupid, String engineid) { final void disconnect(Serializable groupid, String engineid) {
if (finest) logger.finest(localSncpAddress + " receive websocket disconnect event (" + groupid + " on " + engineid + ")."); if (finest) logger.finest(localSncpAddress + " receive websocket disconnect event (" + groupid + " on " + engineid + ").");
Set<String> engineids = localEngines.get(groupid); Set<String> engineids = localEngineids.get(groupid);
if (engineids == null || engineids.isEmpty()) return; if (engineids == null || engineids.isEmpty()) return;
engineids.remove(engineid); engineids.remove(engineid);
if (engineids.isEmpty()) { if (engineids.isEmpty()) {
localEngines.remove(groupid); localEngineids.remove(groupid);
if (localSncpAddress != null) disconnect(groupid, localSncpAddress); if (localSncpAddress != null) disconnect(groupid, localSncpAddress);
} }
} }
final void putWebSocketEngine(WebSocketEngine engine) { final void putWebSocketEngine(WebSocketEngine engine) {
engines.put(engine.getEngineid(), engine); localEngines.put(engine.getEngineid(), engine);
} }
//-------------------------------------------------------------------------------- //--------------------------------------------------------------------------------
@@ -150,12 +161,12 @@ public abstract class WebSocketNode {
//异步待优化 //异步待优化
public final CompletableFuture<Integer> sendMessage(final Serializable groupid, final boolean recent, final Object message, final boolean last) { public final CompletableFuture<Integer> sendMessage(final Serializable groupid, final boolean recent, final Object message, final boolean last) {
return CompletableFuture.supplyAsync(() -> { return CompletableFuture.supplyAsync(() -> {
final Set<String> engineids = localEngines.get(groupid); final Set<String> engineids = localEngineids.get(groupid);
if (finest) logger.finest("websocket want send message {groupid:" + groupid + ", content:'" + message + "'} from locale node to " + engineids); if (finest) logger.finest("websocket want send message {groupid:" + groupid + ", content:'" + message + "'} from locale node to " + engineids);
int rscode = RETCODE_GROUP_EMPTY; int rscode = RETCODE_GROUP_EMPTY;
if (engineids != null && !engineids.isEmpty()) { if (engineids != null && !engineids.isEmpty()) {
for (String engineid : engineids) { for (String engineid : engineids) {
final WebSocketEngine engine = engines.get(engineid); final WebSocketEngine engine = localEngines.get(engineid);
if (engine != null) { //在本地 if (engine != null) { //在本地
final WebSocketGroup group = engine.getWebSocketGroup(groupid); final WebSocketGroup group = engine.getWebSocketGroup(groupid);
if (group == null || group.isEmpty()) { if (group == null || group.isEmpty()) {