WebSocket模块broadcastMessage方法增加WebSocketRange过滤参数

This commit is contained in:
Redkale
2018-01-03 11:17:00 +08:00
parent c700ffbadc
commit 808a3c2e9c
5 changed files with 173 additions and 11 deletions

View File

@@ -369,6 +369,18 @@ public abstract class WebSocket<G extends Serializable, T> {
return broadcastMessage((Convert) null, message, true);
}
/**
* 广播消息, 给所有人发消息
*
* @param wsrange 过滤条件
* @param message 消息内容
*
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> broadcastMessage(final WebSocketRange wsrange, final Object message) {
return broadcastMessage((WebSocketRange) null, (Convert) null, message, true);
}
/**
* 广播消息, 给所有人发消息
*
@@ -381,6 +393,19 @@ public abstract class WebSocket<G extends Serializable, T> {
return broadcastMessage(convert, message, true);
}
/**
* 广播消息, 给所有人发消息
*
* @param wsrange 过滤条件
* @param convert Convert
* @param message 消息内容
*
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> broadcastMessage(final WebSocketRange wsrange, final Convert convert, final Object message) {
return broadcastMessage((WebSocketRange) null, convert, message, true);
}
/**
* 广播消息, 给所有人发消息
*
@@ -393,6 +418,19 @@ public abstract class WebSocket<G extends Serializable, T> {
return broadcastMessage((Convert) null, message, last);
}
/**
* 广播消息, 给所有人发消息
*
* @param wsrange 过滤条件
* @param message 消息内容
* @param last 是否最后一条
*
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> broadcastMessage(final WebSocketRange wsrange, final Object message, final boolean last) {
return broadcastMessage(wsrange, (Convert) null, message, last);
}
/**
* 广播消息, 给所有人发消息
*
@@ -403,11 +441,25 @@ public abstract class WebSocket<G extends Serializable, T> {
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> broadcastMessage(final Convert convert, final Object message, final boolean last) {
return broadcastMessage((WebSocketRange) null, convert, message, last);
}
/**
* 广播消息, 给所有人发消息
*
* @param wsrange 过滤条件
* @param convert Convert
* @param message 消息内容
* @param last 是否最后一条
*
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> broadcastMessage(final WebSocketRange wsrange, final Convert convert, final Object message, final boolean last) {
if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL);
if (message instanceof CompletableFuture) {
return ((CompletableFuture) message).thenCompose((json) -> _engine.node.broadcastMessage(convert, json, last));
return ((CompletableFuture) message).thenCompose((json) -> _engine.node.broadcastMessage(wsrange, convert, json, last));
}
CompletableFuture<Integer> rs = _engine.node.broadcastMessage(convert, message, last);
CompletableFuture<Integer> rs = _engine.node.broadcastMessage(wsrange, convert, message, last);
if (_engine.logger.isLoggable(Level.FINEST)) _engine.logger.finest("broadcast send websocket message(" + message + ")");
return rs;
}
@@ -609,6 +661,17 @@ public abstract class WebSocket<G extends Serializable, T> {
*/
protected abstract CompletableFuture<G> createUserid();
/**
* WebSocket.broadcastMessage时的过滤条件
*
* @param wsrange 过滤条件
*
* @return boolean
*/
protected boolean predicate(WebSocketRange wsrange) {
return true;
}
/**
* WebSokcet连接成功后的回调方法
*/

View File

