This commit is contained in:
Redkale
2017-05-22 19:22:24 +08:00
parent 205162ce38
commit fda9c30dc4
6 changed files with 99 additions and 61 deletions

View File

@@ -36,9 +36,10 @@ import org.redkale.util.Comment;
* 详情见: https://redkale.org
*
* @author zhangjx
* @param <T> 泛型
* @param <G> Groupid的泛型
* @param <T> Message的泛型
*/
public abstract class WebSocket<T> {
public abstract class WebSocket<G extends Serializable, T> {
@Comment("消息不合法")
public static final int RETCODE_SEND_ILLPACKET = 1 << 1; //2
@@ -72,7 +73,7 @@ public abstract class WebSocket<T> {
Serializable _sessionid; //不可能为空
Serializable _groupid; //不可能为空
G _groupid; //不可能为空
SocketAddress _remoteAddress;//不可能为空
@@ -129,6 +130,15 @@ public abstract class WebSocket<T> {
* @return 0表示成功 非0表示错误码
*/
public final CompletableFuture<Integer> send(Object message, boolean last) {
if (message instanceof CompletableFuture) {
return ((CompletableFuture) message).thenCompose((json) -> {
if (json == null || json instanceof CharSequence || json instanceof byte[]) {
return sendPacket(new WebSocketPacket((Serializable) json, last));
} else {
return sendPacket(new WebSocketPacket(_jsonConvert, json, last));
}
});
}
if (message == null || message instanceof CharSequence || message instanceof byte[]) {
return sendPacket(new WebSocketPacket((Serializable) message, last));
} else {
@@ -145,7 +155,7 @@ public abstract class WebSocket<T> {
* @return 0表示成功 非0表示错误码
*/
public final CompletableFuture<Integer> send(JsonConvert convert, Object message) {
return sendPacket(new WebSocketPacket(convert == null ? _jsonConvert : convert, message, true));
return send(convert, message, true);
}
/**
@@ -158,6 +168,9 @@ public abstract class WebSocket<T> {
* @return 0表示成功 非0表示错误码
*/
public final CompletableFuture<Integer> send(JsonConvert convert, Object message, boolean last) {
if (message instanceof CompletableFuture) {
return ((CompletableFuture) message).thenCompose((json) -> sendPacket(new WebSocketPacket(convert == null ? _jsonConvert : convert, json, last)));
}
return sendPacket(new WebSocketPacket(convert == null ? _jsonConvert : convert, message, last));
}
@@ -178,67 +191,70 @@ public abstract class WebSocket<T> {
/**
* 给指定groupid的WebSocketGroup下所有WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息
*
* @param groupid groupid
* @param message 不可为空
* @param message 不可为空
* @param groupids Serializable[]
*
* @return 为0表示成功 其他值表示异常
*/
public final CompletableFuture<Integer> sendEachMessage(Serializable groupid, Object message) {
return sendEachMessage(groupid, message, true);
public final CompletableFuture<Integer> sendEachMessage(Object message, G... groupids) {
return sendEachMessage(message, true, groupids);
}
/**
* 给指定groupid的WebSocketGroup下所有WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息
*
* @param groupid groupid
* @param message 不可为空
* @param last 是否最后一条
* @param message 不可为空
* @param last 是否最后一条
* @param groupids Serializable[]
*
* @return 为0表示成功 其他值表示异常
*/
public final CompletableFuture<Integer> sendEachMessage(Serializable groupid, Object message, boolean last) {
return sendMessage(groupid, false, message, last);
public final CompletableFuture<Integer> sendEachMessage(Object message, boolean last, G... groupids) {
return sendMessage(false, message, last, groupids);
}
/**
* 给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息
*
* @param groupid groupid
* @param message 不可为空
* @param message 不可为空
* @param groupids Serializable[]
*
* @return 为0表示成功 其他值表示异常
*/
public final CompletableFuture<Integer> sendRecentMessage(Serializable groupid, Object message) {
return sendMessage(groupid, true, message, true);
public final CompletableFuture<Integer> sendRecentMessage(Object message, G... groupids) {
return sendMessage(true, message, true, groupids);
}
/**
* 给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息
*
* @param groupid groupid
* @param message 不可为空
* @param last 是否最后一条
* @param groupids Serializable[]
* @param message 不可为空
* @param last 是否最后一条
*
* @return 为0表示成功 其他值表示异常
*/
public final CompletableFuture<Integer> sendRecentMessage(Serializable groupid, Object message, boolean last) {
return sendMessage(groupid, true, message, last);
public final CompletableFuture<Integer> sendRecentMessage(Object message, boolean last, G... groupids) {
return sendMessage(true, message, last, groupids);
}
/**
* 给指定groupid的WebSocketGroup下WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息
*
* @param groupid groupid
* @param recent 是否只发最近接入的WebSocket
* @param message 不可为空
* @param last 是否最后一条
* @param recent 是否只发最近接入的WebSocket
* @param message 不可为空
* @param last 是否最后一条
* @param groupids Serializable[]
*
* @return 为0表示成功 其他值表示异常
*/
public final CompletableFuture<Integer> sendMessage(Serializable groupid, boolean recent, Object message, boolean last) {
public final CompletableFuture<Integer> sendMessage(boolean recent, Object message, boolean last, G... groupids) {
if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL);
CompletableFuture<Integer> rs = _engine.node.sendMessage(groupid, recent, message, last);
if (_engine.finest) _engine.logger.finest("wsgroupid:" + groupid + " " + (recent ? "recent " : "") + "send websocket result is " + rs + " on " + this + " by message(" + _jsonConvert.convertTo(message) + ")");
if (message instanceof CompletableFuture) {
return ((CompletableFuture) message).thenCompose((json) -> _engine.node.sendMessage(recent, json, last, groupids));
}
CompletableFuture<Integer> rs = _engine.node.sendMessage(recent, message, last, groupids);
if (_engine.finest) _engine.logger.finest("wsgroupid:" + Arrays.toString(groupids) + " " + (recent ? "recent " : "") + "send websocket result is " + rs + " on " + this + " by message(" + _jsonConvert.convertTo(message) + ")");
return rs;
}
@@ -283,7 +299,7 @@ public abstract class WebSocket<T> {
*
* @return groupid
*/
public final Serializable getGroupid() {
public final G getGroupid() {
return _groupid;
}
@@ -327,11 +343,11 @@ public abstract class WebSocket<T> {
/**
* 获取指定groupid的WebSocketGroup, 没有返回null
*
* @param groupid groupid
* @param groupid Serializable
*
* @return WebSocketGroup
*/
protected final WebSocketGroup getWebSocketGroup(Serializable groupid) {
protected final WebSocketGroup getWebSocketGroup(G groupid) {
return _engine.getWebSocketGroup(groupid);
}
@@ -361,7 +377,7 @@ public abstract class WebSocket<T> {
*
* @return groupid
*/
protected abstract CompletableFuture<Serializable> createGroupid();
protected abstract CompletableFuture<G> createGroupid();
/**
* 标记为WebSocketBinary才需要重写此方法

View File

@@ -11,6 +11,7 @@ import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.logging.*;
import static org.redkale.net.http.WebSocket.RETCODE_GROUP_EMPTY;
import org.redkale.util.*;
/**
@@ -97,6 +98,16 @@ public final class WebSocketEngine {
}
}
CompletableFuture<Integer> sendMessage(final boolean recent, final Object message, final boolean last, final Serializable... groupids) {
CompletableFuture<Integer> future = null;
for (Serializable groupid : groupids) {
WebSocketGroup group = getWebSocketGroup(groupid);
if (group == null) continue;
future = future == null ? group.send(recent, message, last) : future.thenCombine(group.send(recent, message, last), (a, b) -> a | b);
}
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
}
Collection<WebSocketGroup> getWebSocketGroups() {
return containers.values();
}

View File

@@ -61,7 +61,7 @@ public abstract class WebSocketNode {
protected abstract CompletableFuture<List<String>> getWebSocketAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid);
protected abstract CompletableFuture<Integer> sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid, boolean recent, Object message, boolean last);
protected abstract CompletableFuture<Integer> sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, boolean recent, Object message, boolean last, Serializable groupid);
protected abstract CompletableFuture<Void> connect(Serializable groupid, InetSocketAddress addr);
@@ -73,7 +73,7 @@ public abstract class WebSocketNode {
return connect(groupid, localSncpAddress);
}
final CompletableFuture<Void> disconnect(Serializable groupid) {
final CompletableFuture<Void> disconnect(final Serializable groupid) {
if (finest) logger.finest(localSncpAddress + " receive websocket disconnect event (" + groupid + " on " + this.localEngine.getEngineid() + ").");
return disconnect(groupid, localSncpAddress);
}
@@ -138,39 +138,52 @@ public abstract class WebSocketNode {
}
//--------------------------------------------------------------------------------
public final CompletableFuture<Integer> sendEachMessage(Serializable groupid, Object message) {
return sendMessage(groupid, false, message, true);
public final CompletableFuture<Integer> sendEachMessage(Serializable groupid, Object message, final Serializable... groupids) {
return sendMessage(false, message, true, groupids);
}
public final CompletableFuture<Integer> sendEachMessage(Serializable groupid, Object message, boolean last) {
return sendMessage(groupid, false, message, last);
public final CompletableFuture<Integer> sendEachMessage(Serializable groupid, Object message, boolean last, final Serializable... groupids) {
return sendMessage(false, message, last, groupids);
}
public final CompletableFuture<Integer> sendRecentMessage(Serializable groupid, Object message) {
return sendMessage(groupid, true, message, true);
public final CompletableFuture<Integer> sendRecentMessage(Serializable groupid, Object message, final Serializable... groupids) {
return sendMessage(true, message, true, groupids);
}
public final CompletableFuture<Integer> sendRecentMessage(Serializable groupid, Object message, boolean last) {
return sendMessage(groupid, true, message, last);
public final CompletableFuture<Integer> sendRecentMessage(Serializable groupid, Object message, boolean last, final Serializable... groupids) {
return sendMessage(true, message, last, groupids);
}
public final CompletableFuture<Integer> sendMessage(Serializable groupid, boolean recent, Object message) {
return sendMessage(groupid, recent, message, true);
public final CompletableFuture<Integer> sendMessage(Serializable groupid, boolean recent, Object message, final Serializable... groupids) {
return sendMessage(recent, message, true, groupids);
}
/**
* 向指定用户发送消息,先发送本地连接,再发送远程连接 <br>
* 如果当前WebSocketNode是远程模式此方法只发送远程连接
*
* @param groupid String
* @param recent 是否只发送给最近接入的WebSocket节点
* @param message 消息内容
* @param last 是否最后一条
* @param groupids Serializable[]
* @param recent 是否只发送给最近接入的WebSocket节点
* @param message 消息内容
* @param last 是否最后一条
*
* @return 为0表示成功 其他值表示异常
*/
//最近连接发送逻辑还没有理清楚
public final CompletableFuture<Integer> sendMessage(final Serializable groupid, final boolean recent, final Object message, final boolean last) {
public final CompletableFuture<Integer> sendMessage(final boolean recent, final Object message, final boolean last, final Serializable... groupids) {
if (groupids == null || groupids.length < 1) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
return this.localEngine.sendMessage(recent, message, last, groupids);
}
CompletableFuture<Integer> future = null;
for (Serializable groupid : groupids) {
future = future == null ? sendOneMessage(recent, message, last, groupid)
: future.thenCombine(sendOneMessage(recent, message, last, groupid), (a, b) -> a | b);
}
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
}
private CompletableFuture<Integer> sendOneMessage(final boolean recent, final Object message, final boolean last, final Serializable groupid) {
if (finest) logger.finest("websocket want send message {groupid:" + groupid + ", content:'" + message + "'} from locale node to locale engine");
CompletableFuture<Integer> localFuture = null;
final WebSocketGroup group = this.localEngine == null ? null : this.localEngine.getWebSocketGroup(groupid);
@@ -192,12 +205,11 @@ public abstract class WebSocketNode {
CompletableFuture<Integer> future = null;
for (InetSocketAddress addr : addrs) {
if (addr == null || addr.equals(localSncpAddress)) continue;
future = future == null ? remoteNode.sendMessage(addr, groupid, recent, message, last)
: future.thenCombine(remoteNode.sendMessage(addr, groupid, recent, message, last), (a, b) -> a | b);
future = future == null ? remoteNode.sendMessage(addr, recent, message, last, groupid)
: future.thenCombine(remoteNode.sendMessage(addr, recent, message, last, groupid), (a, b) -> a | b);
}
return future == null ? CompletableFuture.completedFuture(0) : future;
});
return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b);
}
}

View File

@@ -191,7 +191,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
return null;
}
protected abstract <T> WebSocket<T> createWebSocket();
protected abstract <G extends Serializable, T> WebSocket<G, T> createWebSocket();
private static MessageDigest getMessageDigest() {
try {

View File

@@ -47,7 +47,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
}
@Override
public CompletableFuture<Integer> sendMessage(@RpcTargetAddress InetSocketAddress addr, Serializable groupid, boolean recent, Object message, boolean last) {
public CompletableFuture<Integer> sendMessage(@RpcTargetAddress InetSocketAddress addr, boolean recent, Object message, boolean last, Serializable groupid) {
if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
final WebSocketGroup group = this.localEngine.getWebSocketGroup(groupid);
if (group == null || group.isEmpty()) {