修复WebSocketNode在远程模式下使用SNCP序列化导致目标服务器没有对应的JavaBean类而无法解析的BUG
This commit is contained in:
@@ -15,6 +15,7 @@ import java.util.stream.Stream;
|
|||||||
import javax.annotation.*;
|
import javax.annotation.*;
|
||||||
import org.redkale.boot.*;
|
import org.redkale.boot.*;
|
||||||
import org.redkale.convert.*;
|
import org.redkale.convert.*;
|
||||||
|
import org.redkale.convert.json.JsonConvert;
|
||||||
import org.redkale.service.*;
|
import org.redkale.service.*;
|
||||||
import org.redkale.source.*;
|
import org.redkale.source.*;
|
||||||
import org.redkale.util.*;
|
import org.redkale.util.*;
|
||||||
@@ -44,6 +45,9 @@ public abstract class WebSocketNode {
|
|||||||
@RpcRemote
|
@RpcRemote
|
||||||
protected WebSocketNode remoteNode;
|
protected WebSocketNode remoteNode;
|
||||||
|
|
||||||
|
@Resource(name = "$_textconvert")
|
||||||
|
protected Convert textConvert;
|
||||||
|
|
||||||
//存放所有用户分布在节点上的队列信息,Set<InetSocketAddress> 为 sncpnode 的集合, key: groupid
|
//存放所有用户分布在节点上的队列信息,Set<InetSocketAddress> 为 sncpnode 的集合, key: groupid
|
||||||
//集合包含 localSncpAddress
|
//集合包含 localSncpAddress
|
||||||
//如果不是分布式(没有SNCP),sncpNodeAddresses 将不会被用到
|
//如果不是分布式(没有SNCP),sncpNodeAddresses 将不会被用到
|
||||||
@@ -403,10 +407,11 @@ public abstract class WebSocketNode {
|
|||||||
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket broadcast message on " + addrs);
|
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket broadcast message on " + addrs);
|
||||||
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0);
|
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0);
|
||||||
CompletableFuture<Integer> future = null;
|
CompletableFuture<Integer> future = null;
|
||||||
|
Object remoteMessage = formatRemoteMessage(message);
|
||||||
for (InetSocketAddress addr : addrs) {
|
for (InetSocketAddress addr : addrs) {
|
||||||
if (addr == null || addr.equals(localSncpAddress)) continue;
|
if (addr == null || addr.equals(localSncpAddress)) continue;
|
||||||
future = future == null ? remoteNode.broadcastMessage(addr, message, last)
|
future = future == null ? remoteNode.broadcastMessage(addr, remoteMessage, last)
|
||||||
: future.thenCombine(remoteNode.broadcastMessage(addr, message, last), (a, b) -> a | b);
|
: future.thenCombine(remoteNode.broadcastMessage(addr, remoteMessage, last), (a, b) -> a | b);
|
||||||
}
|
}
|
||||||
return future == null ? CompletableFuture.completedFuture(0) : future;
|
return future == null ? CompletableFuture.completedFuture(0) : future;
|
||||||
});
|
});
|
||||||
@@ -432,13 +437,22 @@ public abstract class WebSocketNode {
|
|||||||
}
|
}
|
||||||
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket(localaddr=" + localSncpAddress + ") found userid:" + userid + " on " + addrs);
|
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket(localaddr=" + localSncpAddress + ") found userid:" + userid + " on " + addrs);
|
||||||
CompletableFuture<Integer> future = null;
|
CompletableFuture<Integer> future = null;
|
||||||
|
Object remoteMessage = formatRemoteMessage(message);
|
||||||
for (InetSocketAddress addr : addrs) {
|
for (InetSocketAddress addr : addrs) {
|
||||||
if (addr == null || addr.equals(localSncpAddress)) continue;
|
if (addr == null || addr.equals(localSncpAddress)) continue;
|
||||||
future = future == null ? remoteNode.sendMessage(addr, message, last, userid)
|
future = future == null ? remoteNode.sendMessage(addr, remoteMessage, last, userid)
|
||||||
: future.thenCombine(remoteNode.sendMessage(addr, message, last, userid), (a, b) -> a | b);
|
: future.thenCombine(remoteNode.sendMessage(addr, remoteMessage, last, userid), (a, b) -> a | b);
|
||||||
}
|
}
|
||||||
return future == null ? CompletableFuture.completedFuture(0) : future;
|
return future == null ? CompletableFuture.completedFuture(0) : future;
|
||||||
});
|
});
|
||||||
return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b);
|
return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Object formatRemoteMessage(Object message) {
|
||||||
|
if (message instanceof WebSocketPacket) return message;
|
||||||
|
if (message instanceof byte[]) return message;
|
||||||
|
if (message instanceof CharSequence) return message;
|
||||||
|
if (textConvert != null) return ((TextConvert) textConvert).convertTo(message);
|
||||||
|
return JsonConvert.root().convertTo(message);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user