diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index bd619ae31..1c8ca95c9 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -37,23 +37,23 @@ public abstract class WebSocketNode { protected WebSocketNode remoteNode; //存放所有用户分布在节点上的队列信息,Set 为 sncpnode 的集合, key: groupid + //如果不是分布式(没有SNCP),则InetSocketAddress为当前WebSocketServlet的监听地址 @Resource(name = "$") protected CacheSource sncpAddressNodes; //当前节点的本地WebSocketEngine - protected WebSocketEngine _localEngine; + protected WebSocketEngine localEngine; public void init(AnyValue conf) { - } public void destroy(AnyValue conf) { - } public final void postDestroy(AnyValue conf) { - if (this._localEngine == null) return; - this._localEngine.getWebSocketGroups().forEach(g -> disconnect(g.getGroupid())); + if (this.localEngine == null) return; + //关掉所有本地本地WebSocket + this.localEngine.getWebSocketGroups().forEach(g -> disconnect(g.getGroupid())); } protected abstract CompletableFuture> getOnlineRemoteAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid); @@ -66,12 +66,12 @@ public abstract class WebSocketNode { //-------------------------------------------------------------------------------- final CompletableFuture connect(final Serializable groupid) { - if (finest) logger.finest(localSncpAddress + " receive websocket connect event (" + groupid + " on " + this._localEngine.getEngineid() + ")."); + if (finest) logger.finest(localSncpAddress + " receive websocket connect event (" + groupid + " on " + this.localEngine.getEngineid() + ")."); return connect(groupid, localSncpAddress); } final CompletableFuture disconnect(Serializable groupid) { - if (finest) logger.finest(localSncpAddress + " receive websocket disconnect event (" + groupid + " on " + this._localEngine.getEngineid() + ")."); + if (finest) logger.finest(localSncpAddress + " receive websocket disconnect event (" + groupid + " on " + this.localEngine.getEngineid() + ")."); return disconnect(groupid, localSncpAddress); } @@ -149,7 +149,7 @@ public abstract class WebSocketNode { return CompletableFuture.supplyAsync(() -> { if (finest) logger.finest("websocket want send message {groupid:" + groupid + ", content:'" + message + "'} from locale node to locale engine"); int rscode = RETCODE_GROUP_EMPTY; - WebSocketGroup group = this._localEngine == null ? null : this._localEngine.getWebSocketGroup(groupid); + WebSocketGroup group = this.localEngine == null ? null : this.localEngine.getWebSocketGroup(groupid); if (group != null) rscode = group.send(recent, message, last); if ((recent && rscode == 0) || remoteNode == null || sncpAddressNodes == null) { //没有其他远程的WebSocket连接 if (finest) { diff --git a/src/org/redkale/net/http/WebSocketServlet.java b/src/org/redkale/net/http/WebSocketServlet.java index 83694d800..a7dcaa734 100644 --- a/src/org/redkale/net/http/WebSocketServlet.java +++ b/src/org/redkale/net/http/WebSocketServlet.java @@ -69,27 +69,25 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl @Resource(name = "$") protected WebSocketNode node; - protected WebSocketEngine engine; - @Override final void preInit(HttpContext context, AnyValue conf) { InetSocketAddress addr = context.getServerAddress(); - this.engine = new WebSocketEngine(addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", this.node, logger); if (this.node == null) this.node = createWebSocketNode(); if (this.node == null) { //没有部署SNCP,即不是分布式 this.node = new WebSocketNodeService(); if (logger.isLoggable(Level.WARNING)) logger.warning("Not found WebSocketNode, create a default value for " + getClass().getName()); } - this.node._localEngine = engine; //存在WebSocketServlet,则此WebSocketNode必须是本地模式Service + //存在WebSocketServlet,则此WebSocketNode必须是本地模式Service + this.node.localEngine = new WebSocketEngine(addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", this.node, logger); this.node.init(conf); - this.engine.init(conf); + this.node.localEngine.init(conf); } @Override final void postDestroy(HttpContext context, AnyValue conf) { this.node.postDestroy(conf); super.destroy(context, conf); - this.engine.destroy(conf); + this.node.localEngine.destroy(conf); } @Override @@ -112,7 +110,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl return; } final WebSocket webSocket = this.createWebSocket(); - webSocket._engine = engine; + webSocket._engine = this.node.localEngine; webSocket._jsonConvert = jsonConvert; webSocket._remoteAddress = request.getRemoteAddress(); webSocket._remoteAddr = request.getRemoteAddr(); @@ -145,7 +143,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl return; } webSocket._groupid = groupid; - engine.add(webSocket); + WebSocketServlet.this.node.localEngine.add(webSocket); context.runAsync(new WebSocketRunner(context, webSocket, response.removeChannel(), wsbinary)); response.finish(true); } diff --git a/src/org/redkale/service/WebSocketNodeService.java b/src/org/redkale/service/WebSocketNodeService.java index c1d45f7c5..c98cf6c06 100644 --- a/src/org/redkale/service/WebSocketNodeService.java +++ b/src/org/redkale/service/WebSocketNodeService.java @@ -37,10 +37,10 @@ public class WebSocketNodeService extends WebSocketNode implements Service { @Override public CompletableFuture> getOnlineRemoteAddresses(final @RpcTargetAddress InetSocketAddress targetAddress, final Serializable groupid) { if (localSncpAddress == null || !localSncpAddress.equals(targetAddress)) return remoteOnlineRemoteAddresses(targetAddress, groupid); - if (this._localEngine == null) return CompletableFuture.completedFuture(new ArrayList<>()); + if (this.localEngine == null) return CompletableFuture.completedFuture(new ArrayList<>()); return CompletableFuture.supplyAsync(() -> { final List rs = new ArrayList<>(); - final WebSocketGroup group = this._localEngine.getWebSocketGroup(groupid); + final WebSocketGroup group = this.localEngine.getWebSocketGroup(groupid); if (group != null) group.getWebSockets().forEach(x -> rs.add("ws" + Objects.hashCode(x) + '@' + x.getRemoteAddr())); return rs; }); @@ -49,10 +49,10 @@ public class WebSocketNodeService extends WebSocketNode implements Service { @Override public CompletableFuture sendMessage(@RpcTargetAddress InetSocketAddress addr, Serializable groupid, boolean recent, Object message, boolean last) { return CompletableFuture.supplyAsync(() -> { - if (this._localEngine == null) return RETCODE_GROUP_EMPTY; - final WebSocketGroup group = this._localEngine.getWebSocketGroup(groupid); + if (this.localEngine == null) return RETCODE_GROUP_EMPTY; + final WebSocketGroup group = this.localEngine.getWebSocketGroup(groupid); if (group == null || group.isEmpty()) { - if (finest) logger.finest("receive websocket message {engineid:'" + this._localEngine.getEngineid() + "', groupid:" + groupid + ", content:'" + message + "'} from " + addr + " but send result is " + RETCODE_GROUP_EMPTY); + if (finest) logger.finest("receive websocket message {engineid:'" + this.localEngine.getEngineid() + "', groupid:" + groupid + ", content:'" + message + "'} from " + addr + " but send result is " + RETCODE_GROUP_EMPTY); return RETCODE_GROUP_EMPTY; } int code = group.send(recent, message, last);