From df8e8395804541d3091631b4e2b3a5902fbdbc2a Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Thu, 3 Jan 2019 09:35:28 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96WebSocket=E7=BB=99=E5=A4=9A?= =?UTF-8?q?=E4=B8=AAuserid=E5=8F=91=E6=B6=88=E6=81=AF=E7=9A=84=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/org/redkale/net/Context.java | 8 + src/org/redkale/net/http/WebSocket.java | 2 +- src/org/redkale/net/http/WebSocketEngine.java | 34 +-- src/org/redkale/net/http/WebSocketNode.java | 220 ++++++++++++------ src/org/redkale/net/http/WebSocketRunner.java | 2 +- .../redkale/net/http/WebSocketServlet.java | 6 +- .../redkale/service/WebSocketNodeService.java | 12 +- 7 files changed, 184 insertions(+), 100 deletions(-) diff --git a/src/org/redkale/net/Context.java b/src/org/redkale/net/Context.java index 3cfcdcc13..18bf7abe9 100644 --- a/src/org/redkale/net/Context.java +++ b/src/org/redkale/net/Context.java @@ -148,6 +148,14 @@ public class Context { executor.execute(r); } + public int getCorePoolSize() { + return executor.getCorePoolSize(); + } + + public ThreadFactory getThreadFactory() { + return executor.getThreadFactory(); + } + public int getBufferCapacity() { return bufferCapacity; } diff --git a/src/org/redkale/net/http/WebSocket.java b/src/org/redkale/net/http/WebSocket.java index 90ead256f..0029b2308 100644 --- a/src/org/redkale/net/http/WebSocket.java +++ b/src/org/redkale/net/http/WebSocket.java @@ -540,7 +540,7 @@ public abstract class WebSocket { */ public CompletableFuture changeUserid(final G newuserid) { if (newuserid == null) throw new NullPointerException("newuserid is null"); - return _engine.changeUserid(this, newuserid); + return _engine.changeLocalUserid(this, newuserid); } /** diff --git a/src/org/redkale/net/http/WebSocketEngine.java b/src/org/redkale/net/http/WebSocketEngine.java index d9bdd1926..7b53e4e2d 100644 --- a/src/org/redkale/net/http/WebSocketEngine.java +++ b/src/org/redkale/net/http/WebSocketEngine.java @@ -127,7 +127,7 @@ public class WebSocketEngine { } @Comment("添加WebSocket") - void add(WebSocket socket) { + void addLocal(WebSocket socket) { if (single) { currconns.incrementAndGet(); websockets.put(socket._userid, socket); @@ -144,7 +144,7 @@ public class WebSocketEngine { } @Comment("从WebSocketEngine删除指定WebSocket") - CompletableFuture removeThenClose(WebSocket socket) { + CompletableFuture removeLocalThenClose(WebSocket socket) { Serializable userid = socket._userid; if (single) { currconns.decrementAndGet(); @@ -165,7 +165,7 @@ public class WebSocketEngine { } @Comment("更改WebSocket的userid") - CompletableFuture changeUserid(WebSocket socket, final Serializable newuserid) { + CompletableFuture changeLocalUserid(WebSocket socket, final Serializable newuserid) { if (newuserid == null) throw new NullPointerException("newuserid is null"); final Serializable olduserid = socket._userid; socket._userid = newuserid; @@ -207,20 +207,20 @@ public class WebSocketEngine { } @Comment("给所有连接用户发送消息") - public CompletableFuture broadcastMessage(final Object message, final boolean last) { - return broadcastMessage((Predicate) null, message, last); + public CompletableFuture broadcastLocalMessage(final Object message, final boolean last) { + return WebSocketEngine.this.broadcastLocalMessage((Predicate) null, message, last); } @Comment("给指定WebSocket连接用户发送消息") - public CompletableFuture broadcastMessage(final WebSocketRange wsrange, final Object message, final boolean last) { + public CompletableFuture broadcastLocalMessage(final WebSocketRange wsrange, final Object message, final boolean last) { Predicate predicate = wsrange == null ? null : (ws) -> ws.predicate(wsrange); - return broadcastMessage(predicate, message, last); + return WebSocketEngine.this.broadcastLocalMessage(predicate, message, last); } @Comment("给指定WebSocket连接用户发送消息") - public CompletableFuture broadcastMessage(final Predicate predicate, final Object message, final boolean last) { + public CompletableFuture broadcastLocalMessage(final Predicate predicate, final Object message, final boolean last) { if (message instanceof CompletableFuture) { - return ((CompletableFuture) message).thenCompose((json) -> broadcastMessage(predicate, json, last)); + return ((CompletableFuture) message).thenCompose((json) -> WebSocketEngine.this.broadcastLocalMessage(predicate, json, last)); } final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null); if (more) { @@ -265,19 +265,19 @@ public class WebSocketEngine { } @Comment("给指定用户组发送消息") - public CompletableFuture sendMessage(final Object message, final boolean last, final Stream userids) { + public CompletableFuture sendLocalMessage(final Object message, final boolean last, final Stream userids) { Object[] array = userids.toArray(); Serializable[] ss = new Serializable[array.length]; for (int i = 0; i < array.length; i++) { ss[i] = (Serializable) array[i]; } - return sendMessage(message, last, ss); + return WebSocketEngine.this.sendLocalMessage(message, last, ss); } @Comment("给指定用户组发送消息") - public CompletableFuture sendMessage(final Object message, final boolean last, final Serializable... userids) { + public CompletableFuture sendLocalMessage(final Object message, final boolean last, final Serializable... userids) { if (message instanceof CompletableFuture) { - return ((CompletableFuture) message).thenCompose((json) -> sendMessage(json, last, userids)); + return ((CompletableFuture) message).thenCompose((json) -> WebSocketEngine.this.sendLocalMessage(json, last, userids)); } final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null) && userids.length > 1; if (more) { @@ -326,7 +326,7 @@ public class WebSocketEngine { } @Comment("给指定WebSocket连接用户发起操作指令") - public CompletableFuture broadcastAction(final WebSocketAction action) { + public CompletableFuture broadcastLocalAction(final WebSocketAction action) { CompletableFuture future = null; if (single) { for (WebSocket websocket : websockets.values()) { @@ -343,17 +343,17 @@ public class WebSocketEngine { } @Comment("给指定用户组发送操作") - public CompletableFuture sendAction(final WebSocketAction action, final Stream userids) { + public CompletableFuture sendLocalAction(final WebSocketAction action, final Stream userids) { Object[] array = userids.toArray(); Serializable[] ss = new Serializable[array.length]; for (int i = 0; i < array.length; i++) { ss[i] = (Serializable) array[i]; } - return sendAction(action, ss); + return WebSocketEngine.this.sendLocalAction(action, ss); } @Comment("给指定用户组发送操作") - public CompletableFuture sendAction(final WebSocketAction action, final Serializable... userids) { + public CompletableFuture sendLocalAction(final WebSocketAction action, final Serializable... userids) { CompletableFuture future = null; if (single) { for (Serializable userid : userids) { diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index 7d34d8a38..9fc8e39da 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -90,11 +90,11 @@ public abstract class WebSocketNode { protected abstract CompletableFuture> getWebSocketAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable userid); - protected abstract CompletableFuture sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable userid); + protected abstract CompletableFuture sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable... userids); protected abstract CompletableFuture broadcastMessage(@RpcTargetAddress InetSocketAddress targetAddress, WebSocketRange wsrange, Object message, boolean last); - protected abstract CompletableFuture sendAction(@RpcTargetAddress InetSocketAddress targetAddress, WebSocketAction action, Serializable userid); + protected abstract CompletableFuture sendAction(@RpcTargetAddress InetSocketAddress targetAddress, WebSocketAction action, Serializable... userids); protected abstract CompletableFuture broadcastAction(@RpcTargetAddress InetSocketAddress targetAddress, WebSocketAction action); @@ -204,25 +204,6 @@ public abstract class WebSocketNode { return rs; } - /** - * @deprecated - * - * 判断指定用户是否WebSocket在线 - * - * @param userid Serializable - * - * @return boolean - */ - private CompletableFuture existsWebSocket2(final Serializable userid) { - if (this.localEngine != null && this.sncpNodeAddresses == null) { - return CompletableFuture.completedFuture(this.localEngine.existsLocalWebSocket(userid)); - } - tryAcquireSemaphore(); - CompletableFuture rs = this.sncpNodeAddresses.existsAsync(SOURCE_SNCP_USERID_PREFIX + userid); - if (semaphore != null) rs.whenComplete((r, e) -> releaseSemaphore()); - return rs; - } - /** * 判断指定用户是否WebSocket在线 * @@ -429,15 +410,97 @@ public abstract class WebSocketNode { if (message0 instanceof CompletableFuture) return ((CompletableFuture) message0).thenApply(msg -> sendMessage(convert, msg, last, userids)); final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 : ((convert instanceof TextConvert) ? new WebSocketPacket(((TextConvert) convert).convertTo(message0), last) : new WebSocketPacket(((BinaryConvert) convert).convertTo(message0), last)); if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式 - return this.localEngine.sendMessage(message, last, userids); + return this.localEngine.sendLocalMessage(message, last, userids); } final Object remoteMessage = formatRemoteMessage(message); - CompletableFuture future = null; - for (Serializable userid : userids) { - future = future == null ? sendOneMessage(remoteMessage, last, userid) - : future.thenCombine(sendOneMessage(remoteMessage, last, userid), (a, b) -> a | b); + CompletableFuture rsfuture; + if (userids.length == 1) { + rsfuture = sendOneUserMessage(remoteMessage, last, userids[0]); + } else { + String[] keys = new String[userids.length]; + final Map keyuser = new HashMap<>(); + for (int i = 0; i < userids.length; i++) { + keys[i] = SOURCE_SNCP_USERID_PREFIX + userids[i]; + keyuser.put(keys[i], userids[i]); + } + tryAcquireSemaphore(); + CompletableFuture>> addrsFuture = sncpNodeAddresses.getCollectionMapAsync(InetSocketAddress.class, keys); + if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore()); + rsfuture = addrsFuture.thenCompose((Map> addrs) -> { + if (addrs == null || addrs.isEmpty()) { + if (logger.isLoggable(Level.FINER)) logger.finer("websocket not found userids:" + JsonConvert.root().convertTo(userids) + " on any node "); + return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); + } + Map> addrUsers = new HashMap<>(); + addrs.forEach((key, as) -> { + for (InetSocketAddress a : as) { + addrUsers.computeIfAbsent(a, k -> new ArrayList<>()).add(keyuser.get(key)); + } + }); + if (logger.isLoggable(Level.FINEST)) { + logger.finest("websocket(localaddr=" + localSncpAddress + ", userids=" + JsonConvert.root().convertTo(userids) + ") found message-addr-userids: " + addrUsers); + } + CompletableFuture future = null; + for (Map.Entry> en : addrUsers.entrySet()) { + Serializable[] us = en.getValue().toArray(new Serializable[en.getValue().size()]); + future = future == null ? sendOneAddrMessage(en.getKey(), remoteMessage, last, us) + : future.thenCombine(sendOneAddrMessage(en.getKey(), remoteMessage, last, us), (a, b) -> a | b); + } + return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; + }); } - return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; + return rsfuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : rsfuture; + } + + protected CompletableFuture sendOneUserMessage(final Object message, final boolean last, final Serializable userid) { + if (message instanceof CompletableFuture) return ((CompletableFuture) message).thenApply(msg -> sendOneUserMessage(msg, last, userid)); + if (logger.isLoggable(Level.FINEST)) { + logger.finest("websocket want send message {userid:" + userid + ", content:'" + (message instanceof WebSocketPacket ? ((WebSocketPacket) message).toSimpleString() : JsonConvert.root().convertTo(message)) + "'} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine"); + } + CompletableFuture localFuture = null; + if (this.localEngine != null) localFuture = localEngine.sendLocalMessage(message, last, userid); + if (this.sncpNodeAddresses == null || this.remoteNode == null) { + if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null"); + //没有CacheSource就不会有分布式节点 + return localFuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localFuture; + } + //远程节点发送消息 + final Object remoteMessage = formatRemoteMessage(message); + tryAcquireSemaphore(); + CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class); + if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore()); + CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { + if (addrs == null || addrs.isEmpty()) { + if (logger.isLoggable(Level.FINER)) logger.finer("websocket not found userid:" + userid + " on any node "); + return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); + } + if (logger.isLoggable(Level.FINEST)) logger.finest("websocket(localaddr=" + localSncpAddress + ") found userid:" + userid + " on " + addrs); + CompletableFuture future = null; + for (InetSocketAddress addr : addrs) { + if (addr == null || addr.equals(localSncpAddress)) continue; + future = future == null ? remoteNode.sendMessage(addr, remoteMessage, last, userid) + : future.thenCombine(remoteNode.sendMessage(addr, remoteMessage, last, userid), (a, b) -> a | b); + } + return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; + }); + return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b); + } + + protected CompletableFuture sendOneAddrMessage(final InetSocketAddress sncpAddr, final Object message, final boolean last, final Serializable... userids) { + if (message instanceof CompletableFuture) return ((CompletableFuture) message).thenApply(msg -> sendOneAddrMessage(sncpAddr, msg, last, userids)); + if (logger.isLoggable(Level.FINEST)) { + logger.finest("websocket want send message {userids:" + JsonConvert.root().convertTo(userids) + ", sncpaddr:" + sncpAddr + ", content:'" + (message instanceof WebSocketPacket ? ((WebSocketPacket) message).toSimpleString() : JsonConvert.root().convertTo(message)) + "'} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine"); + } + if (Objects.equals(sncpAddr, this.localSncpAddress)) { + return this.localEngine == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localEngine.sendLocalMessage(message, last, userids); + } + if (this.sncpNodeAddresses == null || this.remoteNode == null) { + if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null"); + //没有CacheSource就不会有分布式节点 + return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); + } + final Object remoteMessage = formatRemoteMessage(message); + return remoteNode.sendMessage(sncpAddr, remoteMessage, last, userids); } /** @@ -548,10 +611,10 @@ public abstract class WebSocketNode { if (message0 instanceof CompletableFuture) return ((CompletableFuture) message0).thenApply(msg -> broadcastMessage(wsrange, convert, msg, last)); final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 : ((convert instanceof TextConvert) ? new WebSocketPacket(((TextConvert) convert).convertTo(message0), last) : new WebSocketPacket(((BinaryConvert) convert).convertTo(message0), last)); if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式 - return this.localEngine.broadcastMessage(wsrange, message, last); + return this.localEngine.broadcastLocalMessage(wsrange, message, last); } final Object remoteMessage = formatRemoteMessage(message); - CompletableFuture localFuture = this.localEngine == null ? null : this.localEngine.broadcastMessage(wsrange, message, last); + CompletableFuture localFuture = this.localEngine == null ? null : this.localEngine.broadcastLocalMessage(wsrange, message, last); tryAcquireSemaphore(); CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY, InetSocketAddress.class); if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore()); @@ -569,40 +632,6 @@ public abstract class WebSocketNode { return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b); } - protected CompletableFuture sendOneMessage(final Object message, final boolean last, final Serializable userid) { - if (message instanceof CompletableFuture) return ((CompletableFuture) message).thenApply(msg -> sendOneMessage(msg, last, userid)); - if (logger.isLoggable(Level.FINEST)) { - logger.finest("websocket want send message {userid:" + userid + ", content:'" + (message instanceof WebSocketPacket ? ((WebSocketPacket) message).toSimpleString() : JsonConvert.root().convertTo(message)) + "'} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine"); - } - CompletableFuture localFuture = null; - if (this.localEngine != null) localFuture = localEngine.sendMessage(message, last, userid); - if (this.sncpNodeAddresses == null || this.remoteNode == null) { - if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null"); - //没有CacheSource就不会有分布式节点 - return localFuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localFuture; - } - //远程节点发送消息 - final Object remoteMessage = formatRemoteMessage(message); - tryAcquireSemaphore(); - CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class); - if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore()); - CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { - if (addrs == null || addrs.isEmpty()) { - if (logger.isLoggable(Level.FINER)) logger.finer("websocket not found userid:" + userid + " on any node "); - return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); - } - if (logger.isLoggable(Level.FINEST)) logger.finest("websocket(localaddr=" + localSncpAddress + ") found userid:" + userid + " on " + addrs); - CompletableFuture future = null; - for (InetSocketAddress addr : addrs) { - if (addr == null || addr.equals(localSncpAddress)) continue; - future = future == null ? remoteNode.sendMessage(addr, remoteMessage, last, userid) - : future.thenCombine(remoteNode.sendMessage(addr, remoteMessage, last, userid), (a, b) -> a | b); - } - return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; - }); - return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b); - } - /** * 广播操作, 给所有人发操作 * @@ -613,9 +642,9 @@ public abstract class WebSocketNode { @Local public CompletableFuture broadcastAction(final WebSocketAction action) { if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式 - return this.localEngine.broadcastAction(action); + return this.localEngine.broadcastLocalAction(action); } - CompletableFuture localFuture = this.localEngine == null ? null : this.localEngine.broadcastAction(action); + CompletableFuture localFuture = this.localEngine == null ? null : this.localEngine.broadcastLocalAction(action); tryAcquireSemaphore(); CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY, InetSocketAddress.class); if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore()); @@ -646,21 +675,53 @@ public abstract class WebSocketNode { public CompletableFuture sendAction(final WebSocketAction action, final Serializable... userids) { if (userids == null || userids.length < 1) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式 - return this.localEngine.sendAction(action, userids); + return this.localEngine.sendLocalAction(action, userids); } - CompletableFuture future = null; - for (Serializable userid : userids) { - future = future == null ? sendOneAction(action, userid) : future.thenCombine(sendOneAction(action, userid), (a, b) -> a | b); + CompletableFuture rsfuture; + if (userids.length == 1) { + rsfuture = sendOneUserAction(action, userids[0]); + } else { + String[] keys = new String[userids.length]; + final Map keyuser = new HashMap<>(); + for (int i = 0; i < userids.length; i++) { + keys[i] = SOURCE_SNCP_USERID_PREFIX + userids[i]; + keyuser.put(keys[i], userids[i]); + } + tryAcquireSemaphore(); + CompletableFuture>> addrsFuture = sncpNodeAddresses.getCollectionMapAsync(InetSocketAddress.class, keys); + if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore()); + rsfuture = addrsFuture.thenCompose((Map> addrs) -> { + if (addrs == null || addrs.isEmpty()) { + if (logger.isLoggable(Level.FINER)) logger.finer("websocket not found userids:" + JsonConvert.root().convertTo(userids) + " on any node "); + return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); + } + Map> addrUsers = new HashMap<>(); + addrs.forEach((key, as) -> { + for (InetSocketAddress a : as) { + addrUsers.computeIfAbsent(a, k -> new ArrayList<>()).add(keyuser.get(key)); + } + }); + if (logger.isLoggable(Level.FINEST)) { + logger.finest("websocket(localaddr=" + localSncpAddress + ", userids=" + JsonConvert.root().convertTo(userids) + ") found action-userid-addrs: " + addrUsers); + } + CompletableFuture future = null; + for (Map.Entry> en : addrUsers.entrySet()) { + Serializable[] us = en.getValue().toArray(new Serializable[en.getValue().size()]); + future = future == null ? sendOneAddrAction(en.getKey(), action, us) + : future.thenCombine(sendOneAddrAction(en.getKey(), action, us), (a, b) -> a | b); + } + return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; + }); } - return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; + return rsfuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : rsfuture; } - protected CompletableFuture sendOneAction(final WebSocketAction action, final Serializable userid) { + protected CompletableFuture sendOneUserAction(final WebSocketAction action, final Serializable userid) { if (logger.isLoggable(Level.FINEST)) { logger.finest("websocket want send action {userid:" + userid + ", action:" + action + "} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine"); } CompletableFuture localFuture = null; - if (this.localEngine != null) localFuture = localEngine.sendAction(action, userid); + if (this.localEngine != null) localFuture = localEngine.sendLocalAction(action, userid); if (this.sncpNodeAddresses == null || this.remoteNode == null) { if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null"); //没有CacheSource就不会有分布式节点 @@ -687,6 +748,21 @@ public abstract class WebSocketNode { return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b); } + protected CompletableFuture sendOneAddrAction(final InetSocketAddress sncpAddr, final WebSocketAction action, final Serializable... userids) { + if (logger.isLoggable(Level.FINEST)) { + logger.finest("websocket want send action {userids:" + JsonConvert.root().convertTo(userids) + ", sncpaddr:" + sncpAddr + ", action:" + action + " from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine"); + } + if (Objects.equals(sncpAddr, this.localSncpAddress)) { + return this.localEngine == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localEngine.sendLocalAction(action, userids); + } + if (this.sncpNodeAddresses == null || this.remoteNode == null) { + if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null"); + //没有CacheSource就不会有分布式节点 + return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); + } + return remoteNode.sendAction(sncpAddr, action, userids); + } + protected Object formatRemoteMessage(Object message) { if (message instanceof WebSocketPacket) return message; if (message instanceof byte[]) return message; diff --git a/src/org/redkale/net/http/WebSocketRunner.java b/src/org/redkale/net/http/WebSocketRunner.java index fa748e466..62dbfe843 100644 --- a/src/org/redkale/net/http/WebSocketRunner.java +++ b/src/org/redkale/net/http/WebSocketRunner.java @@ -294,7 +294,7 @@ class WebSocketRunner implements Runnable { if (closed) return null; closed = true; channel.dispose(); - CompletableFuture future = engine.removeThenClose(webSocket); + CompletableFuture future = engine.removeLocalThenClose(webSocket); webSocket.onClose(code, reason); return future; } diff --git a/src/org/redkale/net/http/WebSocketServlet.java b/src/org/redkale/net/http/WebSocketServlet.java index e264efed6..6da8ec642 100644 --- a/src/org/redkale/net/http/WebSocketServlet.java +++ b/src/org/redkale/net/http/WebSocketServlet.java @@ -255,7 +255,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl CompletableFuture rcFuture = webSocket.onSingleRepeatConnect(); Consumer task = (oldkilled) -> { if (oldkilled) { - WebSocketServlet.this.node.localEngine.add(webSocket); + WebSocketServlet.this.node.localEngine.addLocal(webSocket); WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel()); webSocket._runner = runner; context.runAsync(runner); @@ -276,7 +276,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl }); } } else { - WebSocketServlet.this.node.localEngine.add(webSocket); + WebSocketServlet.this.node.localEngine.addLocal(webSocket); WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel()); webSocket._runner = runner; context.runAsync(runner); @@ -284,7 +284,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl } }); } else { - WebSocketServlet.this.node.localEngine.add(webSocket); + WebSocketServlet.this.node.localEngine.addLocal(webSocket); WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel()); webSocket._runner = runner; context.runAsync(runner); diff --git a/src/org/redkale/service/WebSocketNodeService.java b/src/org/redkale/service/WebSocketNodeService.java index 5d68c85e1..eea5f28fd 100644 --- a/src/org/redkale/service/WebSocketNodeService.java +++ b/src/org/redkale/service/WebSocketNodeService.java @@ -56,27 +56,27 @@ public class WebSocketNodeService extends WebSocketNode implements Service { } @Override - public CompletableFuture sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable userid) { + public CompletableFuture sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable... userids) { if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); - return this.localEngine.sendMessage(message, last, userid); + return this.localEngine.sendLocalMessage(message, last, userids); } @Override public CompletableFuture broadcastMessage(@RpcTargetAddress InetSocketAddress targetAddress, final WebSocketRange wsrange, Object message, boolean last) { if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); - return this.localEngine.broadcastMessage(wsrange, message, last); + return this.localEngine.broadcastLocalMessage(wsrange, message, last); } @Override - public CompletableFuture sendAction(@RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action, Serializable userid) { + public CompletableFuture sendAction(@RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action, Serializable... userids) { if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); - return this.localEngine.sendAction(action, userid); + return this.localEngine.sendLocalAction(action, userids); } @Override public CompletableFuture broadcastAction(@RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action) { if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); - return this.localEngine.broadcastAction(action); + return this.localEngine.broadcastLocalAction(action); } /**