This commit is contained in:
Redkale
2017-05-28 10:19:56 +08:00
parent 96e4b8834d
commit 6cd232efd2
14 changed files with 264 additions and 485 deletions

View File

@@ -257,6 +257,9 @@ public final class Rest {
mv.visitIntInsn(BIPUSH, rws.liveinterval()); mv.visitIntInsn(BIPUSH, rws.liveinterval());
} }
mv.visitFieldInsn(PUTFIELD, newDynName, "liveinterval", "I"); mv.visitFieldInsn(PUTFIELD, newDynName, "liveinterval", "I");
mv.visitVarInsn(ALOAD, 0);
mv.visitInsn(rws.single() ? ICONST_1 : ICONST_0);
mv.visitFieldInsn(PUTFIELD, newDynName, "single", "Z");
mv.visitInsn(RETURN); mv.visitInsn(RETURN);
mv.visitMaxs(2, 1); mv.visitMaxs(2, 1);
mv.visitEnd(); mv.visitEnd();

View File

@@ -37,6 +37,13 @@ public @interface RestWebSocket {
*/ */
String catalog() default ""; String catalog() default "";
/**
* 是否单用户单连接, 默认单用户单连接
*
* @return 是否单用户单连接
*/
boolean single() default true;
/** /**
* WebScoket服务器给客户端进行ping操作的间隔时间, 单位: 秒, 默认值60秒 * WebScoket服务器给客户端进行ping操作的间隔时间, 单位: 秒, 默认值60秒
* *

View File

@@ -10,6 +10,7 @@ import java.io.*;
import java.net.*; import java.net.*;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.stream.Stream;
import org.redkale.convert.json.JsonConvert; import org.redkale.convert.json.JsonConvert;
import org.redkale.net.*; import org.redkale.net.*;
import org.redkale.util.Comment; import org.redkale.util.Comment;
@@ -20,7 +21,7 @@ import org.redkale.util.Comment;
* WebSocket 有两种模式: * WebSocket 有两种模式:
* 1) 普通模式: 协议上符合HTML5规范, 其流程顺序如下: * 1) 普通模式: 协议上符合HTML5规范, 其流程顺序如下:
* 1.1 onOpen 若返回null视为WebSocket的连接不合法强制关闭WebSocket连接通常用于判断登录态。 * 1.1 onOpen 若返回null视为WebSocket的连接不合法强制关闭WebSocket连接通常用于判断登录态。
* 1.2 createGroupid 若返回null视为WebSocket的连接不合法强制关闭WebSocket连接通常用于判断用户权限是否符合。 * 1.2 createUserid 若返回null视为WebSocket的连接不合法强制关闭WebSocket连接通常用于判断用户权限是否符合。
* 1.3 onConnected WebSocket成功连接后在准备接收数据前回调此方法。 * 1.3 onConnected WebSocket成功连接后在准备接收数据前回调此方法。
* 1.4 onMessage/onFragment+ WebSocket接收到消息后回调此消息类方法。 * 1.4 onMessage/onFragment+ WebSocket接收到消息后回调此消息类方法。
* 1.5 onClose WebSocket被关闭后回调此方法。 * 1.5 onClose WebSocket被关闭后回调此方法。
@@ -28,7 +29,7 @@ import org.redkale.util.Comment;
* *
* 2) 原始二进制模式: 此模式有别于HTML5规范可以视为原始的TCP连接。通常用于音频视频通讯场景。其流程顺序如下: * 2) 原始二进制模式: 此模式有别于HTML5规范可以视为原始的TCP连接。通常用于音频视频通讯场景。其流程顺序如下:
* 2.1 onOpen 若返回null视为WebSocket的连接不合法强制关闭WebSocket连接通常用于判断登录态。 * 2.1 onOpen 若返回null视为WebSocket的连接不合法强制关闭WebSocket连接通常用于判断登录态。
* 2.2 createGroupid 若返回null视为WebSocket的连接不合法强制关闭WebSocket连接通常用于判断用户权限是否符合。 * 2.2 createWebSocketid 若返回null视为WebSocket的连接不合法强制关闭WebSocket连接通常用于判断用户权限是否符合。
* 2.3 onRead WebSocket成功连接后回调此方法 由此方法处理原始的TCP连接 需要业务代码去控制WebSocket的关闭。 * 2.3 onRead WebSocket成功连接后回调此方法 由此方法处理原始的TCP连接 需要业务代码去控制WebSocket的关闭。
* 二进制模式下 以上方法都应该被重载。 * 二进制模式下 以上方法都应该被重载。
* </pre></blockquote> * </pre></blockquote>
@@ -69,11 +70,9 @@ public abstract class WebSocket<G extends Serializable, T> {
WebSocketEngine _engine; //不可能为空 WebSocketEngine _engine; //不可能为空
WebSocketGroup _group; //不可能为空
String _sessionid; //不可能为空 String _sessionid; //不可能为空
G _groupid; //不可能为空 G _userid; //不可能为空
SocketAddress _remoteAddress;//不可能为空 SocketAddress _remoteAddress;//不可能为空
@@ -87,8 +86,6 @@ public abstract class WebSocket<G extends Serializable, T> {
private Map<String, Object> attributes = new HashMap<>(); //非线程安全 private Map<String, Object> attributes = new HashMap<>(); //非线程安全
protected long websocketid = Math.abs(System.nanoTime()); //唯一ID
protected WebSocket() { protected WebSocket() {
} }
@@ -187,131 +184,56 @@ public abstract class WebSocket<G extends Serializable, T> {
*/ */
CompletableFuture<Integer> sendPacket(WebSocketPacket packet) { CompletableFuture<Integer> sendPacket(WebSocketPacket packet) {
CompletableFuture<Integer> rs = this._runner.sendMessage(packet); CompletableFuture<Integer> rs = this._runner.sendMessage(packet);
if (_engine.finest) _engine.logger.finest("wsgroupid:" + getGroupid() + " send websocket result is " + rs + " on " + this + " by message(" + packet + ")"); if (_engine.finest) _engine.logger.finest("userid:" + userid() + " send websocket result is " + rs + " on " + this + " by message(" + packet + ")");
return rs == null ? CompletableFuture.completedFuture(RETCODE_WSOCKET_CLOSED) : rs; return rs == null ? CompletableFuture.completedFuture(RETCODE_WSOCKET_CLOSED) : rs;
} }
//---------------------------------------------------------------- //----------------------------------------------------------------
/** /**
* 给指定groupid的WebSocketGroup下所有WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息 * 给指定userid的WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息
* *
* @param message 不可为空 * @param message 不可为空
* @param groupids Serializable[] * @param last 是否最后一条
* @param userids Serializable[]
* *
* @return 为0表示成功 其他值表示异常 * @return 为0表示成功 其他值表示异常
*/ */
public final CompletableFuture<Integer> sendEachMessage(Object message, G... groupids) { public final CompletableFuture<Integer> sendMessage(Object message, boolean last, G... userids) {
return sendEachMessage(message, true, groupids);
}
/**
* 给指定groupid的WebSocketGroup下所有WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息
*
* @param message 不可为空
* @param last 是否最后一条
* @param groupids Serializable[]
*
* @return 为0表示成功 其他值表示异常
*/
public final CompletableFuture<Integer> sendEachMessage(Object message, boolean last, G... groupids) {
return sendMessage(false, message, last, groupids);
}
/**
* 给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息
*
* @param message 不可为空
* @param groupids Serializable[]
*
* @return 为0表示成功 其他值表示异常
*/
public final CompletableFuture<Integer> sendRecentMessage(Object message, G... groupids) {
return sendMessage(true, message, true, groupids);
}
/**
* 给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息
*
* @param groupids Serializable[]
* @param message 不可为空
* @param last 是否最后一条
*
* @return 为0表示成功 其他值表示异常
*/
public final CompletableFuture<Integer> sendRecentMessage(Object message, boolean last, G... groupids) {
return sendMessage(true, message, last, groupids);
}
/**
* 给指定groupid的WebSocketGroup下WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息
*
* @param recent 是否只发最近接入的WebSocket
* @param message 不可为空
* @param last 是否最后一条
* @param groupids Serializable[]
*
* @return 为0表示成功 其他值表示异常
*/
public final CompletableFuture<Integer> sendMessage(boolean recent, Object message, boolean last, G... groupids) {
if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL); if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL);
if (message instanceof CompletableFuture) { if (message instanceof CompletableFuture) {
return ((CompletableFuture) message).thenCompose((json) -> _engine.node.sendMessage(recent, json, last, groupids)); return ((CompletableFuture) message).thenCompose((json) -> _engine.node.sendMessage(json, last, userids));
} }
CompletableFuture<Integer> rs = _engine.node.sendMessage(recent, message, last, groupids); CompletableFuture<Integer> rs = _engine.node.sendMessage(message, last, userids);
if (_engine.finest) _engine.logger.finest("wsgroupid:" + Arrays.toString(groupids) + " " + (recent ? "recent " : "") + "send websocket result is " + rs + " on " + this + " by message(" + _jsonConvert.convertTo(message) + ")"); if (_engine.finest) _engine.logger.finest("userids:" + Arrays.toString(userids) + " send websocket result is " + rs + " on " + this + " by message(" + _jsonConvert.convertTo(message) + ")");
return rs; return rs;
} }
/** /**
* 广播消息, 给所有人的所有接入的WebSocket节点发消息 * 广播消息, 给所有人发消息
* *
* @param message 消息内容 * @param message 消息内容
* *
* @return 为0表示成功 其他值表示部分发送异常 * @return 为0表示成功 其他值表示部分发送异常
*/ */
public final CompletableFuture<Integer> broadcastEachMessage(final Object message) { public final CompletableFuture<Integer> broadcastMessage(final Object message) {
return broadcastMessage(false, message, true); return broadcastMessage(message, true);
}
/**
* 广播消息, 给所有人最近接入的WebSocket节点发消息
*
* @param message 消息内容
*
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> broadcastRecentMessage(final Object message) {
return broadcastMessage(true, message, true);
} }
/** /**
* 广播消息, 给所有人发消息 * 广播消息, 给所有人发消息
* *
* @param recent 是否只发送给最近接入的WebSocket节点
* @param message 消息内容
*
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> broadcastMessage(final boolean recent, final Object message) {
return broadcastMessage(recent, message, true);
}
/**
* 广播消息, 给所有人发消息
*
* @param recent 是否只发送给最近接入的WebSocket节点
* @param message 消息内容 * @param message 消息内容
* @param last 是否最后一条 * @param last 是否最后一条
* *
* @return 为0表示成功 其他值表示部分发送异常 * @return 为0表示成功 其他值表示部分发送异常
*/ */
public final CompletableFuture<Integer> broadcastMessage(final boolean recent, final Object message, final boolean last) { public final CompletableFuture<Integer> broadcastMessage(final Object message, final boolean last) {
if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL); if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL);
if (message instanceof CompletableFuture) { if (message instanceof CompletableFuture) {
return ((CompletableFuture) message).thenCompose((json) -> _engine.node.broadcastMessage(recent, json, last)); return ((CompletableFuture) message).thenCompose((json) -> _engine.node.broadcastMessage(json, last));
} }
CompletableFuture<Integer> rs = _engine.node.broadcastMessage(recent, message, last); CompletableFuture<Integer> rs = _engine.node.broadcastMessage(message, last);
if (_engine.finest) _engine.logger.finest("broadcast " + (recent ? "recent " : "") + "send websocket result is " + rs + " on " + this + " by message(" + _jsonConvert.convertTo(message) + ")"); if (_engine.finest) _engine.logger.finest("broadcast send websocket result is " + rs + " on " + this + " by message(" + _jsonConvert.convertTo(message) + ")");
return rs; return rs;
} }
@@ -319,13 +241,13 @@ public abstract class WebSocket<G extends Serializable, T> {
* 获取用户在线的SNCP节点地址列表不是分布式则返回元素数量为1且元素值为null的列表<br> * 获取用户在线的SNCP节点地址列表不是分布式则返回元素数量为1且元素值为null的列表<br>
* InetSocketAddress 为 SNCP节点地址 * InetSocketAddress 为 SNCP节点地址
* *
* @param groupid Serializable * @param userid Serializable
* *
* @return 地址列表 * @return 地址列表
*/ */
public CompletableFuture<Collection<InetSocketAddress>> getRpcNodeAddresses(final Serializable groupid) { public CompletableFuture<Collection<InetSocketAddress>> getRpcNodeAddresses(final Serializable userid) {
if (_engine.node == null) return CompletableFuture.completedFuture(null); if (_engine.node == null) return CompletableFuture.completedFuture(null);
return _engine.node.getRpcNodeAddresses(groupid); return _engine.node.getRpcNodeAddresses(userid);
} }
/** /**
@@ -333,13 +255,13 @@ public abstract class WebSocket<G extends Serializable, T> {
* Map.key 为 SNCP节点地址, 含值为null的key表示没有分布式 * Map.key 为 SNCP节点地址, 含值为null的key表示没有分布式
* Map.value 为 用户客户端的IP * Map.value 为 用户客户端的IP
* *
* @param groupid Serializable * @param userid Serializable
* *
* @return 地址集合 * @return 地址集合
*/ */
public CompletableFuture<Map<InetSocketAddress, List<String>>> getRpcNodeWebSocketAddresses(final Serializable groupid) { public CompletableFuture<Map<InetSocketAddress, List<String>>> getRpcNodeWebSocketAddresses(final Serializable userid) {
if (_engine.node == null) return CompletableFuture.completedFuture(null); if (_engine.node == null) return CompletableFuture.completedFuture(null);
return _engine.node.getRpcNodeWebSocketAddresses(groupid); return _engine.node.getRpcNodeWebSocketAddresses(userid);
} }
/** /**
@@ -379,12 +301,12 @@ public abstract class WebSocket<G extends Serializable, T> {
} }
/** /**
* 获取当前WebSocket所属的groupid * 获取当前WebSocket所属的userid
* *
* @return groupid * @return userid
*/ */
public final G getGroupid() { public final G userid() {
return _groupid; return _userid;
} }
/** /**
@@ -416,37 +338,41 @@ public abstract class WebSocket<G extends Serializable, T> {
//------------------------------------------------------------------- //-------------------------------------------------------------------
/** /**
* 获取当前WebSocket所属的WebSocketGroup 不会为null * 获取指定userid的WebSocket数组, 没有返回null <br>
* 此方法用于单用户多连接模式
* *
* @return WebSocketGroup * @param userid Serializable
*
* @return WebSocket集合
*/ */
protected final WebSocketGroup getWebSocketGroup() { protected final Stream<WebSocket> getWebSockets(G userid) {
return _group; return _engine.getWebSockets(userid);
} }
/** /**
* 获取指定groupid的WebSocketGroup, 没有返回null * 获取指定userid的WebSocket数组, 没有返回null<br>
* 此方法用于单用户单连接模式
* *
* @param groupid Serializable * @param userid Serializable
* *
* @return WebSocketGroup * @return WebSocket
*/ */
protected final WebSocketGroup getWebSocketGroup(G groupid) { protected final WebSocket findWebSocket(G userid) {
return _engine.getWebSocketGroup(groupid); return _engine.findWebSocket(userid);
} }
/** /**
* 获取当前进程节点所有在线的WebSocketGroup * 获取当前进程节点所有在线的WebSocket
* *
* @return WebSocketGroup列表 * @return WebSocketGroup列表
*/ */
protected final Collection<WebSocketGroup> getWebSocketGroups() { protected final Collection<WebSocket> getWebSockets() {
return _engine.getWebSocketGroups(); return _engine.getWebSockets();
} }
//------------------------------------------------------------------- //-------------------------------------------------------------------
/** /**
* 返回sessionid, null表示连接不合法或异常,默认实现是request.getSessionid(true),通常需要重写该方法 * 返回sessionid, null表示连接不合法或异常,默认实现是request.sessionid(true),通常需要重写该方法
* *
* @param request HttpRequest * @param request HttpRequest
* *
@@ -457,11 +383,11 @@ public abstract class WebSocket<G extends Serializable, T> {
} }
/** /**
* 创建groupid null表示异常 必须实现该方法 通常为用户ID为groupid * 创建userid null表示异常 必须实现该方法
* *
* @return groupid * @return userid
*/ */
protected abstract CompletableFuture<G> createGroupid(); protected abstract CompletableFuture<G> createUserid();
/** /**
* 标记为WebSocketBinary才需要重写此方法 * 标记为WebSocketBinary才需要重写此方法
@@ -538,6 +464,6 @@ public abstract class WebSocket<G extends Serializable, T> {
@Override @Override
public String toString() { public String toString() {
return this.websocketid + "@" + _remoteAddr; return this.userid() + "@" + _remoteAddr;
} }
} }

View File

@@ -11,6 +11,7 @@ import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.*; import java.util.concurrent.atomic.*;
import java.util.logging.*; import java.util.logging.*;
import java.util.stream.*;
import org.redkale.convert.json.JsonConvert; import org.redkale.convert.json.JsonConvert;
import static org.redkale.net.http.WebSocket.RETCODE_GROUP_EMPTY; import static org.redkale.net.http.WebSocket.RETCODE_GROUP_EMPTY;
import org.redkale.util.*; import org.redkale.util.*;
@@ -42,8 +43,13 @@ public final class WebSocketEngine {
//JsonConvert //JsonConvert
protected final JsonConvert convert; protected final JsonConvert convert;
//在线用户ID对应的WebSocket组当WebSocketGroup内没有WebSocket会从containers删掉 protected final boolean single; //是否单用户单连接
private final Map<Serializable, WebSocketGroup> containers = new ConcurrentHashMap<>();
//在线用户ID对应的WebSocket组用于单用户单连接模式
private final Map<Serializable, WebSocket> websockets = new ConcurrentHashMap<>();
//在线用户ID对应的WebSocket组用于单用户多连接模式
private final Map<Serializable, List<WebSocket>> websockets2 = new ConcurrentHashMap<>();
//用于PING的定时器 //用于PING的定时器
private ScheduledThreadPoolExecutor scheduler; private ScheduledThreadPoolExecutor scheduler;
@@ -56,8 +62,9 @@ public final class WebSocketEngine {
private int liveinterval; private int liveinterval;
protected WebSocketEngine(String engineid, HttpContext context, int liveinterval, WebSocketNode node, Logger logger) { protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveinterval, WebSocketNode node, Logger logger) {
this.engineid = engineid; this.engineid = engineid;
this.single = single;
this.context = context; this.context = context;
this.convert = context.getJsonConvert(); this.convert = context.getJsonConvert();
this.node = node; this.node = node;
@@ -78,7 +85,7 @@ public final class WebSocketEngine {
}); });
long delay = (interval - System.currentTimeMillis() / 1000 % interval) + index * 5; long delay = (interval - System.currentTimeMillis() / 1000 % interval) + index * 5;
scheduler.scheduleWithFixedDelay(() -> { scheduler.scheduleWithFixedDelay(() -> {
getWebSocketGroups().stream().forEach(x -> x.sendEachPing()); getWebSockets().forEach(x -> x.sendPing());
}, delay, interval, TimeUnit.SECONDS); }, delay, interval, TimeUnit.SECONDS);
if (finest) logger.finest(this.getClass().getSimpleName() + "(" + engineid + ")" + " start keeplive(delay:" + delay + ", interval:" + interval + "s) scheduler executor"); if (finest) logger.finest(this.getClass().getSimpleName() + "(" + engineid + ")" + " start keeplive(delay:" + delay + ", interval:" + interval + "s) scheduler executor");
} }
@@ -87,96 +94,154 @@ public final class WebSocketEngine {
if (scheduler != null) scheduler.shutdownNow(); if (scheduler != null) scheduler.shutdownNow();
} }
void add(WebSocket socket) { //非线程安全, 在常规场景中无需锁 void add(WebSocket socket) {
WebSocketGroup group = containers.get(socket._groupid); if (single) {
if (group == null) { websockets.put(socket._userid, socket);
group = new WebSocketGroup(context, socket._groupid); } else { //非线程安全, 在常规场景中无需锁
containers.putIfAbsent(socket._groupid, group); List<WebSocket> list = websockets2.get(socket._userid);
if (node != null) node.connect(socket._groupid); if (list == null) {
list = new CopyOnWriteArrayList<>();
websockets2.put(socket._userid, list);
}
list.add(socket);
} }
group.add(socket); if (node != null) node.connect(socket._userid);
} }
void remove(WebSocket socket) { //非线程安全, 在常规场景中无需锁 void remove(WebSocket socket) {
final WebSocketGroup group = containers.get(socket._groupid); Serializable userid = socket._userid;
if (group == null) { if (single) {
if (node != null) node.disconnect(socket._groupid); websockets.remove(userid);
return; if (node != null) node.disconnect(userid);
} } else { //非线程安全, 在常规场景中无需锁
group.remove(socket); List<WebSocket> list = websockets2.get(userid);
if (group.isEmpty()) { if (list != null) {
containers.remove(socket._groupid); list.remove(socket);
if (node != null) node.disconnect(socket._groupid); if (list.isEmpty()) {
websockets2.remove(userid);
if (node != null) node.disconnect(userid);
}
}
} }
} }
public CompletableFuture<Integer> broadcastMessage(final boolean recent, final Object message, final boolean last) { public CompletableFuture<Integer> broadcastMessage(final Object message, final boolean last) {
if (message instanceof CompletableFuture) { if (message instanceof CompletableFuture) {
return ((CompletableFuture) message).thenCompose((json) -> broadcastMessage(recent, json, last)); return ((CompletableFuture) message).thenCompose((json) -> broadcastMessage(json, last));
} }
final Collection<WebSocketGroup> groups = getWebSocketGroups(); final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null);
final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null) && groups.size() > 1;
if (more) { if (more) {
final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message
: ((message == null || message instanceof CharSequence || message instanceof byte[]) : ((message == null || message instanceof CharSequence || message instanceof byte[])
? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.convert, message, last)); ? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.convert, message, last));
packet.setSendBuffers(packet.encode(context.getBufferSupplier())); packet.setSendBuffers(packet.encode(context.getBufferSupplier()));
CompletableFuture<Integer> future = null; CompletableFuture<Integer> future = null;
for (WebSocketGroup group : groups) { if (single) {
if (group == null) continue; for (WebSocket websocket : websockets.values()) {
future = future == null ? group.send(recent, message, last) : future.thenCombine(group.send(recent, message, last), (a, b) -> a | b); future = future == null ? websocket.send(message, last) : future.thenCombine(websocket.send(message, last), (a, b) -> a | (Integer) b);
}
} else {
for (List<WebSocket> list : websockets2.values()) {
for (WebSocket websocket : list) {
future = future == null ? websocket.send(message, last) : future.thenCombine(websocket.send(message, last), (a, b) -> a | (Integer) b);
}
}
} }
if (future != null) future = future.whenComplete((rs, ex) -> context.offerBuffer(packet.sendBuffers)); if (future != null) future = future.whenComplete((rs, ex) -> context.offerBuffer(packet.sendBuffers));
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
} else { } else {
CompletableFuture<Integer> future = null; CompletableFuture<Integer> future = null;
for (WebSocketGroup group : groups) { if (single) {
if (group == null) continue; for (WebSocket websocket : websockets.values()) {
future = future == null ? group.send(recent, message, last) : future.thenCombine(group.send(recent, message, last), (a, b) -> a | b); future = future == null ? websocket.send(message, last) : future.thenCombine(websocket.send(message, last), (a, b) -> a | (Integer) b);
}
} else {
for (List<WebSocket> list : websockets2.values()) {
for (WebSocket websocket : list) {
future = future == null ? websocket.send(message, last) : future.thenCombine(websocket.send(message, last), (a, b) -> a | (Integer) b);
}
}
} }
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
} }
} }
CompletableFuture<Integer> sendMessage(final boolean recent, final Object message, final boolean last, final Serializable... groupids) { public CompletableFuture<Integer> sendMessage(final Object message, final boolean last, final Serializable... userids) {
if (message instanceof CompletableFuture) { if (message instanceof CompletableFuture) {
return ((CompletableFuture) message).thenCompose((json) -> sendMessage(recent, json, last, groupids)); return ((CompletableFuture) message).thenCompose((json) -> sendMessage(json, last, userids));
} }
final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null) && groupids.length > 1; final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null) && userids.length > 1;
if (more) { if (more) {
final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message
: ((message == null || message instanceof CharSequence || message instanceof byte[]) : ((message == null || message instanceof CharSequence || message instanceof byte[])
? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.convert, message, last)); ? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.convert, message, last));
packet.setSendBuffers(packet.encode(context.getBufferSupplier())); packet.setSendBuffers(packet.encode(context.getBufferSupplier()));
CompletableFuture<Integer> future = null; CompletableFuture<Integer> future = null;
for (Serializable groupid : groupids) { if (single) {
WebSocketGroup group = getWebSocketGroup(groupid); for (Serializable userid : userids) {
if (group == null) continue; WebSocket websocket = websockets.get(userid);
future = future == null ? group.send(recent, message, last) : future.thenCombine(group.send(recent, message, last), (a, b) -> a | b); if (websocket == null) continue;
future = future == null ? websocket.send(message, last) : future.thenCombine(websocket.send(message, last), (a, b) -> a | (Integer) b);
}
} else {
for (Serializable userid : userids) {
List<WebSocket> list = websockets2.get(userid);
if (list == null) continue;
for (WebSocket websocket : list) {
future = future == null ? websocket.send(message, last) : future.thenCombine(websocket.send(message, last), (a, b) -> a | (Integer) b);
}
}
} }
if (future != null) future = future.whenComplete((rs, ex) -> context.offerBuffer(packet.sendBuffers)); if (future != null) future = future.whenComplete((rs, ex) -> context.offerBuffer(packet.sendBuffers));
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
} else { } else {
CompletableFuture<Integer> future = null; CompletableFuture<Integer> future = null;
for (Serializable groupid : groupids) { if (single) {
WebSocketGroup group = getWebSocketGroup(groupid); for (Serializable userid : userids) {
if (group == null) continue; WebSocket websocket = websockets.get(userid);
future = future == null ? group.send(recent, message, last) : future.thenCombine(group.send(recent, message, last), (a, b) -> a | b); if (websocket == null) continue;
future = future == null ? websocket.send(message, last) : future.thenCombine(websocket.send(message, last), (a, b) -> a | (Integer) b);
}
} else {
for (Serializable userid : userids) {
List<WebSocket> list = websockets2.get(userid);
if (list == null) continue;
for (WebSocket websocket : list) {
future = future == null ? websocket.send(message, last) : future.thenCombine(websocket.send(message, last), (a, b) -> a | (Integer) b);
}
}
} }
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
} }
} }
Collection<WebSocketGroup> getWebSocketGroups() { Collection<WebSocket> getWebSockets() {
return containers.values(); if (single) return websockets.values();
List<WebSocket> list = new ArrayList<>();
websockets2.values().forEach(x -> list.addAll(x));
return list;
} }
public WebSocketGroup getWebSocketGroup(Serializable groupid) { //适用于单用户单连接模式
return containers.get(groupid); public WebSocket findWebSocket(Serializable userid) {
if (single) return websockets.get(userid);
List<WebSocket> list = websockets2.get(userid);
return (list == null || list.isEmpty()) ? null : list.get(list.size() - 1);
} }
public boolean existsWebSocketGroup(Serializable groupid) { //适用于单用户多连接模式
return containers.containsKey(groupid); public Stream<WebSocket> getWebSockets(Serializable userid) {
if (single) {
WebSocket websocket = websockets.get(userid);
return websocket == null ? Stream.empty() : Stream.of(websocket);
} else {
List<WebSocket> list = websockets2.get(userid);
return list == null ? Stream.empty() : list.stream();
}
}
public boolean existsWebSocket(Serializable userid) {
return single ? websockets.containsKey(userid) : websockets2.containsKey(userid);
} }
public String getEngineid() { public String getEngineid() {

View File

@@ -1,150 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.net.http;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Stream;
/**
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
public final class WebSocketGroup {
private final Serializable groupid;
private final HttpContext context;
private WebSocket recentWebSocket;
private final List<WebSocket> list = new CopyOnWriteArrayList<>();
private final Map<String, Object> attributes = new HashMap<>();
WebSocketGroup(HttpContext context, Serializable groupid) {
this.context = context;
this.groupid = groupid;
}
public Serializable getGroupid() {
return groupid;
}
public Stream<WebSocket> getWebSockets() {
return list.stream();
}
void remove(WebSocket socket) {
list.remove(socket);
}
void add(WebSocket socket) {
socket._group = this;
this.recentWebSocket = socket;
list.add(socket);
}
void setRecentWebSocket(WebSocket socket) {
this.recentWebSocket = socket;
}
public final boolean isEmpty() {
return list.isEmpty();
}
public final int size() {
return list.size();
}
/**
* 最近发送消息的WebSocket
*
* @return WebSocket
*/
public final WebSocket getRecentWebSocket() {
return recentWebSocket;
}
@SuppressWarnings("unchecked")
public final <T> T getAttribute(String name) {
return (T) attributes.get(name);
}
public final void removeAttribute(String name) {
attributes.remove(name);
}
public final void setAttribute(String name, Object value) {
attributes.put(name, value);
}
public final CompletableFuture<Integer> send(boolean recent, Object message, boolean last) {
if (recent) {
return recentWebSocket.send(message, last);
} else {
return sendEach(message, last);
}
}
final CompletableFuture<Integer> send(boolean recent, final WebSocketPacket packet) {
if (recent) {
return recentWebSocket.send(packet);
} else {
return sendEach(packet);
}
}
public final CompletableFuture<Integer> sendEach(Object message) {
return sendEach(message, true);
}
public final CompletableFuture<Integer> sendEach(final WebSocketPacket packet) {
CompletableFuture<Integer> future = null;
final boolean more = packet.sendBuffers == null && list.size() > 1;
if (more) packet.setSendBuffers(packet.encode(context.getBufferSupplier()));
for (WebSocket s : list) {
future = future == null ? s.sendPacket(packet) : future.thenCombine(s.sendPacket(packet), (a, b) -> a | (Integer) b);
}
if (more && future != null) future = future.whenComplete((rs, ex) -> context.offerBuffer(packet.sendBuffers));
return future == null ? CompletableFuture.completedFuture(0) : future;
}
public final CompletableFuture<Integer> sendEachPing() {
return sendEach(WebSocketPacket.DEFAULT_PING_PACKET);
}
public final CompletableFuture<Integer> sendRecent(Object message) {
return sendRecent(message, true);
}
public final CompletableFuture<Integer> sendRecent(WebSocketPacket packet) {
return recentWebSocket.send(packet);
}
public final CompletableFuture<Integer> sendEach(Object message, boolean last) {
if (message instanceof WebSocketPacket) {
return sendEach((WebSocketPacket) message);
} else if (message != null && !(message instanceof byte[]) && !(message instanceof CharSequence)) {
message = recentWebSocket._jsonConvert.convertTo(message);
}
return sendEach(new WebSocketPacket((Serializable) message, last));
}
public final CompletableFuture<Integer> sendRecent(Object message, boolean last) {
return recentWebSocket.send(message, last);
}
@Override
public String toString() {
return "{groupid: " + groupid + ", list.size: " + (list == null ? -1 : list.size()) + "}";
}
}

