From 1ac5f060a4fc3c36d1c12acb60f38d787ab1e63c Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Tue, 11 Sep 2018 09:13:19 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0WebSocketAction=E5=8A=9F?= =?UTF-8?q?=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/org/redkale/net/http/WebSocket.java | 41 +++++++++ src/org/redkale/net/http/WebSocketAction.java | 58 ++++++++++++ src/org/redkale/net/http/WebSocketEngine.java | 48 ++++++++++ src/org/redkale/net/http/WebSocketNode.java | 88 +++++++++++++++++++ .../redkale/service/WebSocketNodeService.java | 12 +++ 5 files changed, 247 insertions(+) create mode 100644 src/org/redkale/net/http/WebSocketAction.java diff --git a/src/org/redkale/net/http/WebSocket.java b/src/org/redkale/net/http/WebSocket.java index 375878d01..5ef949748 100644 --- a/src/org/redkale/net/http/WebSocket.java +++ b/src/org/redkale/net/http/WebSocket.java @@ -475,6 +475,35 @@ public abstract class WebSocket { return rs; } + /** + * 给指定userid的WebSocket节点发送操作 + * + * @param action 操作参数 + * @param userids Serializable[] + * + * @return 为0表示成功, 其他值表示异常 + */ + public final CompletableFuture sendAction(final WebSocketAction action, Serializable... userids) { + if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL); + CompletableFuture rs = _engine.node.sendAction(action, userids); + if (_engine.logger.isLoggable(Level.FINEST)) _engine.logger.finest("userids:" + Arrays.toString(userids) + " send websocket action(" + action + ")"); + return rs; + } + + /** + * 广播操作, 给所有人发操作指令 + * + * @param action 操作参数 + * + * @return 为0表示成功, 其他值表示部分发送异常 + */ + public final CompletableFuture broadcastAction(final WebSocketAction action) { + if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL); + CompletableFuture rs = _engine.node.broadcastAction(action); + if (_engine.logger.isLoggable(Level.FINEST)) _engine.logger.finest("broadcast send websocket action(" + action + ")"); + return rs; + } + /** * 获取用户在线的SNCP节点地址列表,不是分布式则返回元素数量为1,且元素值为null的列表
* InetSocketAddress 为 SNCP节点地址 @@ -683,6 +712,18 @@ public abstract class WebSocket { return true; } + /** + * WebSocket.broadcastAction时的操作 + * + * @param action 操作参数 + * + * @return CompletableFuture + * + */ + protected CompletableFuture action(WebSocketAction action) { + return CompletableFuture.completedFuture(0); + } + /** * WebSokcet连接成功后的回调方法 */ diff --git a/src/org/redkale/net/http/WebSocketAction.java b/src/org/redkale/net/http/WebSocketAction.java new file mode 100644 index 000000000..104d2ba41 --- /dev/null +++ b/src/org/redkale/net/http/WebSocketAction.java @@ -0,0 +1,58 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.net.http; + +import java.io.Serializable; +import java.util.Map; +import org.redkale.convert.json.JsonConvert; + +/** + * WebSocket.broadcastAction时的参数 + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + */ +public class WebSocketAction implements Serializable { + + protected String action; + + protected Map attach; + + public WebSocketAction() { + } + + public WebSocketAction(String action) { + this.action = action; + } + + public WebSocketAction(String action, Map attach) { + this.action = action; + this.attach = attach; + } + + public String getAction() { + return action; + } + + public void setAction(String action) { + this.action = action; + } + + public Map getAttach() { + return attach; + } + + public void setAttach(Map attach) { + this.attach = attach; + } + + @Override + public String toString() { + return JsonConvert.root().convertTo(this); + } +} diff --git a/src/org/redkale/net/http/WebSocketEngine.java b/src/org/redkale/net/http/WebSocketEngine.java index 8fda5a9b3..d9bdd1926 100644 --- a/src/org/redkale/net/http/WebSocketEngine.java +++ b/src/org/redkale/net/http/WebSocketEngine.java @@ -325,6 +325,54 @@ public class WebSocketEngine { } } + @Comment("给指定WebSocket连接用户发起操作指令") + public CompletableFuture broadcastAction(final WebSocketAction action) { + CompletableFuture future = null; + if (single) { + for (WebSocket websocket : websockets.values()) { + future = future == null ? websocket.action(action) : future.thenCombine(websocket.action(action), (a, b) -> a | (Integer) b); + } + } else { + for (List list : websockets2.values()) { + for (WebSocket websocket : list) { + future = future == null ? websocket.action(action) : future.thenCombine(websocket.action(action), (a, b) -> a | (Integer) b); + } + } + } + return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; + } + + @Comment("给指定用户组发送操作") + public CompletableFuture sendAction(final WebSocketAction action, final Stream userids) { + Object[] array = userids.toArray(); + Serializable[] ss = new Serializable[array.length]; + for (int i = 0; i < array.length; i++) { + ss[i] = (Serializable) array[i]; + } + return sendAction(action, ss); + } + + @Comment("给指定用户组发送操作") + public CompletableFuture sendAction(final WebSocketAction action, final Serializable... userids) { + CompletableFuture future = null; + if (single) { + for (Serializable userid : userids) { + WebSocket websocket = websockets.get(userid); + if (websocket == null) continue; + future = future == null ? websocket.action(action) : future.thenCombine(websocket.action(action), (a, b) -> a | (Integer) b); + } + } else { + for (Serializable userid : userids) { + List list = websockets2.get(userid); + if (list == null) continue; + for (WebSocket websocket : list) { + future = future == null ? websocket.action(action) : future.thenCombine(websocket.action(action), (a, b) -> a | (Integer) b); + } + } + } + return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; + } + @Comment("获取最大连接数") public int getLocalWsmaxconns() { return this.wsmaxconns; diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index ddb97b1f6..d587f62c3 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -92,6 +92,10 @@ public abstract class WebSocketNode { protected abstract CompletableFuture broadcastMessage(@RpcTargetAddress InetSocketAddress targetAddress, WebSocketRange wsrange, Object message, boolean last); + protected abstract CompletableFuture sendAction(@RpcTargetAddress InetSocketAddress targetAddress, WebSocketAction action, Serializable userid); + + protected abstract CompletableFuture broadcastAction(@RpcTargetAddress InetSocketAddress targetAddress, WebSocketAction action); + protected abstract CompletableFuture connect(Serializable userid, InetSocketAddress sncpAddr); protected abstract CompletableFuture disconnect(Serializable userid, InetSocketAddress sncpAddr); @@ -559,6 +563,90 @@ public abstract class WebSocketNode { return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b); } + /** + * 广播操作, 给所有人发操作 + * + * @param action 操作参数 + * + * @return 为0表示成功, 其他值表示部分发送异常 + */ + @Local + public CompletableFuture broadcastAction(final WebSocketAction action) { + if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式 + return this.localEngine.broadcastAction(action); + } + CompletableFuture localFuture = this.localEngine == null ? null : this.localEngine.broadcastAction(action); + tryAcquireSemaphore(); + CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY); + if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore()); + CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { + if (logger.isLoggable(Level.FINEST)) logger.finest("websocket broadcast action (" + action + ") on " + addrs); + if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0); + CompletableFuture future = null; + for (InetSocketAddress addr : addrs) { + if (addr == null || addr.equals(localSncpAddress)) continue; + future = future == null ? remoteNode.broadcastAction(addr, action) + : future.thenCombine(remoteNode.broadcastAction(addr, action), (a, b) -> a | b); + } + return future == null ? CompletableFuture.completedFuture(0) : future; + }); + return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b); + } + + /** + * 向指定用户发送操作,先发送本地连接,再发送远程连接
+ * 如果当前WebSocketNode是远程模式,此方法只发送远程连接 + * + * @param action 操作参数 + * @param userids Serializable[] + * + * @return 为0表示成功, 其他值表示部分发送异常 + */ + @Local + public CompletableFuture sendAction(final WebSocketAction action, final Serializable... userids) { + if (userids == null || userids.length < 1) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); + if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式 + return this.localEngine.sendAction(action, userids); + } + CompletableFuture future = null; + for (Serializable userid : userids) { + future = future == null ? sendOneAction(action, userid) : future.thenCombine(sendOneAction(action, userid), (a, b) -> a | b); + } + return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; + } + + protected CompletableFuture sendOneAction(final WebSocketAction action, final Serializable userid) { + if (logger.isLoggable(Level.FINEST)) { + logger.finest("websocket want send action {userid:" + userid + ", action:" + action + "} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine"); + } + CompletableFuture localFuture = null; + if (this.localEngine != null) localFuture = localEngine.sendAction(action, userid); + if (this.sncpNodeAddresses == null || this.remoteNode == null) { + if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null"); + //没有CacheSource就不会有分布式节点 + return localFuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localFuture; + } + //远程节点发送操作 + tryAcquireSemaphore(); + CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid); + if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore()); + CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { + if (addrs == null || addrs.isEmpty()) { + if (logger.isLoggable(Level.FINER)) logger.finer("websocket not found userid:" + userid + " on any node "); + return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); + } + if (logger.isLoggable(Level.FINEST)) logger.finest("websocket(localaddr=" + localSncpAddress + ") found userid:" + userid + " on " + addrs); + CompletableFuture future = null; + for (InetSocketAddress addr : addrs) { + if (addr == null || addr.equals(localSncpAddress)) continue; + future = future == null ? remoteNode.sendAction(addr, action, userid) + : future.thenCombine(remoteNode.sendAction(addr, action, userid), (a, b) -> a | b); + } + return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; + }); + return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b); + } + protected Object formatRemoteMessage(Object message) { if (message instanceof WebSocketPacket) return message; if (message instanceof byte[]) return message; diff --git a/src/org/redkale/service/WebSocketNodeService.java b/src/org/redkale/service/WebSocketNodeService.java index 305772ec7..0659df6c0 100644 --- a/src/org/redkale/service/WebSocketNodeService.java +++ b/src/org/redkale/service/WebSocketNodeService.java @@ -67,6 +67,18 @@ public class WebSocketNodeService extends WebSocketNode implements Service { return this.localEngine.broadcastMessage(wsrange, message, last); } + @Override + public CompletableFuture sendAction(@RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action, Serializable userid) { + if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); + return this.localEngine.sendAction(action, userid); + } + + @Override + public CompletableFuture broadcastAction(@RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action) { + if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); + return this.localEngine.broadcastAction(action); + } + /** * 当用户连接到节点,需要更新到CacheSource *