From 71ead72dec9f44b89ea3bb6798825d83f5fc7283 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Mon, 13 Nov 2017 17:22:01 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8DWebSocketNode=E5=9C=A8?= =?UTF-8?q?=E8=BF=9C=E7=A8=8B=E6=A8=A1=E5=BC=8F=E4=B8=8B=E4=BD=BF=E7=94=A8?= =?UTF-8?q?SNCP=E5=BA=8F=E5=88=97=E5=8C=96=E5=AF=BC=E8=87=B4=E7=9B=AE?= =?UTF-8?q?=E6=A0=87=E6=9C=8D=E5=8A=A1=E5=99=A8=E6=B2=A1=E6=9C=89=E5=AF=B9?= =?UTF-8?q?=E5=BA=94=E7=9A=84JavaBean=E7=B1=BB=E8=80=8C=E6=97=A0=E6=B3=95?= =?UTF-8?q?=E8=A7=A3=E6=9E=90=E7=9A=84BUG?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/org/redkale/net/http/WebSocketNode.java | 22 +++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index 34d52e076..dfda85bfe 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -15,6 +15,7 @@ import java.util.stream.Stream; import javax.annotation.*; import org.redkale.boot.*; import org.redkale.convert.*; +import org.redkale.convert.json.JsonConvert; import org.redkale.service.*; import org.redkale.source.*; import org.redkale.util.*; @@ -44,6 +45,9 @@ public abstract class WebSocketNode { @RpcRemote protected WebSocketNode remoteNode; + @Resource(name = "$_textconvert") + protected Convert textConvert; + //存放所有用户分布在节点上的队列信息,Set 为 sncpnode 的集合, key: groupid //集合包含 localSncpAddress //如果不是分布式(没有SNCP),sncpNodeAddresses 将不会被用到 @@ -403,10 +407,11 @@ public abstract class WebSocketNode { if (logger.isLoggable(Level.FINEST)) logger.finest("websocket broadcast message on " + addrs); if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0); CompletableFuture future = null; + Object remoteMessage = formatRemoteMessage(message); for (InetSocketAddress addr : addrs) { if (addr == null || addr.equals(localSncpAddress)) continue; - future = future == null ? remoteNode.broadcastMessage(addr, message, last) - : future.thenCombine(remoteNode.broadcastMessage(addr, message, last), (a, b) -> a | b); + future = future == null ? remoteNode.broadcastMessage(addr, remoteMessage, last) + : future.thenCombine(remoteNode.broadcastMessage(addr, remoteMessage, last), (a, b) -> a | b); } return future == null ? CompletableFuture.completedFuture(0) : future; }); @@ -432,13 +437,22 @@ public abstract class WebSocketNode { } if (logger.isLoggable(Level.FINEST)) logger.finest("websocket(localaddr=" + localSncpAddress + ") found userid:" + userid + " on " + addrs); CompletableFuture future = null; + Object remoteMessage = formatRemoteMessage(message); for (InetSocketAddress addr : addrs) { if (addr == null || addr.equals(localSncpAddress)) continue; - future = future == null ? remoteNode.sendMessage(addr, message, last, userid) - : future.thenCombine(remoteNode.sendMessage(addr, message, last, userid), (a, b) -> a | b); + future = future == null ? remoteNode.sendMessage(addr, remoteMessage, last, userid) + : future.thenCombine(remoteNode.sendMessage(addr, remoteMessage, last, userid), (a, b) -> a | b); } return future == null ? CompletableFuture.completedFuture(0) : future; }); return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b); } + + private Object formatRemoteMessage(Object message) { + if (message instanceof WebSocketPacket) return message; + if (message instanceof byte[]) return message; + if (message instanceof CharSequence) return message; + if (textConvert != null) return ((TextConvert) textConvert).convertTo(message); + return JsonConvert.root().convertTo(message); + } }