View File

@@ -56,29 +56,29 @@ public abstract class WebSocketNode {
public final void postDestroy(AnyValue conf) { public final void postDestroy(AnyValue conf) {
if (this.localEngine == null) return; if (this.localEngine == null) return;
//关掉所有本地本地WebSocket //关掉所有本地本地WebSocket
this.localEngine.getWebSocketGroups().forEach(g -> disconnect(g.getGroupid())); this.localEngine.getWebSockets().forEach(g -> disconnect(g.userid()));
if (sncpNodeAddresses != null && localSncpAddress != null) sncpNodeAddresses.removeSetItem("redkale_sncpnodes", localSncpAddress); if (sncpNodeAddresses != null && localSncpAddress != null) sncpNodeAddresses.removeSetItem("redkale_sncpnodes", localSncpAddress);
} }
protected abstract CompletableFuture<List<String>> getWebSocketAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid); protected abstract CompletableFuture<List<String>> getWebSocketAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable userid);
protected abstract CompletableFuture<Integer> sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, boolean recent, Object message, boolean last, Serializable groupid); protected abstract CompletableFuture<Integer> sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable userid);
protected abstract CompletableFuture<Integer> broadcastMessage(@RpcTargetAddress InetSocketAddress targetAddress, boolean recent, Object message, boolean last); protected abstract CompletableFuture<Integer> broadcastMessage(@RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last);
protected abstract CompletableFuture<Void> connect(Serializable groupid, InetSocketAddress addr); protected abstract CompletableFuture<Void> connect(Serializable userid, InetSocketAddress addr);
protected abstract CompletableFuture<Void> disconnect(Serializable groupid, InetSocketAddress addr); protected abstract CompletableFuture<Void> disconnect(Serializable userid, InetSocketAddress addr);
//-------------------------------------------------------------------------------- //--------------------------------------------------------------------------------
final CompletableFuture<Void> connect(final Serializable groupid) { final CompletableFuture<Void> connect(final Serializable userid) {
if (finest) logger.finest(localSncpAddress + " receive websocket connect event (" + groupid + " on " + this.localEngine.getEngineid() + ")."); if (finest) logger.finest(localSncpAddress + " receive websocket connect event (" + userid + " on " + this.localEngine.getEngineid() + ").");
return connect(groupid, localSncpAddress); return connect(userid, localSncpAddress);
} }
final CompletableFuture<Void> disconnect(final Serializable groupid) { final CompletableFuture<Void> disconnect(final Serializable userid) {
if (finest) logger.finest(localSncpAddress + " receive websocket disconnect event (" + groupid + " on " + this.localEngine.getEngineid() + ")."); if (finest) logger.finest(localSncpAddress + " receive websocket disconnect event (" + userid + " on " + this.localEngine.getEngineid() + ").");
return disconnect(groupid, localSncpAddress); return disconnect(userid, localSncpAddress);
} }
//-------------------------------------------------------------------------------- //--------------------------------------------------------------------------------
@@ -87,14 +87,14 @@ public abstract class WebSocketNode {
* 该方法仅供内部调用 * 该方法仅供内部调用
* *
* @param targetAddress InetSocketAddress * @param targetAddress InetSocketAddress
* @param groupid Serializable * @param userid Serializable
* *
* @return 客户端地址列表 * @return 客户端地址列表
*/ */
protected CompletableFuture<List<String>> remoteWebSocketAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid) { protected CompletableFuture<List<String>> remoteWebSocketAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable userid) {
if (remoteNode == null) return CompletableFuture.completedFuture(null); if (remoteNode == null) return CompletableFuture.completedFuture(null);
try { try {
return remoteNode.getWebSocketAddresses(targetAddress, groupid); return remoteNode.getWebSocketAddresses(targetAddress, userid);
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.WARNING, "remote " + targetAddress + " websocket getOnlineRemoteAddresses error", e); logger.log(Level.WARNING, "remote " + targetAddress + " websocket getOnlineRemoteAddresses error", e);
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
@@ -105,12 +105,12 @@ public abstract class WebSocketNode {
* 获取用户在线的SNCP节点地址列表不是分布式则返回元素数量为1且元素值为null的列表<br> * 获取用户在线的SNCP节点地址列表不是分布式则返回元素数量为1且元素值为null的列表<br>
* InetSocketAddress 为 SNCP节点地址 * InetSocketAddress 为 SNCP节点地址
* *
* @param groupid Serializable * @param userid Serializable
* *
* @return 地址列表 * @return 地址列表
*/ */
public CompletableFuture<Collection<InetSocketAddress>> getRpcNodeAddresses(final Serializable groupid) { public CompletableFuture<Collection<InetSocketAddress>> getRpcNodeAddresses(final Serializable userid) {
if (this.sncpNodeAddresses != null) return this.sncpNodeAddresses.getCollectionAsync(groupid); if (this.sncpNodeAddresses != null) return this.sncpNodeAddresses.getCollectionAsync(userid);
List<InetSocketAddress> rs = new ArrayList<>(); List<InetSocketAddress> rs = new ArrayList<>();
rs.add(this.localSncpAddress); rs.add(this.localSncpAddress);
return CompletableFuture.completedFuture(rs); return CompletableFuture.completedFuture(rs);
@@ -121,18 +121,18 @@ public abstract class WebSocketNode {
* Map.key 为 SNCP节点地址, 含值为null的key表示没有分布式 * Map.key 为 SNCP节点地址, 含值为null的key表示没有分布式
* Map.value 为 用户客户端的IP * Map.value 为 用户客户端的IP
* *
* @param groupid Serializable * @param userid Serializable
* *
* @return 地址集合 * @return 地址集合
*/ */
public CompletableFuture<Map<InetSocketAddress, List<String>>> getRpcNodeWebSocketAddresses(final Serializable groupid) { public CompletableFuture<Map<InetSocketAddress, List<String>>> getRpcNodeWebSocketAddresses(final Serializable userid) {
CompletableFuture<Collection<InetSocketAddress>> sncpFuture = getRpcNodeAddresses(groupid); CompletableFuture<Collection<InetSocketAddress>> sncpFuture = getRpcNodeAddresses(userid);
return sncpFuture.thenCompose((Collection<InetSocketAddress> addrs) -> { return sncpFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
if (finest) logger.finest("websocket found groupid:" + groupid + " on " + addrs); if (finest) logger.finest("websocket found userid:" + userid + " on " + addrs);
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(new HashMap<>()); if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(new HashMap<>());
CompletableFuture<Map<InetSocketAddress, List<String>>> future = null; CompletableFuture<Map<InetSocketAddress, List<String>>> future = null;
for (final InetSocketAddress nodeAddress : addrs) { for (final InetSocketAddress nodeAddress : addrs) {
CompletableFuture<Map<InetSocketAddress, List<String>>> mapFuture = getWebSocketAddresses(nodeAddress, groupid) CompletableFuture<Map<InetSocketAddress, List<String>>> mapFuture = getWebSocketAddresses(nodeAddress, userid)
.thenCompose((List<String> list) -> CompletableFuture.completedFuture(Utility.ofMap(nodeAddress, list))); .thenCompose((List<String> list) -> CompletableFuture.completedFuture(Utility.ofMap(nodeAddress, list)));
future = future == null ? mapFuture : future.thenCombine(mapFuture, (a, b) -> Utility.merge(a, b)); future = future == null ? mapFuture : future.thenCombine(mapFuture, (a, b) -> Utility.merge(a, b));
} }
@@ -141,99 +141,58 @@ public abstract class WebSocketNode {
} }
//-------------------------------------------------------------------------------- //--------------------------------------------------------------------------------
public final CompletableFuture<Integer> sendEachMessage(Serializable groupid, Object message, final Serializable... groupids) { public final CompletableFuture<Integer> sendMessage(Object message, final Serializable... userids) {
return sendMessage(false, message, true, groupids); return sendMessage(message, true, userids);
}
public final CompletableFuture<Integer> sendEachMessage(Serializable groupid, Object message, boolean last, final Serializable... groupids) {
return sendMessage(false, message, last, groupids);
}
public final CompletableFuture<Integer> sendRecentMessage(Serializable groupid, Object message, final Serializable... groupids) {
return sendMessage(true, message, true, groupids);
}
public final CompletableFuture<Integer> sendRecentMessage(Serializable groupid, Object message, boolean last, final Serializable... groupids) {
return sendMessage(true, message, last, groupids);
}
public final CompletableFuture<Integer> sendMessage(Serializable groupid, boolean recent, Object message, final Serializable... groupids) {
return sendMessage(recent, message, true, groupids);
} }
/** /**
* 向指定用户发送消息,先发送本地连接,再发送远程连接 <br> * 向指定用户发送消息,先发送本地连接,再发送远程连接 <br>
* 如果当前WebSocketNode是远程模式此方法只发送远程连接 * 如果当前WebSocketNode是远程模式此方法只发送远程连接
* *
* @param groupids Serializable[] * @param message 消息内容
* @param recent 是否只发送给最近接入的WebSocket节点 * @param last 是否最后一条
* @param message 消息内容 * @param userids Serializable[]
* @param last 是否最后一条
* *
* @return 为0表示成功 其他值表示部分发送异常 * @return 为0表示成功 其他值表示部分发送异常
*/ */
//最近连接发送逻辑还没有理清楚 //最近连接发送逻辑还没有理清楚
public final CompletableFuture<Integer> sendMessage(final boolean recent, final Object message, final boolean last, final Serializable... groupids) { public final CompletableFuture<Integer> sendMessage(final Object message, final boolean last, final Serializable... userids) {
if (groupids == null || groupids.length < 1) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); if (userids == null || userids.length < 1) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式 if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
return this.localEngine.sendMessage(recent, message, last, groupids); return this.localEngine.sendMessage(message, last, userids);
} }
CompletableFuture<Integer> future = null; CompletableFuture<Integer> future = null;
for (Serializable groupid : groupids) { for (Serializable userid : userids) {
future = future == null ? sendOneMessage(recent, message, last, groupid) future = future == null ? sendOneMessage(message, last, userid)
: future.thenCombine(sendOneMessage(recent, message, last, groupid), (a, b) -> a | b); : future.thenCombine(sendOneMessage(message, last, userid), (a, b) -> a | b);
} }
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
} }
/** /**
* 广播消息, 给所有人的所有接入的WebSocket节点发消息 * 广播消息, 给所有人发消息
* *
* @param message 消息内容 * @param message 消息内容
* *
* @return 为0表示成功 其他值表示部分发送异常 * @return 为0表示成功 其他值表示部分发送异常
*/ */
public final CompletableFuture<Integer> broadcastEachMessage(final Object message) { public final CompletableFuture<Integer> broadcastMessage(final Object message) {
return broadcastMessage(false, message, true); return broadcastMessage(message, true);
}
/**
* 广播消息, 给所有人最近接入的WebSocket节点发消息
*
* @param message 消息内容
*
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> broadcastRecentMessage(final Object message) {
return broadcastMessage(true, message, true);
} }
/** /**
* 广播消息, 给所有人发消息 * 广播消息, 给所有人发消息
* *
* @param recent 是否只发送给最近接入的WebSocket节点
* @param message 消息内容
*
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> broadcastMessage(final boolean recent, final Object message) {
return broadcastMessage(recent, message, true);
}
/**
* 广播消息, 给所有人发消息
*
* @param recent 是否只发送给最近接入的WebSocket节点
* @param message 消息内容 * @param message 消息内容
* @param last 是否最后一条 * @param last 是否最后一条
* *
* @return 为0表示成功 其他值表示部分发送异常 * @return 为0表示成功 其他值表示部分发送异常
*/ */
public final CompletableFuture<Integer> broadcastMessage(final boolean recent, final Object message, final boolean last) { public final CompletableFuture<Integer> broadcastMessage(final Object message, final boolean last) {
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式 if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
return this.localEngine.broadcastMessage(recent, message, last); return this.localEngine.broadcastMessage(message, last);
} }
CompletableFuture<Integer> localFuture = this.localEngine == null ? null : this.localEngine.broadcastMessage(recent, message, last); CompletableFuture<Integer> localFuture = this.localEngine == null ? null : this.localEngine.broadcastMessage(message, last);
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync("redkale_sncpnodes"); CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync("redkale_sncpnodes");
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> { CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
if (finest) logger.finest("websocket broadcast message on " + addrs); if (finest) logger.finest("websocket broadcast message on " + addrs);
@@ -241,38 +200,33 @@ public abstract class WebSocketNode {
CompletableFuture<Integer> future = null; CompletableFuture<Integer> future = null;
for (InetSocketAddress addr : addrs) { for (InetSocketAddress addr : addrs) {
if (addr == null || addr.equals(localSncpAddress)) continue; if (addr == null || addr.equals(localSncpAddress)) continue;
future = future == null ? remoteNode.broadcastMessage(addr, recent, message, last) future = future == null ? remoteNode.broadcastMessage(addr, message, last)
: future.thenCombine(remoteNode.broadcastMessage(addr, recent, message, last), (a, b) -> a | b); : future.thenCombine(remoteNode.broadcastMessage(addr, message, last), (a, b) -> a | b);
} }
return future == null ? CompletableFuture.completedFuture(0) : future; return future == null ? CompletableFuture.completedFuture(0) : future;
}); });
return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b); return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b);
} }
private CompletableFuture<Integer> sendOneMessage(final boolean recent, final Object message, final boolean last, final Serializable groupid) { private CompletableFuture<Integer> sendOneMessage(final Object message, final boolean last, final Serializable userid) {
if (finest) logger.finest("websocket want send message {groupid:" + groupid + ", content:'" + message + "'} from locale node to locale engine"); if (finest) logger.finest("websocket want send message {userid:" + userid + ", content:'" + message + "'} from locale node to locale engine");
CompletableFuture<Integer> localFuture = null; CompletableFuture<Integer> localFuture = null;
final WebSocketGroup group = this.localEngine == null ? null : this.localEngine.getWebSocketGroup(groupid); if (this.localEngine != null) localFuture = localEngine.sendMessage(message, last, userid);
if (group != null) localFuture = group.send(recent, message, last);
if (recent && localFuture != null) { //已经给最近连接发送的消息
if (finest) logger.finest("websocket want send recent message success");
return localFuture;
}
if (this.sncpNodeAddresses == null || this.remoteNode == null) { if (this.sncpNodeAddresses == null || this.remoteNode == null) {
if (finest) logger.finest("websocket remote node is null"); if (finest) logger.finest("websocket remote node is null");
//没有CacheSource就不会有分布式节点 //没有CacheSource就不会有分布式节点
return localFuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localFuture; return localFuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localFuture;
} }
//远程节点发送消息 //远程节点发送消息
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(groupid); CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(userid);
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> { CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
if (finest) logger.finest("websocket found groupid:" + groupid + " on " + addrs); if (finest) logger.finest("websocket found userid:" + userid + " on " + addrs);
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0); if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0);
CompletableFuture<Integer> future = null; CompletableFuture<Integer> future = null;
for (InetSocketAddress addr : addrs) { for (InetSocketAddress addr : addrs) {
if (addr == null || addr.equals(localSncpAddress)) continue; if (addr == null || addr.equals(localSncpAddress)) continue;
future = future == null ? remoteNode.sendMessage(addr, recent, message, last, groupid) future = future == null ? remoteNode.sendMessage(addr, message, last, userid)
: future.thenCombine(remoteNode.sendMessage(addr, recent, message, last, groupid), (a, b) -> a | b); : future.thenCombine(remoteNode.sendMessage(addr, message, last, userid), (a, b) -> a | b);
} }
return future == null ? CompletableFuture.completedFuture(0) : future; return future == null ? CompletableFuture.completedFuture(0) : future;
}); });

