From 636927e4eb51fb8941138f8c75a9098fa700ea9b Mon Sep 17 00:00:00 2001 From: redkale Date: Wed, 17 Jan 2024 22:57:23 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E6=A0=BC=E5=BC=8F=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/redkale/net/http/RestMapping.java | 2 + .../org/redkale/net/http/RestOnMessage.java | 4 +- .../org/redkale/net/http/RestService.java | 2 + .../org/redkale/net/http/RestWebSocket.java | 4 +- .../java/org/redkale/net/http/WebSocket.java | 13 +- .../org/redkale/net/http/WebSocketEngine.java | 11 +- .../org/redkale/net/http/WebSocketNode.java | 130 ++++++++++++------ .../net/http/WebSocketNodeService.java | 29 ++-- .../net/http/WebSocketReadHandler.java | 12 +- .../redkale/net/http/WebSocketServlet.java | 13 +- .../net/http/WebSocketWriteHandler.java | 3 +- .../java/org/redkale/persistence/Sql.java | 13 ++ .../redkale/service/WebSocketNodeService.java | 6 +- 13 files changed, 168 insertions(+), 74 deletions(-) diff --git a/src/main/java/org/redkale/net/http/RestMapping.java b/src/main/java/org/redkale/net/http/RestMapping.java index d4ec0d766..527ec7979 100644 --- a/src/main/java/org/redkale/net/http/RestMapping.java +++ b/src/main/java/org/redkale/net/http/RestMapping.java @@ -14,6 +14,8 @@ import static java.lang.annotation.RetentionPolicy.*; * value默认为"/" + Service的类名去掉Service字样的小写字符串 (如HelloService,的默认路径为/hello)。
*

* 详情见: https://redkale.org + * + * @see org.redkale.net.http.RestService * * @author zhangjx */ diff --git a/src/main/java/org/redkale/net/http/RestOnMessage.java b/src/main/java/org/redkale/net/http/RestOnMessage.java index 214d0fae9..2016b13c6 100644 --- a/src/main/java/org/redkale/net/http/RestOnMessage.java +++ b/src/main/java/org/redkale/net/http/RestOnMessage.java @@ -10,7 +10,7 @@ import static java.lang.annotation.ElementType.METHOD; import static java.lang.annotation.RetentionPolicy.RUNTIME; /** - * 标记在RestWebSocket的接收消息方法上;
+ * 标记在{@link org.redkale.net.http.RestWebSocket}的接收消息方法上;
* 注意:被标记的方法必须同时符合以下条件:
* 1、必须修饰为public * 2、不能修饰为final和static @@ -19,6 +19,8 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME; * *

* 详情见: https://redkale.org + * + * @see org.redkale.net.http.RestWebSocket * * @author zhangjx */ diff --git a/src/main/java/org/redkale/net/http/RestService.java b/src/main/java/org/redkale/net/http/RestService.java index d6421ac62..1b9a0cbb3 100644 --- a/src/main/java/org/redkale/net/http/RestService.java +++ b/src/main/java/org/redkale/net/http/RestService.java @@ -13,6 +13,8 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME; * 只能依附在Service类上,name默认为Service的类名小写并去掉Service字样及后面的字符串 (如HelloService/HelloServiceImpl,的默认路径为 hello)。 *

* 详情见: https://redkale.org + * + * @see org.redkale.net.http.RestMapping * * @author zhangjx */ diff --git a/src/main/java/org/redkale/net/http/RestWebSocket.java b/src/main/java/org/redkale/net/http/RestWebSocket.java index 34209adc9..c56ee3557 100644 --- a/src/main/java/org/redkale/net/http/RestWebSocket.java +++ b/src/main/java/org/redkale/net/http/RestWebSocket.java @@ -5,9 +5,9 @@ */ package org.redkale.net.http; +import java.lang.annotation.*; import static java.lang.annotation.ElementType.TYPE; import static java.lang.annotation.RetentionPolicy.RUNTIME; -import java.lang.annotation.*; import org.redkale.net.Cryptor; /** @@ -16,6 +16,8 @@ import org.redkale.net.Cryptor; * name值支持含{system.property.xxx}模式 *