@@ -193,7 +193,13 @@ public class WebSocketEngine {
@Comment("给所有连接用户发送消息")
public CompletableFuture<Integer> broadcastMessage(final Object message, final boolean last) {
return broadcastMessage(null, message, last);
return broadcastMessage((Predicate) null, message, last);
}
@Comment("给指定WebSocket连接用户发送消息")
public CompletableFuture<Integer> broadcastMessage(final WebSocketRange wsrange, final Object message, final boolean last) {
Predicate<WebSocket> predicate = wsrange == null ? null : (ws) -> ws.predicate(wsrange);
return broadcastMessage(predicate, message, last);
}
@Comment("给指定WebSocket连接用户发送消息")

View File

@@ -80,7 +80,7 @@ public abstract class WebSocketNode {
protected abstract CompletableFuture<Integer> sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable userid);
protected abstract CompletableFuture<Integer> broadcastMessage(@RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last);
protected abstract CompletableFuture<Integer> broadcastMessage(@RpcTargetAddress InetSocketAddress targetAddress, WebSocketRange wsrange, Object message, boolean last);
protected abstract CompletableFuture<Void> connect(Serializable userid, InetSocketAddress addr);
@@ -374,6 +374,18 @@ public abstract class WebSocketNode {
return broadcastMessage((Convert) null, message, true);
}
/**
* 广播消息, 给所有人发消息
*
* @param wsrange 过滤条件
* @param message 消息内容
*
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> broadcastMessage(final WebSocketRange wsrange, final Object message) {
return broadcastMessage(wsrange, (Convert) null, message, true);
}
/**
* 广播消息, 给所有人发消息
*
@@ -386,6 +398,19 @@ public abstract class WebSocketNode {
return broadcastMessage(convert, message, true);
}
/**
* 广播消息, 给所有人发消息
*
* @param wsrange 过滤条件
* @param convert Convert
* @param message 消息内容
*
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> broadcastMessage(final WebSocketRange wsrange, final Convert convert, final Object message) {
return broadcastMessage(wsrange, convert, message, true);
}
/**
* 广播消息, 给所有人发消息
*
@@ -398,6 +423,19 @@ public abstract class WebSocketNode {
return broadcastMessage((Convert) null, message, last);
}
/**
* 广播消息, 给所有人发消息
*
* @param wsrange 过滤条件
* @param message 消息内容
* @param last 是否最后一条
*
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> broadcastMessage(final WebSocketRange wsrange, final Object message, final boolean last) {
return broadcastMessage(wsrange, (Convert) null, message, last);
}
/**
* 广播消息, 给所有人发消息
*
@@ -408,13 +446,27 @@ public abstract class WebSocketNode {
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> broadcastMessage(final Convert convert, final Object message0, final boolean last) {
if (message0 instanceof CompletableFuture) return ((CompletableFuture) message0).thenApply(msg -> broadcastMessage(convert, msg, last));
return broadcastMessage((WebSocketRange) null, convert, message0, last);
}
/**
* 广播消息, 给所有人发消息
*
* @param wsrange 过滤条件
* @param convert Convert
* @param message0 消息内容
* @param last 是否最后一条
*
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> broadcastMessage(final WebSocketRange wsrange, final Convert convert, final Object message0, final boolean last) {
if (message0 instanceof CompletableFuture) return ((CompletableFuture) message0).thenApply(msg -> broadcastMessage(wsrange, convert, msg, last));
final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 : ((convert instanceof TextConvert) ? new WebSocketPacket(((TextConvert) convert).convertTo(message0), last) : new WebSocketPacket(((BinaryConvert) convert).convertTo(message0), last));
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
return this.localEngine.broadcastMessage(message, last);
return this.localEngine.broadcastMessage(wsrange, message, last);
}
final Object remoteMessage = formatRemoteMessage(message);
CompletableFuture<Integer> localFuture = this.localEngine == null ? null : this.localEngine.broadcastMessage(message, last);
CompletableFuture<Integer> localFuture = this.localEngine == null ? null : this.localEngine.broadcastMessage(wsrange, message, last);
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY);
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket broadcast message on " + addrs);
@@ -422,8 +474,8 @@ public abstract class WebSocketNode {
CompletableFuture<Integer> future = null;
for (InetSocketAddress addr : addrs) {
if (addr == null || addr.equals(localSncpAddress)) continue;
future = future == null ? remoteNode.broadcastMessage(addr, remoteMessage, last)
: future.thenCombine(remoteNode.broadcastMessage(addr, remoteMessage, last), (a, b) -> a | b);
future = future == null ? remoteNode.broadcastMessage(addr, wsrange, remoteMessage, last)
: future.thenCombine(remoteNode.broadcastMessage(addr, wsrange, remoteMessage, last), (a, b) -> a | b);
}
return future == null ? CompletableFuture.completedFuture(0) : future;
});

View File

@@ -0,0 +1,41 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.net.http;
import java.io.Serializable;
import java.util.Map;
/**
* WebSocket.broadcastMessage时的过滤条件
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
public class WebSocketRange implements Serializable {
protected String wskey;
protected Map<String, String> attach;
public String getWskey() {
return wskey;
}
public void setWskey(String wskey) {
this.wskey = wskey;
}
public Map<String, String> getAttach() {
return attach;
}
public void setAttach(Map<String, String> attach) {
this.attach = attach;
}
}

View File

@@ -53,9 +53,9 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
}
@Override
public CompletableFuture<Integer> broadcastMessage(@RpcTargetAddress InetSocketAddress addr, Object message, boolean last) {
public CompletableFuture<Integer> broadcastMessage(@RpcTargetAddress InetSocketAddress addr, final WebSocketRange wsrange, Object message, boolean last) {
if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
return this.localEngine.broadcastMessage(message, last);
return this.localEngine.broadcastMessage(wsrange, message, last);
}
/**