View File

@@ -116,7 +116,6 @@ class WebSocketRunner implements Runnable {
if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on decode WebSocketPacket, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds"); if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on decode WebSocketPacket, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds");
return; return;
} }
webSocket._group.setRecentWebSocket(webSocket);
if (packet.type == FrameType.TEXT) { if (packet.type == FrameType.TEXT) {
Object message = convert.convertFrom(webSocket._messageTextType, packet.receiveMasker, packet.receiveBuffers); Object message = convert.convertFrom(webSocket._messageTextType, packet.receiveMasker, packet.receiveBuffers);

View File

@@ -31,10 +31,7 @@ import org.redkale.util.*;
* / \ * / \
* / \ * / \
* / \ * / \
* WebSocketGroup1 WebSocketGroup2 * WebSocket1 WebSocket2
* / \ / \
* / \ / \
* WebSocket1 WebSocket2 WebSocket3 WebSocket4
* *
* </pre></blockquote> * </pre></blockquote>
* *
@@ -62,6 +59,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
protected Type messageTextType; //RestWebSocket时会被修改 protected Type messageTextType; //RestWebSocket时会被修改
protected boolean single = true; //是否单用户单连接
protected int liveinterval = DEFAILT_LIVEINTERVAL; protected int liveinterval = DEFAILT_LIVEINTERVAL;
@Resource @Resource
@@ -98,7 +97,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
if (logger.isLoggable(Level.WARNING)) logger.warning("Not found WebSocketNode, create a default value for " + getClass().getName()); if (logger.isLoggable(Level.WARNING)) logger.warning("Not found WebSocketNode, create a default value for " + getClass().getName());
} }
//存在WebSocketServlet则此WebSocketNode必须是本地模式Service //存在WebSocketServlet则此WebSocketNode必须是本地模式Service
this.node.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", context, liveinterval, this.node, logger); this.node.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", this.single, context, liveinterval, this.node, logger);
this.node.init(conf); this.node.init(conf);
this.node.localEngine.init(conf); this.node.localEngine.init(conf);
} }
@@ -163,19 +162,19 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
@Override @Override
public void completed(Integer result, Void attachment) { public void completed(Integer result, Void attachment) {
HttpContext context = response.getContext(); HttpContext context = response.getContext();
CompletableFuture<Serializable> groupFuture = webSocket.createGroupid(); CompletableFuture<Serializable> userFuture = webSocket.createUserid();
if (groupFuture == null) { if (userFuture == null) {
if (debug) logger.finest("WebSocket connect abort, Create groupid abort. request = " + request); if (debug) logger.finest("WebSocket connect abort, Create userid abort. request = " + request);
response.finish(true); response.finish(true);
return; return;
} }
groupFuture.whenComplete((groupid, ex2) -> { userFuture.whenComplete((userid, ex2) -> {
if (groupid == null || ex2 != null) { if (userid == null || ex2 != null) {
if (debug || ex2 != null) logger.log(ex2 == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Create groupid abort. request = " + request, ex2); if (debug || ex2 != null) logger.log(ex2 == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Create userid abort. request = " + request, ex2);
response.finish(true); response.finish(true);
return; return;
} }
webSocket._groupid = groupid; webSocket._userid = userid;
WebSocketServlet.this.node.localEngine.add(webSocket); WebSocketServlet.this.node.localEngine.add(webSocket);
WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel(), wsbinary); WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel(), wsbinary);
webSocket._runner = runner; webSocket._runner = runner;

View File

@@ -40,57 +40,51 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
if (this.localEngine == null) return CompletableFuture.completedFuture(new ArrayList<>()); if (this.localEngine == null) return CompletableFuture.completedFuture(new ArrayList<>());
return CompletableFuture.supplyAsync(() -> { return CompletableFuture.supplyAsync(() -> {
final List<String> rs = new ArrayList<>(); final List<String> rs = new ArrayList<>();
final WebSocketGroup group = this.localEngine.getWebSocketGroup(groupid); this.localEngine.getWebSockets(groupid).forEach(x -> rs.add(x.getRemoteAddr()));
if (group != null) group.getWebSockets().forEach(x -> rs.add(x.getRemoteAddr()));
return rs; return rs;
}); });
} }
@Override @Override
public CompletableFuture<Integer> sendMessage(@RpcTargetAddress InetSocketAddress addr, boolean recent, Object message, boolean last, Serializable groupid) { public CompletableFuture<Integer> sendMessage(@RpcTargetAddress InetSocketAddress addr, Object message, boolean last, Serializable userid) {
if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
final WebSocketGroup group = this.localEngine.getWebSocketGroup(groupid); return this.localEngine.sendMessage(message, last, userid);
if (group == null || group.isEmpty()) {
if (finest) logger.finest("receive websocket message {engineid:'" + this.localEngine.getEngineid() + "', groupid:" + groupid + ", content:'" + message + "'} from " + addr + " but send result is " + RETCODE_GROUP_EMPTY);
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
}
return group.send(recent, message, last);
} }
@Override @Override
public CompletableFuture<Integer> broadcastMessage(@RpcTargetAddress InetSocketAddress addr, boolean recent, Object message, boolean last) { public CompletableFuture<Integer> broadcastMessage(@RpcTargetAddress InetSocketAddress addr, Object message, boolean last) {
if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
return this.localEngine.broadcastMessage(recent, message, last); return this.localEngine.broadcastMessage(message, last);
} }
/** /**
* 当用户连接到节点需要更新到CacheSource * 当用户连接到节点需要更新到CacheSource
* *
* @param groupid String * @param userid String
* @param sncpAddr InetSocketAddress * @param sncpAddr InetSocketAddress
* *
* @return 无返回值 * @return 无返回值
*/ */
@Override @Override
public CompletableFuture<Void> connect(Serializable groupid, InetSocketAddress sncpAddr) { public CompletableFuture<Void> connect(Serializable userid, InetSocketAddress sncpAddr) {
CompletableFuture<Void> future = sncpNodeAddresses.appendSetItemAsync(groupid, sncpAddr); CompletableFuture<Void> future = sncpNodeAddresses.appendSetItemAsync(userid, sncpAddr);
future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync("redkale_sncpnodes", sncpAddr)); future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync("redkale_sncpnodes", sncpAddr));
if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " connect from " + sncpAddr); if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " connect from " + sncpAddr);
return future; return future;
} }
/** /**
* 当用户从一个节点断掉了所有的连接需要从CacheSource中删除 * 当用户从一个节点断掉了所有的连接需要从CacheSource中删除
* *
* @param groupid String * @param userid String
* @param sncpAddr InetSocketAddress * @param sncpAddr InetSocketAddress
* *
* @return 无返回值 * @return 无返回值
*/ */
@Override @Override
public CompletableFuture<Void> disconnect(Serializable groupid, InetSocketAddress sncpAddr) { public CompletableFuture<Void> disconnect(Serializable userid, InetSocketAddress sncpAddr) {
CompletableFuture<Void> future = sncpNodeAddresses.removeSetItemAsync(groupid, sncpAddr); CompletableFuture<Void> future = sncpNodeAddresses.removeSetItemAsync(userid, sncpAddr);
if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " disconnect from " + sncpAddr); if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " disconnect from " + sncpAddr);
return future; return future;
} }
} }

