From bb87774ba9aa4b9fe4d7d8d41a2c8d614653b732 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Wed, 17 May 2017 02:32:36 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=B4=E6=97=B6=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/org/redkale/net/http/WebSocketEngine.java | 6 +- src/org/redkale/net/http/WebSocketNode.java | 97 +++++++------------ src/org/redkale/net/http/WebSocketRunner.java | 6 +- .../redkale/net/http/WebSocketServlet.java | 6 +- .../redkale/service/WebSocketNodeService.java | 60 ++++++------ 5 files changed, 78 insertions(+), 97 deletions(-) diff --git a/src/org/redkale/net/http/WebSocketEngine.java b/src/org/redkale/net/http/WebSocketEngine.java index cc57f9ffa..5a497eb89 100644 --- a/src/org/redkale/net/http/WebSocketEngine.java +++ b/src/org/redkale/net/http/WebSocketEngine.java @@ -79,21 +79,21 @@ public final class WebSocketEngine { if (group == null) { group = new WebSocketGroup(socket._groupid); containers.putIfAbsent(socket._groupid, group); + if (node != null) node.connect(socket._groupid); } group.add(socket); - if (node != null) node.connect(socket._groupid, engineid); } void remove(WebSocket socket) { //非线程安全, 在常规场景中无需锁 final WebSocketGroup group = containers.get(socket._groupid); if (group == null) { - if (node != null) node.disconnect(socket._groupid, engineid); + if (node != null) node.disconnect(socket._groupid); return; } group.remove(socket); if (group.isEmpty()) { containers.remove(socket._groupid); - if (node != null) node.disconnect(socket._groupid, engineid); + if (node != null) node.disconnect(socket._groupid); } } diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index 9ca488476..bd619ae31 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -40,13 +40,8 @@ public abstract class WebSocketNode { @Resource(name = "$") protected CacheSource sncpAddressNodes; - //protected WebSocketEngine onlyoneEngine; - - //存放本地节点上所有WebSocketEngine - protected final ConcurrentHashMap localEngines = new ConcurrentHashMap(); - - //存放本地节点上所有在线用户的队列信息,Set 为 engineid 的集合, key: groupid - protected final ConcurrentHashMap> localEngineids = new ConcurrentHashMap(); + //当前节点的本地WebSocketEngine + protected WebSocketEngine _localEngine; public void init(AnyValue conf) { @@ -57,12 +52,8 @@ public abstract class WebSocketNode { } public final void postDestroy(AnyValue conf) { - HashMap> engines = new HashMap<>(localEngineids); - engines.forEach((k, v) -> { - new HashSet<>(v).forEach(e -> { - if (localEngines.containsKey(e)) disconnect(k, e); - }); - }); + if (this._localEngine == null) return; + this._localEngine.getWebSocketGroups().forEach(g -> disconnect(g.getGroupid())); } protected abstract CompletableFuture> getOnlineRemoteAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid); @@ -74,41 +65,25 @@ public abstract class WebSocketNode { protected abstract CompletableFuture disconnect(Serializable groupid, InetSocketAddress addr); //-------------------------------------------------------------------------------- - final void connect(final Serializable groupid, final String engineid) { - if (finest) logger.finest(localSncpAddress + " receive websocket connect event (" + groupid + " on " + engineid + ")."); - Set engineids = localEngineids.get(groupid); - if (engineids == null) { - engineids = new CopyOnWriteArraySet<>(); - localEngineids.putIfAbsent(groupid, engineids); - } - final Set engineids0 = engineids; - if (localSncpAddress != null && engineids.isEmpty()) { - CompletableFuture future = connect(groupid, localSncpAddress); - if (future != null) { - future.whenComplete((u, e) -> { //成功才记录 - if (e != null) engineids0.add(engineid); - }); - } - } - + final CompletableFuture connect(final Serializable groupid) { + if (finest) logger.finest(localSncpAddress + " receive websocket connect event (" + groupid + " on " + this._localEngine.getEngineid() + ")."); + return connect(groupid, localSncpAddress); } - final void disconnect(Serializable groupid, String engineid) { - if (finest) logger.finest(localSncpAddress + " receive websocket disconnect event (" + groupid + " on " + engineid + ")."); - Set engineids = localEngineids.get(groupid); - if (engineids == null || engineids.isEmpty()) return; - engineids.remove(engineid); - if (engineids.isEmpty()) { - localEngineids.remove(groupid); - if (localSncpAddress != null) disconnect(groupid, localSncpAddress); - } - } - - final void putWebSocketEngine(WebSocketEngine engine) { - localEngines.put(engine.getEngineid(), engine); + final CompletableFuture disconnect(Serializable groupid) { + if (finest) logger.finest(localSncpAddress + " receive websocket disconnect event (" + groupid + " on " + this._localEngine.getEngineid() + ")."); + return disconnect(groupid, localSncpAddress); } //-------------------------------------------------------------------------------- + /** + * 获取目标地址 + * + * @param targetAddress + * @param groupid + * + * @return + */ protected CompletableFuture> remoteOnlineRemoteAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid) { if (remoteNode == null) return CompletableFuture.completedFuture(null); try { @@ -158,28 +133,25 @@ public abstract class WebSocketNode { return rs; } + /** + * 向指定用户发送消息,先发送本地连接,再发送远程连接
+ * 如果当前WebSocketNode是远程模式,此方法只发送远程连接 + * + * @param groupid String + * @param recent 是否只发送给最近接入的WebSocket节点 + * @param message 消息内容 + * @param last 是否最后一条 + * + * @return 为0表示成功, 其他值表示异常 + */ //异步待优化 public final CompletableFuture sendMessage(final Serializable groupid, final boolean recent, final Object message, final boolean last) { return CompletableFuture.supplyAsync(() -> { - final Set engineids = localEngineids.get(groupid); - if (finest) logger.finest("websocket want send message {groupid:" + groupid + ", content:'" + message + "'} from locale node to " + engineids); + if (finest) logger.finest("websocket want send message {groupid:" + groupid + ", content:'" + message + "'} from locale node to locale engine"); int rscode = RETCODE_GROUP_EMPTY; - if (engineids != null && !engineids.isEmpty()) { - for (String engineid : engineids) { - final WebSocketEngine engine = localEngines.get(engineid); - if (engine != null) { //在本地 - final WebSocketGroup group = engine.getWebSocketGroup(groupid); - if (group == null || group.isEmpty()) { - engineids.remove(engineid); - if (finest) logger.finest("websocket want send message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + message + "'} but websocket group is empty "); - rscode = RETCODE_GROUP_EMPTY; - break; - } - rscode = group.send(recent, message, last); - } - } - } - if ((recent && rscode == 0) || remoteNode == null || sncpAddressNodes == null) { + WebSocketGroup group = this._localEngine == null ? null : this._localEngine.getWebSocketGroup(groupid); + if (group != null) rscode = group.send(recent, message, last); + if ((recent && rscode == 0) || remoteNode == null || sncpAddressNodes == null) { //没有其他远程的WebSocket连接 if (finest) { if ((recent && rscode == 0)) { logger.finest("websocket want send recent message success"); @@ -189,8 +161,9 @@ public abstract class WebSocketNode { } return rscode; } + if (this.sncpAddressNodes == null || this.remoteNode == null) return rscode; //没有CacheSource就不会有分布式节点 //-----------------------发送远程的----------------------------- - Collection addrs = sncpAddressNodes.getCollection(groupid); + Collection addrs = sncpAddressNodes.getCollectionAsync(groupid).join(); if (finest) logger.finest("websocket found groupid:" + groupid + " on " + addrs); if (addrs != null && !addrs.isEmpty()) { //对方连接在远程节点(包含本地节点),所以正常情况下addrs不会为空。 if (recent) { diff --git a/src/org/redkale/net/http/WebSocketRunner.java b/src/org/redkale/net/http/WebSocketRunner.java index c6cd1b696..143846d90 100644 --- a/src/org/redkale/net/http/WebSocketRunner.java +++ b/src/org/redkale/net/http/WebSocketRunner.java @@ -18,8 +18,11 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.*; /** + * WebSocket的消息接收发送器, 一个WebSocket对应一个WebSocketRunner + * + *

+ * 详情见: https://redkale.org * - *

详情见: https://redkale.org * @author zhangjx */ public class WebSocketRunner implements Runnable { @@ -386,6 +389,7 @@ public class WebSocketRunner implements Runnable { * * @param buffer * @param exbuffers + * * @return */ public WebSocketPacket decode(final ByteBuffer buffer, ByteBuffer... exbuffers) { diff --git a/src/org/redkale/net/http/WebSocketServlet.java b/src/org/redkale/net/http/WebSocketServlet.java index 89c99be1f..83694d800 100644 --- a/src/org/redkale/net/http/WebSocketServlet.java +++ b/src/org/redkale/net/http/WebSocketServlet.java @@ -13,7 +13,7 @@ import java.util.*; import java.util.logging.*; import javax.annotation.*; import org.redkale.convert.json.JsonConvert; -import org.redkale.service.WebSocketNodeService; +import org.redkale.service.*; import org.redkale.util.*; /** @@ -76,11 +76,11 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl InetSocketAddress addr = context.getServerAddress(); this.engine = new WebSocketEngine(addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", this.node, logger); if (this.node == null) this.node = createWebSocketNode(); - if (this.node == null) { + if (this.node == null) { //没有部署SNCP,即不是分布式 this.node = new WebSocketNodeService(); if (logger.isLoggable(Level.WARNING)) logger.warning("Not found WebSocketNode, create a default value for " + getClass().getName()); } - this.node.putWebSocketEngine(engine); + this.node._localEngine = engine; //存在WebSocketServlet,则此WebSocketNode必须是本地模式Service this.node.init(conf); this.engine.init(conf); } diff --git a/src/org/redkale/service/WebSocketNodeService.java b/src/org/redkale/service/WebSocketNodeService.java index 12706fa37..c1d45f7c5 100644 --- a/src/org/redkale/service/WebSocketNodeService.java +++ b/src/org/redkale/service/WebSocketNodeService.java @@ -37,16 +37,11 @@ public class WebSocketNodeService extends WebSocketNode implements Service { @Override public CompletableFuture> getOnlineRemoteAddresses(final @RpcTargetAddress InetSocketAddress targetAddress, final Serializable groupid) { if (localSncpAddress == null || !localSncpAddress.equals(targetAddress)) return remoteOnlineRemoteAddresses(targetAddress, groupid); + if (this._localEngine == null) return CompletableFuture.completedFuture(new ArrayList<>()); return CompletableFuture.supplyAsync(() -> { - final Set engineids = localEngineids.get(groupid); - if (engineids == null || engineids.isEmpty()) return null; final List rs = new ArrayList<>(); - for (String engineid : engineids) { - final WebSocketEngine engine = localEngines.get(engineid); - if (engine == null) continue; - final WebSocketGroup group = engine.getWebSocketGroup(groupid); - group.getWebSockets().forEach(x -> rs.add("ws" + Objects.hashCode(x) + '@' + x.getRemoteAddr())); - } + final WebSocketGroup group = this._localEngine.getWebSocketGroup(groupid); + if (group != null) group.getWebSockets().forEach(x -> rs.add("ws" + Objects.hashCode(x) + '@' + x.getRemoteAddr())); return rs; }); } @@ -54,36 +49,45 @@ public class WebSocketNodeService extends WebSocketNode implements Service { @Override public CompletableFuture sendMessage(@RpcTargetAddress InetSocketAddress addr, Serializable groupid, boolean recent, Object message, boolean last) { return CompletableFuture.supplyAsync(() -> { - final Set engineids = localEngineids.get(groupid); - if (engineids == null || engineids.isEmpty()) return RETCODE_GROUP_EMPTY; - int code = RETCODE_GROUP_EMPTY; - for (String engineid : engineids) { - final WebSocketEngine engine = localEngines.get(engineid); - if (engine != null) { //在本地 - final WebSocketGroup group = engine.getWebSocketGroup(groupid); - if (group == null || group.isEmpty()) { - if (finest) logger.finest("receive websocket message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + message + "'} from " + addr + " but send result is " + RETCODE_GROUP_EMPTY); - return RETCODE_GROUP_EMPTY; - } - code = group.send(recent, message, last); - if (finest) logger.finest("websocket node send message (" + message + ") from " + addr + " result is " + code); - } + if (this._localEngine == null) return RETCODE_GROUP_EMPTY; + final WebSocketGroup group = this._localEngine.getWebSocketGroup(groupid); + if (group == null || group.isEmpty()) { + if (finest) logger.finest("receive websocket message {engineid:'" + this._localEngine.getEngineid() + "', groupid:" + groupid + ", content:'" + message + "'} from " + addr + " but send result is " + RETCODE_GROUP_EMPTY); + return RETCODE_GROUP_EMPTY; } + int code = group.send(recent, message, last); + if (finest) logger.finest("websocket node send message (" + message + ") from " + addr + " result is " + code); return code; }); } + /** + * 当用户连接到节点,需要更新到CacheSource + * + * @param groupid String + * @param sncpAddr InetSocketAddress + * + * @return 无返回值 + */ @Override - public CompletableFuture connect(Serializable groupid, InetSocketAddress addr) { - CompletableFuture future = sncpAddressNodes.appendSetItemAsync(groupid, addr); - if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " connect from " + addr); + public CompletableFuture connect(Serializable groupid, InetSocketAddress sncpAddr) { + CompletableFuture future = sncpAddressNodes.appendSetItemAsync(groupid, sncpAddr); + if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " connect from " + sncpAddr); return future; } + /** + * 当用户从一个节点断掉了所有的连接,需要从CacheSource中删除 + * + * @param groupid String + * @param sncpAddr InetSocketAddress + * + * @return 无返回值 + */ @Override - public CompletableFuture disconnect(Serializable groupid, InetSocketAddress addr) { - CompletableFuture future = sncpAddressNodes.removeSetItemAsync(groupid, addr); - if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " disconnect from " + addr); + public CompletableFuture disconnect(Serializable groupid, InetSocketAddress sncpAddr) { + CompletableFuture future = sncpAddressNodes.removeSetItemAsync(groupid, sncpAddr); + if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " disconnect from " + sncpAddr); return future; } }