This commit is contained in:
Redkale
2017-05-26 11:15:04 +08:00
parent ceeb924d4d
commit c68d988d51
5 changed files with 159 additions and 5 deletions

View File

@@ -262,6 +262,59 @@ public abstract class WebSocket<G extends Serializable, T> {
return rs;
}
/**
* 广播消息, 给所有人的所有接入的WebSocket节点发消息
*
* @param message 消息内容
*
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> broadcastEachMessage(final Object message) {
return broadcastMessage(false, message, true);
}
/**
* 广播消息, 给所有人最近接入的WebSocket节点发消息
*
* @param message 消息内容
*
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> broadcastRecentMessage(final Object message) {
return broadcastMessage(true, message, true);
}
/**
* 广播消息, 给所有人发消息
*
* @param recent 是否只发送给最近接入的WebSocket节点
* @param message 消息内容
*
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> broadcastMessage(final boolean recent, final Object message) {
return broadcastMessage(recent, message, true);
}
/**
* 广播消息, 给所有人发消息
*
* @param recent 是否只发送给最近接入的WebSocket节点
* @param message 消息内容
* @param last 是否最后一条
*
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> broadcastMessage(final boolean recent, final Object message, final boolean last) {
if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL);
if (message instanceof CompletableFuture) {
return ((CompletableFuture) message).thenCompose((json) -> _engine.node.broadcastMessage(recent, json, last));
}
CompletableFuture<Integer> rs = _engine.node.broadcastMessage(recent, message, last);
if (_engine.finest) _engine.logger.finest("broadcast " + (recent ? "recent " : "") + "send websocket result is " + rs + " on " + this + " by message(" + _jsonConvert.convertTo(message) + ")");
return rs;
}
/**
* 获取当前WebSocket下的属性非线程安全
*

View File

@@ -107,11 +107,39 @@ public final class WebSocketEngine {
}
}
public CompletableFuture<Integer> broadcastMessage(final boolean recent, final Object message, final boolean last) {
if (message instanceof CompletableFuture) {
return ((CompletableFuture) message).thenCompose((json) -> broadcastMessage(recent, json, last));
}
final Collection<WebSocketGroup> groups = getWebSocketGroups();
final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null) && groups.size() > 1;
if (more) {
final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message
: ((message == null || message instanceof CharSequence || message instanceof byte[])
? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.convert, message, last));
packet.setSendBuffers(packet.encode(context.getBufferSupplier()));
CompletableFuture<Integer> future = null;
for (WebSocketGroup group : groups) {
if (group == null) continue;
future = future == null ? group.send(recent, message, last) : future.thenCombine(group.send(recent, message, last), (a, b) -> a | b);
}
if (future != null) future = future.whenComplete((rs, ex) -> context.offerBuffer(packet.sendBuffers));
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
} else {
CompletableFuture<Integer> future = null;
for (WebSocketGroup group : groups) {
if (group == null) continue;
future = future == null ? group.send(recent, message, last) : future.thenCombine(group.send(recent, message, last), (a, b) -> a | b);
}
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
}
}
CompletableFuture<Integer> sendMessage(final boolean recent, final Object message, final boolean last, final Serializable... groupids) {
if (message instanceof CompletableFuture) {
return ((CompletableFuture) message).thenCompose((json) -> sendMessage(recent, json, last, groupids));
}
final boolean more = !(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null || groupids.length > 1;
final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null) && groupids.length > 1;
if (more) {
final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message
: ((message == null || message instanceof CharSequence || message instanceof byte[])

View File

@@ -41,7 +41,7 @@ public abstract class WebSocketNode {
//存放所有用户分布在节点上的队列信息,Set<InetSocketAddress> 为 sncpnode 的集合, key: groupid
//集合包含 localSncpAddress
//如果不是分布式(没有SNCP)sncpNodeAddresses 将不会被用到
@Resource(name = "$")
@Resource(name = "$_nodes")
protected CacheSource<Serializable, InetSocketAddress> sncpNodeAddresses;
//当前节点的本地WebSocketEngine
@@ -57,12 +57,15 @@ public abstract class WebSocketNode {
if (this.localEngine == null) return;
//关掉所有本地本地WebSocket
this.localEngine.getWebSocketGroups().forEach(g -> disconnect(g.getGroupid()));
if (sncpNodeAddresses != null && localSncpAddress != null) sncpNodeAddresses.removeSetItem("redkale_sncpnodes", localSncpAddress);
}
protected abstract CompletableFuture<List<String>> getWebSocketAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid);
protected abstract CompletableFuture<Integer> sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, boolean recent, Object message, boolean last, Serializable groupid);
protected abstract CompletableFuture<Integer> broadcastMessage(@RpcTargetAddress InetSocketAddress targetAddress, boolean recent, Object message, boolean last);
protected abstract CompletableFuture<Void> connect(Serializable groupid, InetSocketAddress addr);
protected abstract CompletableFuture<Void> disconnect(Serializable groupid, InetSocketAddress addr);
@@ -167,7 +170,7 @@ public abstract class WebSocketNode {
* @param message 消息内容
* @param last 是否最后一条
*
* @return 为0表示成功 其他值表示异常
* @return 为0表示成功 其他值表示部分发送异常
*/
//最近连接发送逻辑还没有理清楚
public final CompletableFuture<Integer> sendMessage(final boolean recent, final Object message, final boolean last, final Serializable... groupids) {
@@ -183,6 +186,69 @@ public abstract class WebSocketNode {
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
}
/**
* 广播消息, 给所有人的所有接入的WebSocket节点发消息
*
* @param message 消息内容
*
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> broadcastEachMessage(final Object message) {
return broadcastMessage(false, message, true);
}
/**
* 广播消息, 给所有人最近接入的WebSocket节点发消息
*
* @param message 消息内容
*
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> broadcastRecentMessage(final Object message) {
return broadcastMessage(true, message, true);
}
/**
* 广播消息, 给所有人发消息
*
* @param recent 是否只发送给最近接入的WebSocket节点
* @param message 消息内容
*
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> broadcastMessage(final boolean recent, final Object message) {
return broadcastMessage(recent, message, true);
}
/**
* 广播消息, 给所有人发消息
*
* @param recent 是否只发送给最近接入的WebSocket节点
* @param message 消息内容
* @param last 是否最后一条
*
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> broadcastMessage(final boolean recent, final Object message, final boolean last) {
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
return this.localEngine.broadcastMessage(recent, message, last);
}
CompletableFuture<Integer> localFuture = this.localEngine == null ? null : this.localEngine.broadcastMessage(recent, message, last);
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync("redkale_sncpnodes");
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
if (finest) logger.finest("websocket broadcast message on " + addrs);
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0);
CompletableFuture<Integer> future = null;
for (InetSocketAddress addr : addrs) {
future = future == null ? remoteNode.broadcastMessage(addr, recent, message, last)
: future.thenCombine(remoteNode.broadcastMessage(addr, recent, message, last), (a, b) -> a | b);
}
return future == null ? CompletableFuture.completedFuture(0) : future;
});
return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b);
}
private CompletableFuture<Integer> sendOneMessage(final boolean recent, final Object message, final boolean last, final Serializable groupid) {
if (finest) logger.finest("websocket want send message {groupid:" + groupid + ", content:'" + message + "'} from locale node to locale engine");
CompletableFuture<Integer> localFuture = null;

View File

@@ -57,6 +57,12 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
return group.send(recent, message, last);
}
@Override
public CompletableFuture<Integer> broadcastMessage(@RpcTargetAddress InetSocketAddress addr, boolean recent, Object message, boolean last) {
if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
return this.localEngine.broadcastMessage(recent, message, last);
}
/**
* 当用户连接到节点需要更新到CacheSource
*
@@ -68,6 +74,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
@Override
public CompletableFuture<Void> connect(Serializable groupid, InetSocketAddress sncpAddr) {
CompletableFuture<Void> future = sncpNodeAddresses.appendSetItemAsync(groupid, sncpAddr);
future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync("redkale_sncpnodes", sncpAddr));
if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " connect from " + sncpAddr);
return future;
}

View File

@@ -30,8 +30,8 @@ public class ChatWebSocket extends WebSocket<Integer, Object> {
@RestOnMessage(name = "sendmessage")
public void onChatMessage(ChatMessage message, Map<String, String> extmap) {
System.out.println("获取消息: message: " + message + ", map: " + extmap);
super.send(message);
System.out.println("获取消息: message: " + message + ", map: " + extmap);
super.broadcastEachMessage(message); //给所有人广播
}
@RestOnMessage(name = "joinroom")