View File

@@ -96,18 +96,6 @@ public interface WebSocketDesc {
public void close(); public void close();
//获取当前WebSocket所属的WebSocketGroup 不会为null
/* protected */ WebSocketGroup getWebSocketGroup();
//获取指定groupid的WebSocketGroup, 没有返回null
/* protected */ WebSocketGroup getWebSocketGroup(Serializable groupid);
//获取当前进程节点所有在线的WebSocketGroup
/* protected */ Collection<WebSocketGroup> getWebSocketGroups();
//获取在线用户的节点地址列表 //获取在线用户的节点地址列表
/* protected */ Collection<InetSocketAddress> getOnlineNodes(Serializable groupid); /* protected */ Collection<InetSocketAddress> getOnlineNodes(Serializable groupid);

View File

@@ -59,11 +59,11 @@ public class ChatWebSocketServlet extends WebSocketServlet {
icounter.incrementAndGet(); icounter.incrementAndGet();
counter.incrementAndGet(); counter.incrementAndGet();
if (debug) System.out.println("收到消息: " + message); if (debug) System.out.println("收到消息: " + message);
super.getWebSocketGroup().getWebSockets().forEach(x -> x.send(message)); super.getWebSockets().forEach(x -> x.send(message));
} }
@Override @Override
protected CompletableFuture<String> createGroupid() { protected CompletableFuture<String> createUserid() {
return CompletableFuture.completedFuture("2"); return CompletableFuture.completedFuture("2");
} }

