增加WebSocketAction功能
This commit is contained in:
@@ -475,6 +475,35 @@ public abstract class WebSocket<G extends Serializable, T> {
|
||||
return rs;
|
||||
}
|
||||
|
||||
/**
|
||||
* 给指定userid的WebSocket节点发送操作
|
||||
*
|
||||
* @param action 操作参数
|
||||
* @param userids Serializable[]
|
||||
*
|
||||
* @return 为0表示成功, 其他值表示异常
|
||||
*/
|
||||
public final CompletableFuture<Integer> sendAction(final WebSocketAction action, Serializable... userids) {
|
||||
if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL);
|
||||
CompletableFuture<Integer> rs = _engine.node.sendAction(action, userids);
|
||||
if (_engine.logger.isLoggable(Level.FINEST)) _engine.logger.finest("userids:" + Arrays.toString(userids) + " send websocket action(" + action + ")");
|
||||
return rs;
|
||||
}
|
||||
|
||||
/**
|
||||
* 广播操作, 给所有人发操作指令
|
||||
*
|
||||
* @param action 操作参数
|
||||
*
|
||||
* @return 为0表示成功, 其他值表示部分发送异常
|
||||
*/
|
||||
public final CompletableFuture<Integer> broadcastAction(final WebSocketAction action) {
|
||||
if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL);
|
||||
CompletableFuture<Integer> rs = _engine.node.broadcastAction(action);
|
||||
if (_engine.logger.isLoggable(Level.FINEST)) _engine.logger.finest("broadcast send websocket action(" + action + ")");
|
||||
return rs;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取用户在线的SNCP节点地址列表,不是分布式则返回元素数量为1,且元素值为null的列表<br>
|
||||
* InetSocketAddress 为 SNCP节点地址
|
||||
@@ -683,6 +712,18 @@ public abstract class WebSocket<G extends Serializable, T> {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* WebSocket.broadcastAction时的操作
|
||||
*
|
||||
* @param action 操作参数
|
||||
*
|
||||
* @return CompletableFuture
|
||||
*
|
||||
*/
|
||||
protected CompletableFuture<Integer> action(WebSocketAction action) {
|
||||
return CompletableFuture.completedFuture(0);
|
||||
}
|
||||
|
||||
/**
|
||||
* WebSokcet连接成功后的回调方法
|
||||
*/
|
||||
|
||||
58
src/org/redkale/net/http/WebSocketAction.java
Normal file
58
src/org/redkale/net/http/WebSocketAction.java
Normal file
@@ -0,0 +1,58 @@
|
||||
/*
|
||||
* To change this license header, choose License Headers in Project Properties.
|
||||
* To change this template file, choose Tools | Templates
|
||||
* and open the template in the editor.
|
||||
*/
|
||||
package org.redkale.net.http;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Map;
|
||||
import org.redkale.convert.json.JsonConvert;
|
||||
|
||||
/**
|
||||
* WebSocket.broadcastAction时的参数
|
||||
*
|
||||
* <p>
|
||||
* 详情见: https://redkale.org
|
||||
*
|
||||
* @author zhangjx
|
||||
*/
|
||||
public class WebSocketAction implements Serializable {
|
||||
|
||||
protected String action;
|
||||
|
||||
protected Map<String, String> attach;
|
||||
|
||||
public WebSocketAction() {
|
||||
}
|
||||
|
||||
public WebSocketAction(String action) {
|
||||
this.action = action;
|
||||
}
|
||||
|
||||
public WebSocketAction(String action, Map<String, String> attach) {
|
||||
this.action = action;
|
||||
this.attach = attach;
|
||||
}
|
||||
|
||||
public String getAction() {
|
||||
return action;
|
||||
}
|
||||
|
||||
public void setAction(String action) {
|
||||
this.action = action;
|
||||
}
|
||||
|
||||
public Map<String, String> getAttach() {
|
||||
return attach;
|
||||
}
|
||||
|
||||
public void setAttach(Map<String, String> attach) {
|
||||
this.attach = attach;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return JsonConvert.root().convertTo(this);
|
||||
}
|
||||
}
|
||||
@@ -325,6 +325,54 @@ public class WebSocketEngine {
|
||||
}
|
||||
}
|
||||
|
||||
@Comment("给指定WebSocket连接用户发起操作指令")
|
||||
public CompletableFuture<Integer> broadcastAction(final WebSocketAction action) {
|
||||
CompletableFuture<Integer> future = null;
|
||||
if (single) {
|
||||
for (WebSocket websocket : websockets.values()) {
|
||||
future = future == null ? websocket.action(action) : future.thenCombine(websocket.action(action), (a, b) -> a | (Integer) b);
|
||||
}
|
||||
} else {
|
||||
for (List<WebSocket> list : websockets2.values()) {
|
||||
for (WebSocket websocket : list) {
|
||||
future = future == null ? websocket.action(action) : future.thenCombine(websocket.action(action), (a, b) -> a | (Integer) b);
|
||||
}
|
||||
}
|
||||
}
|
||||
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
|
||||
}
|
||||
|
||||
@Comment("给指定用户组发送操作")
|
||||
public CompletableFuture<Integer> sendAction(final WebSocketAction action, final Stream<? extends Serializable> userids) {
|
||||
Object[] array = userids.toArray();
|
||||
Serializable[] ss = new Serializable[array.length];
|
||||
for (int i = 0; i < array.length; i++) {
|
||||
ss[i] = (Serializable) array[i];
|
||||
}
|
||||
return sendAction(action, ss);
|
||||
}
|
||||
|
||||
@Comment("给指定用户组发送操作")
|
||||
public CompletableFuture<Integer> sendAction(final WebSocketAction action, final Serializable... userids) {
|
||||
CompletableFuture<Integer> future = null;
|
||||
if (single) {
|
||||
for (Serializable userid : userids) {
|
||||
WebSocket websocket = websockets.get(userid);
|
||||
if (websocket == null) continue;
|
||||
future = future == null ? websocket.action(action) : future.thenCombine(websocket.action(action), (a, b) -> a | (Integer) b);
|
||||
}
|
||||
} else {
|
||||
for (Serializable userid : userids) {
|
||||
List<WebSocket> list = websockets2.get(userid);
|
||||
if (list == null) continue;
|
||||
for (WebSocket websocket : list) {
|
||||
future = future == null ? websocket.action(action) : future.thenCombine(websocket.action(action), (a, b) -> a | (Integer) b);
|
||||
}
|
||||
}
|
||||
}
|
||||
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
|
||||
}
|
||||
|
||||
@Comment("获取最大连接数")
|
||||
public int getLocalWsmaxconns() {
|
||||
return this.wsmaxconns;
|
||||
|
||||
@@ -92,6 +92,10 @@ public abstract class WebSocketNode {
|
||||
|
||||
protected abstract CompletableFuture<Integer> broadcastMessage(@RpcTargetAddress InetSocketAddress targetAddress, WebSocketRange wsrange, Object message, boolean last);
|
||||
|
||||
protected abstract CompletableFuture<Integer> sendAction(@RpcTargetAddress InetSocketAddress targetAddress, WebSocketAction action, Serializable userid);
|
||||
|
||||
protected abstract CompletableFuture<Integer> broadcastAction(@RpcTargetAddress InetSocketAddress targetAddress, WebSocketAction action);
|
||||
|
||||
protected abstract CompletableFuture<Void> connect(Serializable userid, InetSocketAddress sncpAddr);
|
||||
|
||||
protected abstract CompletableFuture<Void> disconnect(Serializable userid, InetSocketAddress sncpAddr);
|
||||
@@ -559,6 +563,90 @@ public abstract class WebSocketNode {
|
||||
return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b);
|
||||
}
|
||||
|
||||
/**
|
||||
* 广播操作, 给所有人发操作
|
||||
*
|
||||
* @param action 操作参数
|
||||
*
|
||||
* @return 为0表示成功, 其他值表示部分发送异常
|
||||
*/
|
||||
@Local
|
||||
public CompletableFuture<Integer> broadcastAction(final WebSocketAction action) {
|
||||
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
|
||||
return this.localEngine.broadcastAction(action);
|
||||
}
|
||||
CompletableFuture<Integer> localFuture = this.localEngine == null ? null : this.localEngine.broadcastAction(action);
|
||||
tryAcquireSemaphore();
|
||||
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY);
|
||||
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
|
||||
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
|
||||
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket broadcast action (" + action + ") on " + addrs);
|
||||
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0);
|
||||
CompletableFuture<Integer> future = null;
|
||||
for (InetSocketAddress addr : addrs) {
|
||||
if (addr == null || addr.equals(localSncpAddress)) continue;
|
||||
future = future == null ? remoteNode.broadcastAction(addr, action)
|
||||
: future.thenCombine(remoteNode.broadcastAction(addr, action), (a, b) -> a | b);
|
||||
}
|
||||
return future == null ? CompletableFuture.completedFuture(0) : future;
|
||||
});
|
||||
return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b);
|
||||
}
|
||||
|
||||
/**
|
||||
* 向指定用户发送操作,先发送本地连接,再发送远程连接 <br>
|
||||
* 如果当前WebSocketNode是远程模式,此方法只发送远程连接
|
||||
*
|
||||
* @param action 操作参数
|
||||
* @param userids Serializable[]
|
||||
*
|
||||
* @return 为0表示成功, 其他值表示部分发送异常
|
||||
*/
|
||||
@Local
|
||||
public CompletableFuture<Integer> sendAction(final WebSocketAction action, final Serializable... userids) {
|
||||
if (userids == null || userids.length < 1) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
|
||||
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
|
||||
return this.localEngine.sendAction(action, userids);
|
||||
}
|
||||
CompletableFuture<Integer> future = null;
|
||||
for (Serializable userid : userids) {
|
||||
future = future == null ? sendOneAction(action, userid) : future.thenCombine(sendOneAction(action, userid), (a, b) -> a | b);
|
||||
}
|
||||
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
|
||||
}
|
||||
|
||||
protected CompletableFuture<Integer> sendOneAction(final WebSocketAction action, final Serializable userid) {
|
||||
if (logger.isLoggable(Level.FINEST)) {
|
||||
logger.finest("websocket want send action {userid:" + userid + ", action:" + action + "} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine");
|
||||
}
|
||||
CompletableFuture<Integer> localFuture = null;
|
||||
if (this.localEngine != null) localFuture = localEngine.sendAction(action, userid);
|
||||
if (this.sncpNodeAddresses == null || this.remoteNode == null) {
|
||||
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null");
|
||||
//没有CacheSource就不会有分布式节点
|
||||
return localFuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localFuture;
|
||||
}
|
||||
//远程节点发送操作
|
||||
tryAcquireSemaphore();
|
||||
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid);
|
||||
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
|
||||
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
|
||||
if (addrs == null || addrs.isEmpty()) {
|
||||
if (logger.isLoggable(Level.FINER)) logger.finer("websocket not found userid:" + userid + " on any node ");
|
||||
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
|
||||
}
|
||||
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket(localaddr=" + localSncpAddress + ") found userid:" + userid + " on " + addrs);
|
||||
CompletableFuture<Integer> future = null;
|
||||
for (InetSocketAddress addr : addrs) {
|
||||
if (addr == null || addr.equals(localSncpAddress)) continue;
|
||||
future = future == null ? remoteNode.sendAction(addr, action, userid)
|
||||
: future.thenCombine(remoteNode.sendAction(addr, action, userid), (a, b) -> a | b);
|
||||
}
|
||||
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
|
||||
});
|
||||
return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b);
|
||||
}
|
||||
|
||||
protected Object formatRemoteMessage(Object message) {
|
||||
if (message instanceof WebSocketPacket) return message;
|
||||
if (message instanceof byte[]) return message;
|
||||
|
||||
@@ -67,6 +67,18 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
|
||||
return this.localEngine.broadcastMessage(wsrange, message, last);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Integer> sendAction(@RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action, Serializable userid) {
|
||||
if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
|
||||
return this.localEngine.sendAction(action, userid);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Integer> broadcastAction(@RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action) {
|
||||
if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
|
||||
return this.localEngine.broadcastAction(action);
|
||||
}
|
||||
|
||||
/**
|
||||
* 当用户连接到节点,需要更新到CacheSource
|
||||
*
|
||||
|
||||
Reference in New Issue
Block a user