From e2057676bdf3528d2cb547d820be90f6118791f5 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Tue, 2 Jan 2018 16:42:51 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96WebSocketNode.sendMessage?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/org/redkale/net/http/WebSocketNode.java | 16 +++++++++------- src/org/redkale/net/http/WebSocketServlet.java | 1 + 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index 49e04888b..e9dd690fd 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -48,8 +48,8 @@ public abstract class WebSocketNode { @RpcRemote protected WebSocketNode remoteNode; - @Resource(name = "$_textconvert") - protected Convert textConvert; + @Resource(name = "$_sendconvert") + protected Convert sendConvert; //存放所有用户分布在节点上的队列信息,Set 为 sncpnode 的集合, key: groupid //集合包含 localSncpAddress @@ -354,10 +354,11 @@ public abstract class WebSocketNode { if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式 return this.localEngine.sendMessage(message, last, userids); } + final Object remoteMessage = formatRemoteMessage(message0); CompletableFuture future = null; for (Serializable userid : userids) { - future = future == null ? sendOneMessage(message, last, userid) - : future.thenCombine(sendOneMessage(message, last, userid), (a, b) -> a | b); + future = future == null ? sendOneMessage(remoteMessage, last, userid) + : future.thenCombine(sendOneMessage(remoteMessage, last, userid), (a, b) -> a | b); } return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; } @@ -412,13 +413,13 @@ public abstract class WebSocketNode { if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式 return this.localEngine.broadcastMessage(message, last); } + final Object remoteMessage = formatRemoteMessage(message); CompletableFuture localFuture = this.localEngine == null ? null : this.localEngine.broadcastMessage(message, last); CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY); CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { if (logger.isLoggable(Level.FINEST)) logger.finest("websocket broadcast message on " + addrs); if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0); CompletableFuture future = null; - Object remoteMessage = formatRemoteMessage(message); for (InetSocketAddress addr : addrs) { if (addr == null || addr.equals(localSncpAddress)) continue; future = future == null ? remoteNode.broadcastMessage(addr, remoteMessage, last) @@ -440,6 +441,7 @@ public abstract class WebSocketNode { return localFuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localFuture; } //远程节点发送消息 + final Object remoteMessage = formatRemoteMessage(message); CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid); CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { if (addrs == null || addrs.isEmpty()) { @@ -448,7 +450,6 @@ public abstract class WebSocketNode { } if (logger.isLoggable(Level.FINEST)) logger.finest("websocket(localaddr=" + localSncpAddress + ") found userid:" + userid + " on " + addrs); CompletableFuture future = null; - Object remoteMessage = formatRemoteMessage(message); for (InetSocketAddress addr : addrs) { if (addr == null || addr.equals(localSncpAddress)) continue; future = future == null ? remoteNode.sendMessage(addr, remoteMessage, last, userid) @@ -463,7 +464,8 @@ public abstract class WebSocketNode { if (message instanceof WebSocketPacket) return message; if (message instanceof byte[]) return message; if (message instanceof CharSequence) return message; - if (textConvert != null) return ((TextConvert) textConvert).convertTo(message); + if (sendConvert instanceof TextConvert) ((TextConvert) sendConvert).convertTo(message); + if (sendConvert instanceof BinaryConvert) ((BinaryConvert) sendConvert).convertTo(message); return JsonConvert.root().convertTo(message); } } diff --git a/src/org/redkale/net/http/WebSocketServlet.java b/src/org/redkale/net/http/WebSocketServlet.java index 6e0a2ec06..bcc13b401 100644 --- a/src/org/redkale/net/http/WebSocketServlet.java +++ b/src/org/redkale/net/http/WebSocketServlet.java @@ -123,6 +123,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl this.node = new WebSocketNodeService(); if (logger.isLoggable(Level.WARNING)) logger.warning("Not found WebSocketNode, create a default value for " + getClass().getName()); } + if (this.node.sendConvert == null) this.node.sendConvert = this.sendConvert; //存在WebSocketServlet,则此WebSocketNode必须是本地模式Service this.node.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]",