View File

@@ -80,31 +80,25 @@ public class VideoWebSocketServlet extends WebSocketServlet {
} }
super.send(("{'type':'user_list','users':[" + sb + "]}").replace('\'', '"')); super.send(("{'type':'user_list','users':[" + sb + "]}").replace('\'', '"'));
String msg = ("{'type':'discover_user','user':{'userid':'" + this.getSessionid() + "','username':'" + users.get(this.getSessionid()) + "'}}").replace('\'', '"'); String msg = ("{'type':'discover_user','user':{'userid':'" + this.getSessionid() + "','username':'" + users.get(this.getSessionid()) + "'}}").replace('\'', '"');
super.getWebSocketGroup().getWebSockets().filter(x -> x != this).forEach(x -> { super.broadcastMessage(msg);
x.send(msg);
});
} }
} }
@Override @Override
public void onMessage(Object text, boolean last) { public void onMessage(Object text, boolean last) {
//System.out.println("接收到消息: " + text); //System.out.println("接收到消息: " + text);
super.getWebSocketGroup().getWebSockets().filter(x -> x != this).forEach(x -> { super.broadcastMessage(text, last);
x.send(text);
});
} }
@Override @Override
public void onClose(int code, String reason) { public void onClose(int code, String reason) {
sessions.remove(this.getSessionid()); sessions.remove(this.getSessionid());
String msg = ("{'type':'remove_user','user':{'userid':'" + this.getSessionid() + "','username':'" + users.get(this.getSessionid()) + "'}}").replace('\'', '"'); String msg = ("{'type':'remove_user','user':{'userid':'" + this.getSessionid() + "','username':'" + users.get(this.getSessionid()) + "'}}").replace('\'', '"');
super.getWebSocketGroup().getWebSockets().filter(x -> x != this).forEach(x -> { super.broadcastMessage(msg);
x.send(msg);
});
} }
@Override @Override
protected CompletableFuture<Serializable> createGroupid() { protected CompletableFuture<Serializable> createUserid() {
return CompletableFuture.completedFuture("2"); return CompletableFuture.completedFuture("2");
} }
}; };

