优化WebSocketNode.sendMessage

This commit is contained in:
Redkale
2018-01-02 16:42:51 +08:00
parent f7dfb32849
commit e2057676bd
2 changed files with 10 additions and 7 deletions

View File

@@ -48,8 +48,8 @@ public abstract class WebSocketNode {
@RpcRemote
protected WebSocketNode remoteNode;
@Resource(name = "$_textconvert")
protected Convert textConvert;
@Resource(name = "$_sendconvert")
protected Convert sendConvert;
//存放所有用户分布在节点上的队列信息,Set<InetSocketAddress> 为 sncpnode 的集合, key: groupid
//集合包含 localSncpAddress
@@ -354,10 +354,11 @@ public abstract class WebSocketNode {
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
return this.localEngine.sendMessage(message, last, userids);
}
final Object remoteMessage = formatRemoteMessage(message0);
CompletableFuture<Integer> future = null;
for (Serializable userid : userids) {
future = future == null ? sendOneMessage(message, last, userid)
: future.thenCombine(sendOneMessage(message, last, userid), (a, b) -> a | b);
future = future == null ? sendOneMessage(remoteMessage, last, userid)
: future.thenCombine(sendOneMessage(remoteMessage, last, userid), (a, b) -> a | b);
}
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
}
@@ -412,13 +413,13 @@ public abstract class WebSocketNode {
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
return this.localEngine.broadcastMessage(message, last);
}
final Object remoteMessage = formatRemoteMessage(message);
CompletableFuture<Integer> localFuture = this.localEngine == null ? null : this.localEngine.broadcastMessage(message, last);
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY);
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket broadcast message on " + addrs);
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0);
CompletableFuture<Integer> future = null;
Object remoteMessage = formatRemoteMessage(message);
for (InetSocketAddress addr : addrs) {
if (addr == null || addr.equals(localSncpAddress)) continue;
future = future == null ? remoteNode.broadcastMessage(addr, remoteMessage, last)
@@ -440,6 +441,7 @@ public abstract class WebSocketNode {
return localFuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localFuture;
}
//远程节点发送消息
final Object remoteMessage = formatRemoteMessage(message);
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid);
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
if (addrs == null || addrs.isEmpty()) {
@@ -448,7 +450,6 @@ public abstract class WebSocketNode {
}
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket(localaddr=" + localSncpAddress + ") found userid:" + userid + " on " + addrs);
CompletableFuture<Integer> future = null;
Object remoteMessage = formatRemoteMessage(message);
for (InetSocketAddress addr : addrs) {
if (addr == null || addr.equals(localSncpAddress)) continue;
future = future == null ? remoteNode.sendMessage(addr, remoteMessage, last, userid)
@@ -463,7 +464,8 @@ public abstract class WebSocketNode {
if (message instanceof WebSocketPacket) return message;
if (message instanceof byte[]) return message;
if (message instanceof CharSequence) return message;
if (textConvert != null) return ((TextConvert) textConvert).convertTo(message);
if (sendConvert instanceof TextConvert) ((TextConvert) sendConvert).convertTo(message);
if (sendConvert instanceof BinaryConvert) ((BinaryConvert) sendConvert).convertTo(message);
return JsonConvert.root().convertTo(message);
}
}

View File

@@ -123,6 +123,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
this.node = new WebSocketNodeService();
if (logger.isLoggable(Level.WARNING)) logger.warning("Not found WebSocketNode, create a default value for " + getClass().getName());
}
if (this.node.sendConvert == null) this.node.sendConvert = this.sendConvert;
//存在WebSocketServlet则此WebSocketNode必须是本地模式Service
this.node.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]",