* 详情见: https://redkale.org + * + * @see org.redkale.net.http.RestOnMessage * * @author zhangjx */ diff --git a/src/main/java/org/redkale/net/http/WebSocket.java b/src/main/java/org/redkale/net/http/WebSocket.java index 948ea98ce..9fffaca51 100644 --- a/src/main/java/org/redkale/net/http/WebSocket.java +++ b/src/main/java/org/redkale/net/http/WebSocket.java @@ -225,7 +225,8 @@ public abstract class WebSocket { public final CompletableFuture send(Convert convert, Object message, boolean last) { final Convert c = convert == null ? getSendConvert() : convert; if (message instanceof CompletableFuture) { - return ((CompletableFuture) message).thenCompose((json) -> sendPacket(new WebSocketPacket(c.isBinary() ? FrameType.BINARY : FrameType.TEXT, c.convertToBytes(json), last))); + return ((CompletableFuture) message).thenCompose(json + -> sendPacket(new WebSocketPacket(c.isBinary() ? FrameType.BINARY : FrameType.TEXT, c.convertToBytes(json), last))); } return sendPacket(new WebSocketPacket(c.isBinary() ? FrameType.BINARY : FrameType.TEXT, c.convertToBytes(message), last)); } @@ -326,7 +327,7 @@ public abstract class WebSocket { * @return 为0表示成功, 其他值表示异常 */ public final CompletableFuture sendMessage(Object message, boolean last, G... userids) { - return sendMessage((Convert) null, message, last, userids); + return sendMessage((Convert) null, message, last, (Serializable[]) userids); } /** @@ -363,7 +364,7 @@ public abstract class WebSocket { return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL); } if (message instanceof CompletableFuture) { - return ((CompletableFuture) message).thenCompose((json) -> _engine.node.sendMessage(convert, json, last, userids)); + return ((CompletableFuture) message).thenCompose(json -> _engine.node.sendMessage(convert, json, last, userids)); } CompletableFuture rs = _engine.node.sendMessage(convert, message, last, userids); if (_engine.logger.isLoggable(Level.FINER)) { @@ -468,12 +469,12 @@ public abstract class WebSocket { * * @return 为0表示成功, 其他值表示部分发送异常 */ - public final CompletableFuture broadcastMessage(final WebSocketRange wsrange, final Convert convert, final Object message, final boolean last) { + public final CompletableFuture broadcastMessage(WebSocketRange wsrange, Convert convert, final Object message, final boolean last) { if (_engine.node == null) { return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL); } if (message instanceof CompletableFuture) { - return ((CompletableFuture) message).thenCompose((json) -> _engine.node.broadcastMessage(wsrange, convert, json, last)); + return ((CompletableFuture) message).thenCompose(json -> _engine.node.broadcastMessage(wsrange, convert, json, last)); } CompletableFuture rs = _engine.node.broadcastMessage(wsrange, convert, message, last); if (_engine.logger.isLoggable(Level.FINER)) { @@ -854,7 +855,7 @@ public abstract class WebSocket { * @return Future 可以为null, 为null或者Future值为false表示关闭新连接, Future值为true表示关闭旧连接 */ public CompletableFuture onSingleRepeatConnect() { - return forceCloseWebSocket(getUserid()).thenApply((r) -> true); + return forceCloseWebSocket(getUserid()).thenApply(r -> true); } /** diff --git a/src/main/java/org/redkale/net/http/WebSocketEngine.java b/src/main/java/org/redkale/net/http/WebSocketEngine.java index 93541f508..b0c0e2ec4 100644 --- a/src/main/java/org/redkale/net/http/WebSocketEngine.java +++ b/src/main/java/org/redkale/net/http/WebSocketEngine.java @@ -100,7 +100,8 @@ public class WebSocketEngine { if (conf != null && conf.getAnyValue("properties") != null) { props = conf.getAnyValue("properties"); } - this.liveInterval = props == null ? (liveInterval < 0 ? DEFAILT_LIVEINTERVAL : liveInterval) : props.getIntValue(WEBPARAM_LIVEINTERVAL, (liveInterval < 0 ? DEFAILT_LIVEINTERVAL : liveInterval)); + this.liveInterval = props == null ? (liveInterval < 0 ? DEFAILT_LIVEINTERVAL : liveInterval) + : props.getIntValue(WEBPARAM_LIVEINTERVAL, (liveInterval < 0 ? DEFAILT_LIVEINTERVAL : liveInterval)); if (liveInterval <= 0) { return; } @@ -121,19 +122,21 @@ public class WebSocketEngine { t.setDaemon(true); return t; }); - this.scheduler.setRemoveOnCancelPolicy(true); + this.scheduler.setRemoveOnCancelPolicy(true); long delay = (liveInterval - System.currentTimeMillis() / 1000 % liveInterval) + index * 5; final int intervalms = liveInterval * 1000; scheduler.scheduleWithFixedDelay(() -> { try { long now = System.currentTimeMillis(); - getLocalWebSockets().stream().filter(x -> ((now - x.getLastReadTime()) > intervalms && (now - x.getLastSendTime()) > intervalms)).forEach(x -> x.sendPing()); + getLocalWebSockets().stream().filter(x -> (now - Math.max(x.getLastReadTime(), x.getLastSendTime())) > intervalms) + .forEach(x -> x.sendPing()); } catch (Throwable t) { logger.log(Level.SEVERE, "WebSocketEngine schedule(interval=" + liveInterval + "s) ping error", t); } }, delay, liveInterval, TimeUnit.SECONDS); if (logger.isLoggable(Level.FINEST)) { - logger.finest(this.getClass().getSimpleName() + "(" + engineid + ")" + " start keeplive(wsmaxconns:" + wsMaxConns + ", delay:" + delay + "s, interval:" + liveInterval + "s) scheduler executor"); + logger.finest(this.getClass().getSimpleName() + "(" + engineid + ")" + + " start keeplive(wsmaxconns:" + wsMaxConns + ", delay:" + delay + "s, interval:" + liveInterval + "s) scheduler executor"); } } diff --git a/src/main/java/org/redkale/net/http/WebSocketNode.java b/src/main/java/org/redkale/net/http/WebSocketNode.java index 323ea14b4..d155c239a 100644 --- a/src/main/java/org/redkale/net/http/WebSocketNode.java +++ b/src/main/java/org/redkale/net/http/WebSocketNode.java @@ -126,17 +126,23 @@ public abstract class WebSocketNode implements Service { } } - protected abstract CompletableFuture> getWebSocketAddresses(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, Serializable userid); + protected abstract CompletableFuture> getWebSocketAddresses(@RpcTargetTopic String topic, + @RpcTargetAddress InetSocketAddress targetAddress, Serializable userid); - protected abstract CompletableFuture sendMessage(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable... userids); + protected abstract CompletableFuture sendMessage(@RpcTargetTopic String topic, + @RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable... userids); - protected abstract CompletableFuture broadcastMessage(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, WebSocketRange wsrange, Object message, boolean last); + protected abstract CompletableFuture broadcastMessage(@RpcTargetTopic String topic, + @RpcTargetAddress InetSocketAddress targetAddress, WebSocketRange wsrange, Object message, boolean last); - protected abstract CompletableFuture sendAction(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, WebSocketAction action, Serializable... userids); + protected abstract CompletableFuture sendAction(@RpcTargetTopic String topic, + @RpcTargetAddress InetSocketAddress targetAddress, WebSocketAction action, Serializable... userids); - protected abstract CompletableFuture broadcastAction(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, WebSocketAction action); + protected abstract CompletableFuture broadcastAction(@RpcTargetTopic String topic, + @RpcTargetAddress InetSocketAddress targetAddress, WebSocketAction action); - protected abstract CompletableFuture getUserSize(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress); + protected abstract CompletableFuture getUserSize(@RpcTargetTopic String topic, + @RpcTargetAddress InetSocketAddress targetAddress); protected abstract CompletableFuture connect(Serializable userid, WebSocketAddress wsaddr); @@ -144,28 +150,33 @@ public abstract class WebSocketNode implements Service { protected abstract CompletableFuture changeUserid(Serializable fromuserid, Serializable touserid, WebSocketAddress wsaddr); - protected abstract CompletableFuture existsWebSocket(Serializable userid, @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress); + protected abstract CompletableFuture existsWebSocket(Serializable userid, + @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress); - protected abstract CompletableFuture forceCloseWebSocket(Serializable userid, @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress); + protected abstract CompletableFuture forceCloseWebSocket(Serializable userid, + @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress); //-------------------------------------------------------------------------------- final CompletableFuture connect(final Serializable userid) { if (logger.isLoggable(Level.FINEST)) { - logger.finest(wsNodeAddress + " receive websocket connect event (" + userid + " on " + (this.localEngine == null ? null : this.localEngine.getEngineid()) + ")."); + logger.finest(wsNodeAddress + " receive websocket connect event (" + userid + " on " + + (this.localEngine == null ? null : this.localEngine.getEngineid()) + ")."); } return connect(userid, wsNodeAddress); } final CompletableFuture disconnect(final Serializable userid) { if (logger.isLoggable(Level.FINEST)) { - logger.finest(wsNodeAddress + " receive websocket disconnect event (" + userid + " on " + (this.localEngine == null ? null : this.localEngine.getEngineid()) + ")."); + logger.finest(wsNodeAddress + " receive websocket disconnect event (" + userid + " on " + + (this.localEngine == null ? null : this.localEngine.getEngineid()) + ")."); } return disconnect(userid, wsNodeAddress); } final CompletableFuture changeUserid(Serializable olduserid, final Serializable newuserid) { if (logger.isLoggable(Level.FINEST)) { - logger.finest(wsNodeAddress + " receive websocket changeUserid event (from " + olduserid + " to " + newuserid + " on " + (this.localEngine == null ? null : this.localEngine.getEngineid()) + ")."); + logger.finest(wsNodeAddress + " receive websocket changeUserid event (from " + olduserid + + " to " + newuserid + " on " + (this.localEngine == null ? null : this.localEngine.getEngineid()) + ")."); } return changeUserid(olduserid, newuserid, wsNodeAddress); } @@ -185,7 +196,8 @@ public abstract class WebSocketNode implements Service { * * @return 客户端地址列表 */ - protected CompletableFuture> remoteWebSocketAddresses(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, Serializable userid) { + protected CompletableFuture> remoteWebSocketAddresses(@RpcTargetTopic String topic, + @RpcTargetAddress InetSocketAddress targetAddress, Serializable userid) { if (remoteNode == null) { return CompletableFuture.completedFuture(null); } @@ -239,8 +251,9 @@ public abstract class WebSocketNode implements Service { } CompletableFuture>> future = null; for (final WebSocketAddress nodeAddress : addrs) { - CompletableFuture>> mapFuture = getWebSocketAddresses(nodeAddress.getTopic(), nodeAddress.getAddr(), userid) - .thenCompose((List list) -> CompletableFuture.completedFuture(Utility.ofMap(nodeAddress, list))); + CompletableFuture>> mapFuture + = getWebSocketAddresses(nodeAddress.getTopic(), nodeAddress.getAddr(), userid) + .thenCompose((List list) -> CompletableFuture.completedFuture(Utility.ofMap(nodeAddress, list))); future = future == null ? mapFuture : future.thenCombine(mapFuture, (a, b) -> Utility.merge(a, b)); } return future == null ? CompletableFuture.completedFuture(new HashMap<>()) : future; @@ -268,7 +281,8 @@ public abstract class WebSocketNode implements Service { if (this.localEngine != null && this.source == null) { //本地模式且没有分布式 return CompletableFuture.completedFuture(this.localEngine.getLocalUserSize()); } - CompletableFuture localFuture = this.localEngine == null ? null : CompletableFuture.completedFuture(this.localEngine.getLocalUserSize()); + CompletableFuture localFuture = this.localEngine == null ? null + : CompletableFuture.completedFuture(this.localEngine.getLocalUserSize()); tryAcquireSemaphore(); CompletableFuture> addrsFuture = source.smembersAsync(WS_SOURCE_KEY_NODES, WebSocketAddress.class); if (semaphore != null) { @@ -302,11 +316,13 @@ public abstract class WebSocketNode implements Service { */ public CompletableFuture> getUserSet() { if (this.localEngine != null && this.source == null) { - return CompletableFuture.completedFuture(new LinkedHashSet<>(this.localEngine.getLocalUserSet().stream().map(x -> String.valueOf(x)).collect(Collectors.toList()))); + return CompletableFuture.completedFuture(new LinkedHashSet<>(this.localEngine.getLocalUserSet() + .stream().map(x -> String.valueOf(x)).collect(Collectors.toList()))); } tryAcquireSemaphore(); CompletableFuture> listFuture = this.source.keysStartsWithAsync(WS_SOURCE_KEY_USERID_PREFIX); - CompletableFuture> rs = listFuture.thenApply(v -> new LinkedHashSet<>(v.stream().map(x -> x.substring(WS_SOURCE_KEY_USERID_PREFIX.length())).collect(Collectors.toList()))); + CompletableFuture> rs = listFuture.thenApply(v -> new LinkedHashSet<>(v.stream() + .map(x -> x.substring(WS_SOURCE_KEY_USERID_PREFIX.length())).collect(Collectors.toList()))); if (semaphore != null) { rs.whenComplete((r, e) -> releaseSemaphore()); } @@ -374,7 +390,9 @@ public abstract class WebSocketNode implements Service { } if (this.source == null || this.remoteNode == null) { if (logger.isLoggable(Level.FINEST)) { - logger.finest("websocket " + (this.remoteNode == null ? (this.source == null ? "remote and source" : "remote") : "source") + " node is null"); + logger.finest("websocket " + + (this.remoteNode == null ? (this.source == null ? "remote and source" : "remote") : "source") + + " node is null"); } //没有CacheSource就不会有分布式节点 return CompletableFuture.completedFuture(false); @@ -434,7 +452,9 @@ public abstract class WebSocketNode implements Service { } if (this.source == null || this.remoteNode == null) { if (logger.isLoggable(Level.FINEST)) { - logger.finest("websocket " + (this.remoteNode == null ? (this.source == null ? "remote and source" : "remote") : "source") + " node is null"); + logger.finest("websocket " + + (this.remoteNode == null ? (this.source == null ? "remote and source" : "remote") : "source") + + " node is null"); } //没有CacheSource就不会有分布式节点 return localFuture == null ? CompletableFuture.completedFuture(0) : localFuture; @@ -564,7 +584,7 @@ public abstract class WebSocketNode implements Service { * @return 为0表示成功, 其他值表示部分发送异常 */ @Local - public final CompletableFuture sendMessage(final Object message, final boolean last, final Stream useridOrAddrs) { + public final CompletableFuture sendMessage(Object message, boolean last, Stream useridOrAddrs) { return sendMessage((Convert) null, message, last, useridOrAddrs); } @@ -595,7 +615,7 @@ public abstract class WebSocketNode implements Service { * @return 为0表示成功, 其他值表示部分发送异常 */ @Local - public final CompletableFuture sendMessage(final Convert convert, final Object message0, final boolean last, final Stream userids) { + public final CompletableFuture sendMessage(Convert convert, Object message0, boolean last, Stream userids) { Object[] array = userids.toArray(); Serializable[] ss = new Serializable[array.length]; for (int i = 0; i < array.length; i++) { @@ -630,7 +650,10 @@ public abstract class WebSocketNode implements Service { if (message0 instanceof CompletableFuture) { return ((CompletableFuture) message0).thenApply(msg -> sendMessage(convert, msg, last, userids)); } - final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 : ((convert instanceof TextConvert) ? new WebSocketPacket(WebSocketPacket.FrameType.TEXT, ((TextConvert) convert).convertToBytes(message0), last) : new WebSocketPacket(WebSocketPacket.FrameType.BINARY, ((BinaryConvert) convert).convertToBytes(message0), last)); + final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 + : ((convert instanceof TextConvert) ? new WebSocketPacket(WebSocketPacket.FrameType.TEXT, + ((TextConvert) convert).convertToBytes(message0), last) : new WebSocketPacket(WebSocketPacket.FrameType.BINARY, + ((BinaryConvert) convert).convertToBytes(message0), last)); if (this.localEngine != null && this.source == null) { //本地模式且没有分布式 return this.localEngine.sendLocalMessage(message, last, userids); } @@ -664,7 +687,8 @@ public abstract class WebSocketNode implements Service { } }); if (logger.isLoggable(Level.FINEST)) { - logger.finest("websocket(localaddr=" + localSncpAddress + ", userids=" + JsonConvert.root().convertTo(userids) + ") found message-addr-userids: " + addrUsers); + logger.finest("websocket(localaddr=" + localSncpAddress + ", userids=" + + JsonConvert.root().convertTo(userids) + ") found message-addr-userids: " + addrUsers); } CompletableFuture future = null; for (Map.Entry> en : addrUsers.entrySet()) { @@ -690,14 +714,16 @@ public abstract class WebSocketNode implements Service { * @return 为0表示成功, 其他值表示部分发送异常 */ @Local - public CompletableFuture sendMessage(final Convert convert, final Object message0, final boolean last, final WebSocketUserAddress... useraddrs) { + public CompletableFuture sendMessage(Convert convert, Object message0, boolean last, WebSocketUserAddress... useraddrs) { if (useraddrs == null || useraddrs.length < 1) { return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); } if (message0 instanceof CompletableFuture) { return ((CompletableFuture) message0).thenApply(msg -> sendMessage(convert, msg, last, useraddrs)); } - final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 : ((convert instanceof TextConvert) ? new WebSocketPacket(((TextConvert) convert).convertTo(message0), last) : new WebSocketPacket(((BinaryConvert) convert).convertTo(message0), last)); + final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 + : ((convert instanceof TextConvert) ? new WebSocketPacket(((TextConvert) convert).convertTo(message0), last) + : new WebSocketPacket(((BinaryConvert) convert).convertTo(message0), last)); if (this.localEngine != null && this.source == null) { //本地模式且没有分布式 return this.localEngine.sendLocalMessage(message, last, userAddressToUserids(useraddrs)); } @@ -705,7 +731,8 @@ public abstract class WebSocketNode implements Service { final Object remoteMessage = formatRemoteMessage(message); final Map> addrUsers = userAddressToAddrMap(useraddrs); if (logger.isLoggable(Level.FINEST)) { - logger.finest("websocket(localaddr=" + localSncpAddress + ", useraddrs=" + JsonConvert.root().convertTo(useraddrs) + ") found message-addr-userids: " + addrUsers); + logger.finest("websocket(localaddr=" + localSncpAddress + ", useraddrs=" + JsonConvert.root().convertTo(useraddrs) + + ") found message-addr-userids: " + addrUsers); } CompletableFuture future = null; for (Map.Entry> en : addrUsers.entrySet()) { @@ -722,7 +749,10 @@ public abstract class WebSocketNode implements Service { return ((CompletableFuture) message).thenApply(msg -> sendOneUserMessage(msg, last, userid)); } if (logger.isLoggable(Level.FINEST)) { - logger.finest("websocket want send message {userid:" + userid + ", content:" + (message instanceof WebSocketPacket ? ((WebSocketPacket) message).toSimpleString() : (message instanceof CharSequence ? message : JsonConvert.root().convertTo(message))) + "} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine"); + logger.finest("websocket want send message {userid:" + userid + ", content:" + + (message instanceof WebSocketPacket ? ((WebSocketPacket) message).toSimpleString() + : (message instanceof CharSequence ? message : JsonConvert.root().convertTo(message))) + + "} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine"); } CompletableFuture localFuture = null; if (this.localEngine != null) { @@ -730,7 +760,9 @@ public abstract class WebSocketNode implements Service { } if (this.source == null || this.remoteNode == null) { if (logger.isLoggable(Level.FINEST)) { - logger.finest("websocket " + (this.remoteNode == null ? (this.source == null ? "remote and source" : "remote") : "source") + " node is null"); + logger.finest("websocket " + + (this.remoteNode == null ? (this.source == null ? "remote and source" : "remote") : "source") + + " node is null"); } //没有CacheSource就不会有分布式节点 return localFuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localFuture; @@ -770,14 +802,21 @@ public abstract class WebSocketNode implements Service { return ((CompletableFuture) message).thenApply(msg -> sendOneAddrMessage(addr, msg, last, userids)); } if (logger.isLoggable(Level.FINEST) && this.localEngine == null) { //只打印远程模式的 - logger.finest("websocket want send message {userids:" + JsonConvert.root().convertTo(userids) + ", sncpaddr:" + addr + ", content:" + (message instanceof WebSocketPacket ? ((WebSocketPacket) message).toSimpleString() : (message instanceof CharSequence ? message : JsonConvert.root().convertTo(message))) + "} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine"); + logger.finest("websocket want send message {userids:" + JsonConvert.root().convertTo(userids) + + ", sncpaddr:" + addr + ", content:" + + (message instanceof WebSocketPacket ? ((WebSocketPacket) message).toSimpleString() + : (message instanceof CharSequence ? message : JsonConvert.root().convertTo(message))) + + "} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine"); } if (Objects.equals(addr, this.wsNodeAddress)) { - return this.localEngine == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localEngine.sendLocalMessage(message, last, userids); + return this.localEngine == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) + : localEngine.sendLocalMessage(message, last, userids); } if (this.source == null || this.remoteNode == null) { if (logger.isLoggable(Level.FINEST)) { - logger.finest("websocket " + (this.remoteNode == null ? (this.source == null ? "remote and source" : "remote") : "source") + " node is null"); + logger.finest("websocket " + + (this.remoteNode == null ? (this.source == null ? "remote and source" : "remote") : "source") + + " node is null"); } //没有CacheSource就不会有分布式节点 return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); @@ -918,11 +957,13 @@ public abstract class WebSocketNode implements Service { * @return 为0表示成功, 其他值表示部分发送异常 */ @Local - public CompletableFuture broadcastMessage(final WebSocketRange wsrange, final Convert convert, final Object message0, final boolean last) { + public CompletableFuture broadcastMessage(WebSocketRange wsrange, Convert convert, Object message0, final boolean last) { if (message0 instanceof CompletableFuture) { return ((CompletableFuture) message0).thenApply(msg -> broadcastMessage(wsrange, convert, msg, last)); } - final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 : ((convert instanceof TextConvert) ? new WebSocketPacket(((TextConvert) convert).convertTo(message0), last) : new WebSocketPacket(((BinaryConvert) convert).convertTo(message0), last)); + final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 + : ((convert instanceof TextConvert) ? new WebSocketPacket(((TextConvert) convert).convertTo(message0), last) + : new WebSocketPacket(((BinaryConvert) convert).convertTo(message0), last)); if (this.localEngine != null && this.source == null) { //本地模式且没有分布式 return this.localEngine.broadcastLocalMessage(wsrange, message, last); } @@ -1044,7 +1085,8 @@ public abstract class WebSocketNode implements Service { } }); if (logger.isLoggable(Level.FINEST)) { - logger.finest("websocket(localaddr=" + localSncpAddress + ", userids=" + JsonConvert.root().convertTo(userids) + ") found action-userid-addrs: " + addrUsers); + logger.finest("websocket(localaddr=" + localSncpAddress + ", userids=" + JsonConvert.root().convertTo(userids) + + ") found action-userid-addrs: " + addrUsers); } CompletableFuture future = null; for (Map.Entry> en : addrUsers.entrySet()) { @@ -1078,7 +1120,8 @@ public abstract class WebSocketNode implements Service { final Map> addrUsers = userAddressToAddrMap(useraddrs); if (logger.isLoggable(Level.FINEST)) { - logger.finest("websocket(localaddr=" + localSncpAddress + ", useraddrs=" + JsonConvert.root().convertTo(useraddrs) + ") found action-userid-addrs: " + addrUsers); + logger.finest("websocket(localaddr=" + localSncpAddress + + ", useraddrs=" + JsonConvert.root().convertTo(useraddrs) + ") found action-userid-addrs: " + addrUsers); } CompletableFuture future = null; for (Map.Entry> en : addrUsers.entrySet()) { @@ -1091,7 +1134,8 @@ public abstract class WebSocketNode implements Service { protected CompletableFuture sendOneUserAction(final WebSocketAction action, final Serializable userid) { if (logger.isLoggable(Level.FINEST)) { - logger.finest("websocket want send action {userid:" + userid + ", action:" + action + "} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine"); + logger.finest("websocket want send action {userid:" + userid + ", action:" + action + + "} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine"); } CompletableFuture localFuture = null; if (this.localEngine != null) { @@ -1099,7 +1143,9 @@ public abstract class WebSocketNode implements Service { } if (this.source == null || this.remoteNode == null) { if (logger.isLoggable(Level.FINEST)) { - logger.finest("websocket " + (this.remoteNode == null ? (this.source == null ? "remote and source" : "remote") : "source") + " node is null"); + logger.finest("websocket " + + (this.remoteNode == null ? (this.source == null ? "remote and source" : "remote") : "source") + + " node is null"); } //没有CacheSource就不会有分布式节点 return localFuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localFuture; @@ -1133,16 +1179,20 @@ public abstract class WebSocketNode implements Service { return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b); } - protected CompletableFuture sendOneAddrAction(final WebSocketAddress addr, final WebSocketAction action, final Serializable... userids) { + protected CompletableFuture sendOneAddrAction(WebSocketAddress addr, WebSocketAction action, Serializable... userids) { if (logger.isLoggable(Level.FINEST) && this.localEngine == null) { //只打印远程模式的 - logger.finest("websocket want send action {userids:" + JsonConvert.root().convertTo(userids) + ", sncpaddr:" + addr + ", action:" + action + " from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine"); + logger.finest("websocket want send action {userids:" + JsonConvert.root().convertTo(userids) + + ", sncpaddr:" + addr + ", action:" + action + " from locale node to " + + ((this.localEngine != null) ? "locale" : "remote") + " engine"); } if (Objects.equals(addr, this.wsNodeAddress)) { return this.localEngine == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localEngine.sendLocalAction(action, userids); } if (this.source == null || this.remoteNode == null) { if (logger.isLoggable(Level.FINEST)) { - logger.finest("websocket " + (this.remoteNode == null ? (this.source == null ? "remote and source" : "remote") : "source") + " node is null"); + logger.finest("websocket " + + (this.remoteNode == null ? (this.source == null ? "remote and source" : "remote") : "source") + + " node is null"); } //没有CacheSource就不会有分布式节点 return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); diff --git a/src/main/java/org/redkale/net/http/WebSocketNodeService.java b/src/main/java/org/redkale/net/http/WebSocketNodeService.java index affaec629..15342b0e9 100644 --- a/src/main/java/org/redkale/net/http/WebSocketNodeService.java +++ b/src/main/java/org/redkale/net/http/WebSocketNodeService.java @@ -7,8 +7,9 @@ import java.util.concurrent.CompletableFuture; import java.util.logging.Level; import org.redkale.annotation.*; import static org.redkale.net.http.WebSocket.RETCODE_GROUP_EMPTY; -import org.redkale.net.http.WebSocketNodeService; -import org.redkale.service.*; +import org.redkale.service.RpcTargetAddress; +import org.redkale.service.RpcTargetTopic; +import org.redkale.service.Service; import org.redkale.util.AnyValue; /** @@ -37,8 +38,10 @@ public class WebSocketNodeService extends WebSocketNode implements Service { } @Override - public CompletableFuture> getWebSocketAddresses(@RpcTargetTopic String topic, final @RpcTargetAddress InetSocketAddress targetAddress, final Serializable groupid) { - if ((topic == null || !topic.equals(this.wsNodeAddress.getTopic())) && (localSncpAddress == null || !localSncpAddress.equals(targetAddress))) { + public CompletableFuture> getWebSocketAddresses(@RpcTargetTopic String topic, + final @RpcTargetAddress InetSocketAddress targetAddress, final Serializable groupid) { + if ((topic == null || !topic.equals(this.wsNodeAddress.getTopic())) + && (localSncpAddress == null || !localSncpAddress.equals(targetAddress))) { return remoteWebSocketAddresses(topic, targetAddress, groupid); } if (this.localEngine == null) { @@ -50,7 +53,8 @@ public class WebSocketNodeService extends WebSocketNode implements Service { } @Override - public CompletableFuture sendMessage(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable... userids) { + public CompletableFuture sendMessage(@RpcTargetTopic String topic, + @RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable... userids) { if (this.localEngine == null) { return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); } @@ -58,7 +62,8 @@ public class WebSocketNodeService extends WebSocketNode implements Service { } @Override - public CompletableFuture broadcastMessage(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, final WebSocketRange wsrange, Object message, boolean last) { + public CompletableFuture broadcastMessage(@RpcTargetTopic String topic, + @RpcTargetAddress InetSocketAddress targetAddress, final WebSocketRange wsrange, Object message, boolean last) { if (this.localEngine == null) { return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); } @@ -66,7 +71,8 @@ public class WebSocketNodeService extends WebSocketNode implements Service { } @Override - public CompletableFuture sendAction(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action, Serializable... userids) { + public CompletableFuture sendAction(@RpcTargetTopic String topic, + @RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action, Serializable... userids) { if (this.localEngine == null) { return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); } @@ -74,7 +80,8 @@ public class WebSocketNodeService extends WebSocketNode implements Service { } @Override - public CompletableFuture broadcastAction(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action) { + public CompletableFuture broadcastAction(@RpcTargetTopic String topic, + @RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action) { if (this.localEngine == null) { return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); } @@ -164,7 +171,8 @@ public class WebSocketNodeService extends WebSocketNode implements Service { * @return 无返回值 */ @Override - public CompletableFuture existsWebSocket(Serializable userid, @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress) { + public CompletableFuture existsWebSocket(Serializable userid, + @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress) { if (logger.isLoggable(Level.FINEST)) { logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " existsWebSocket from " + targetAddress); } @@ -184,7 +192,8 @@ public class WebSocketNodeService extends WebSocketNode implements Service { * @return 无返回值 */ @Override - public CompletableFuture forceCloseWebSocket(Serializable userid, @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress) { + public CompletableFuture forceCloseWebSocket(Serializable userid, + @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress) { //不能从sncpNodeAddresses中移除,因为engine.forceCloseWebSocket 会调用到disconnect if (logger.isLoggable(Level.FINEST)) { logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " forceCloseWebSocket from " + targetAddress); diff --git a/src/main/java/org/redkale/net/http/WebSocketReadHandler.java b/src/main/java/org/redkale/net/http/WebSocketReadHandler.java index c3d83ec38..a886f2f26 100644 --- a/src/main/java/org/redkale/net/http/WebSocketReadHandler.java +++ b/src/main/java/org/redkale/net/http/WebSocketReadHandler.java @@ -54,7 +54,8 @@ public class WebSocketReadHandler implements CompletionHandler byteArrayPool, BiConsumer messageConsumer) { + public WebSocketReadHandler(HttpContext context, WebSocket webSocket, ObjectPool byteArrayPool, + BiConsumer messageConsumer) { this.context = context; this.webSocket = webSocket; this.byteArrayPool = byteArrayPool; @@ -299,7 +300,8 @@ public class WebSocketReadHandler implements CompletionHandler * 详情见: https://redkale.org * + * @see org.redkale.net.http.WebSocketNodeService * @deprecated 2.6.0 * @author zhangjx */ @@ -27,6 +28,7 @@ public class WebSocketNodeService extends org.redkale.net.http.WebSocketNodeServ @Override public void init(AnyValue conf) { super.init(conf); - logger.log(Level.WARNING, WebSocketNodeService.class.getName() + "is replaced by " + org.redkale.net.http.WebSocketNodeService.class.getName()); + logger.log(Level.WARNING, WebSocketNodeService.class.getName() + + "is replaced by " + org.redkale.net.http.WebSocketNodeService.class.getName()); } }