View File

@@ -21,7 +21,7 @@ public class ChatService implements Service {
protected final AtomicInteger idcreator = new AtomicInteger(10000); protected final AtomicInteger idcreator = new AtomicInteger(10000);
public int createGroupid() { public int createUserid() {
int v = idcreator.incrementAndGet(); int v = idcreator.incrementAndGet();
setIdcreator(v); setIdcreator(v);
return v; return v;

View File

@@ -24,21 +24,21 @@ public class ChatWebSocket extends WebSocket<Integer, Object> {
protected ChatService service; protected ChatService service;
@Override @Override
protected CompletableFuture<Integer> createGroupid() { protected CompletableFuture<Integer> createUserid() {
return CompletableFuture.completedFuture(service.createGroupid()); return CompletableFuture.completedFuture(service.createUserid());
} }
@RestOnMessage(name = "sendmessage") @RestOnMessage(name = "sendmessage")
public void onChatMessage(ChatMessage message, Map<String, String> extmap) { public void onChatMessage(ChatMessage message, Map<String, String> extmap) {
message.fromuserid = getGroupid(); message.fromuserid = userid();
message.fromusername = "用户" + getGroupid(); message.fromusername = "用户" + userid();
System.out.println("获取消息: message: " + message + ", map: " + extmap); System.out.println("获取消息: message: " + message + ", map: " + extmap);
super.broadcastEachMessage(message); super.broadcastMessage(message);
} }
@RestOnMessage(name = "joinroom") @RestOnMessage(name = "joinroom")
public void onJoinRoom(int roomid) { public void onJoinRoom(int roomid) {
service.joinRoom(getGroupid(), roomid); service.joinRoom(userid(), roomid);
System.out.println("加入房间: roomid: " + roomid); System.out.println("加入房间: roomid: " + roomid);
} }