This commit is contained in:
Redkale
2017-05-13 18:17:36 +08:00
parent e1df150a37
commit bb8462af2a
3 changed files with 183 additions and 163 deletions

View File

@@ -9,7 +9,7 @@ import org.redkale.net.http.WebSocketPacket.FrameType;
import java.io.*; import java.io.*;
import java.net.*; import java.net.*;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.*;
import org.redkale.convert.json.JsonConvert; import org.redkale.convert.json.JsonConvert;
import org.redkale.net.*; import org.redkale.net.*;
import org.redkale.util.Comment; import org.redkale.util.Comment;
@@ -200,7 +200,7 @@ public abstract class WebSocket {
* *
* @return 为0表示成功 其他值表示异常 * @return 为0表示成功 其他值表示异常
*/ */
public final int sendEachMessage(Serializable groupid, String text) { public final CompletableFuture<Integer> sendEachMessage(Serializable groupid, String text) {
return sendEachMessage(groupid, text, true); return sendEachMessage(groupid, text, true);
} }
@@ -212,7 +212,7 @@ public abstract class WebSocket {
* *
* @return 为0表示成功 其他值表示异常 * @return 为0表示成功 其他值表示异常
*/ */
public final int sendEachMessage(Serializable groupid, byte[] data) { public final CompletableFuture<Integer> sendEachMessage(Serializable groupid, byte[] data) {
return sendEachMessage(groupid, data, true); return sendEachMessage(groupid, data, true);
} }
@@ -224,7 +224,7 @@ public abstract class WebSocket {
* *
* @return 为0表示成功 其他值表示异常 * @return 为0表示成功 其他值表示异常
*/ */
public final int sendEachMessage(Serializable groupid, Object message) { public final CompletableFuture<Integer> sendEachMessage(Serializable groupid, Object message) {
return sendEachMessage(groupid, message, true); return sendEachMessage(groupid, message, true);
} }
@@ -237,7 +237,7 @@ public abstract class WebSocket {
* *
* @return 为0表示成功 其他值表示异常 * @return 为0表示成功 其他值表示异常
*/ */
public final int sendEachMessage(Serializable groupid, String text, boolean last) { public final CompletableFuture<Integer> sendEachMessage(Serializable groupid, String text, boolean last) {
return sendMessage(groupid, false, text, last); return sendMessage(groupid, false, text, last);
} }
@@ -250,7 +250,7 @@ public abstract class WebSocket {
* *
* @return 为0表示成功 其他值表示异常 * @return 为0表示成功 其他值表示异常
*/ */
public final int sendEachMessage(Serializable groupid, byte[] data, boolean last) { public final CompletableFuture<Integer> sendEachMessage(Serializable groupid, byte[] data, boolean last) {
return sendMessage(groupid, false, data, last); return sendMessage(groupid, false, data, last);
} }
@@ -263,7 +263,7 @@ public abstract class WebSocket {
* *
* @return 为0表示成功 其他值表示异常 * @return 为0表示成功 其他值表示异常
*/ */
public final int sendEachMessage(Serializable groupid, Object message, boolean last) { public final CompletableFuture<Integer> sendEachMessage(Serializable groupid, Object message, boolean last) {
return sendMessage(groupid, false, message, last); return sendMessage(groupid, false, message, last);
} }
@@ -275,7 +275,7 @@ public abstract class WebSocket {
* *
* @return 为0表示成功 其他值表示异常 * @return 为0表示成功 其他值表示异常
*/ */
public final int sendRecentMessage(Serializable groupid, String text) { public final CompletableFuture<Integer> sendRecentMessage(Serializable groupid, String text) {
return sendRecentMessage(groupid, text, true); return sendRecentMessage(groupid, text, true);
} }
@@ -287,7 +287,7 @@ public abstract class WebSocket {
* *
* @return 为0表示成功 其他值表示异常 * @return 为0表示成功 其他值表示异常
*/ */
public final int sendRecentMessage(Serializable groupid, byte[] data) { public final CompletableFuture<Integer> sendRecentMessage(Serializable groupid, byte[] data) {
return sendRecentMessage(groupid, data, true); return sendRecentMessage(groupid, data, true);
} }
@@ -299,7 +299,7 @@ public abstract class WebSocket {
* *
* @return 为0表示成功 其他值表示异常 * @return 为0表示成功 其他值表示异常
*/ */
public final int sendRecentMessage(Serializable groupid, Object message) { public final CompletableFuture<Integer> sendRecentMessage(Serializable groupid, Object message) {
return sendMessage(groupid, true, message, true); return sendMessage(groupid, true, message, true);
} }
@@ -312,7 +312,7 @@ public abstract class WebSocket {
* *
* @return 为0表示成功 其他值表示异常 * @return 为0表示成功 其他值表示异常
*/ */
public final int sendRecentMessage(Serializable groupid, String text, boolean last) { public final CompletableFuture<Integer> sendRecentMessage(Serializable groupid, String text, boolean last) {
return sendMessage(groupid, true, text, last); return sendMessage(groupid, true, text, last);
} }
@@ -325,7 +325,7 @@ public abstract class WebSocket {
* *
* @return 为0表示成功 其他值表示异常 * @return 为0表示成功 其他值表示异常
*/ */
public final int sendRecentMessage(Serializable groupid, byte[] data, boolean last) { public final CompletableFuture<Integer> sendRecentMessage(Serializable groupid, byte[] data, boolean last) {
return sendMessage(groupid, true, data, last); return sendMessage(groupid, true, data, last);
} }
@@ -338,27 +338,27 @@ public abstract class WebSocket {
* *
* @return 为0表示成功 其他值表示异常 * @return 为0表示成功 其他值表示异常
*/ */
public final int sendRecentMessage(Serializable groupid, Object message, boolean last) { public final CompletableFuture<Integer> sendRecentMessage(Serializable groupid, Object message, boolean last) {
return sendMessage(groupid, true, message, last); return sendMessage(groupid, true, message, last);
} }
private int sendMessage(Serializable groupid, boolean recent, String text, boolean last) { private CompletableFuture<Integer> sendMessage(Serializable groupid, boolean recent, String text, boolean last) {
if (_engine.node == null) return RETCODE_NODESERVICE_NULL; if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL);
int rs = _engine.node.sendMessage(groupid, recent, text, last); CompletableFuture<Integer> rs = _engine.node.sendMessage(groupid, recent, text, last);
if (_engine.finest) _engine.logger.finest("wsgroupid:" + groupid + " " + (recent ? "recent " : "") + "send websocket result is " + rs + " on " + this + " by message(" + text + ")"); if (_engine.finest) _engine.logger.finest("wsgroupid:" + groupid + " " + (recent ? "recent " : "") + "send websocket result is " + rs + " on " + this + " by message(" + text + ")");
return rs; return rs;
} }
private int sendMessage(Serializable groupid, boolean recent, byte[] data, boolean last) { private CompletableFuture<Integer> sendMessage(Serializable groupid, boolean recent, byte[] data, boolean last) {
if (_engine.node == null) return RETCODE_NODESERVICE_NULL; if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL);
int rs = _engine.node.sendMessage(groupid, recent, data, last); CompletableFuture<Integer> rs = _engine.node.sendMessage(groupid, recent, data, last);
if (_engine.finest) _engine.logger.finest("wsgroupid:" + groupid + " " + (recent ? "recent " : "") + "send websocket result is " + rs + " on " + this + " by message(byte[" + data.length + "])"); if (_engine.finest) _engine.logger.finest("wsgroupid:" + groupid + " " + (recent ? "recent " : "") + "send websocket result is " + rs + " on " + this + " by message(byte[" + data.length + "])");
return rs; return rs;
} }
private int sendMessage(Serializable groupid, boolean recent, Object message, boolean last) { private CompletableFuture<Integer> sendMessage(Serializable groupid, boolean recent, Object message, boolean last) {
if (_engine.node == null) return RETCODE_NODESERVICE_NULL; if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL);
int rs = _engine.node.sendMessage(groupid, recent, message, last); CompletableFuture<Integer> rs = _engine.node.sendMessage(groupid, recent, message, last);
if (_engine.finest) _engine.logger.finest("wsgroupid:" + groupid + " " + (recent ? "recent " : "") + "send websocket result is " + rs + " on " + this + " by message(" + _jsonConvert.convertTo(message) + ")"); if (_engine.finest) _engine.logger.finest("wsgroupid:" + groupid + " " + (recent ? "recent " : "") + "send websocket result is " + rs + " on " + this + " by message(" + _jsonConvert.convertTo(message) + ")");
return rs; return rs;
} }
@@ -370,7 +370,7 @@ public abstract class WebSocket {
* *
* @return 地址列表 * @return 地址列表
*/ */
protected final Collection<InetSocketAddress> getOnlineNodes(Serializable groupid) { protected final CompletableFuture<Collection<InetSocketAddress>> getOnlineNodes(Serializable groupid) {
return _engine.node.getOnlineNodes(groupid); return _engine.node.getOnlineNodes(groupid);
} }
@@ -381,7 +381,7 @@ public abstract class WebSocket {
* *
* @return 地址集合 * @return 地址集合
*/ */
protected final Map<InetSocketAddress, List<String>> getOnlineRemoteAddress(Serializable groupid) { protected final CompletableFuture<Map<InetSocketAddress, List<String>>> getOnlineRemoteAddress(Serializable groupid) {
return _engine.node.getOnlineRemoteAddress(groupid); return _engine.node.getOnlineRemoteAddress(groupid);
} }

View File

@@ -62,56 +62,16 @@ public abstract class WebSocketNode {
}); });
} }
protected abstract List<String> getOnlineRemoteAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid); protected abstract CompletableFuture<List<String>> getOnlineRemoteAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid);
protected abstract int sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid, boolean recent, Object message, boolean last); protected abstract CompletableFuture<Integer> sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid, boolean recent, Object message, boolean last);
protected abstract void connect(Serializable groupid, InetSocketAddress addr); protected abstract CompletableFuture<Void> connect(Serializable groupid, InetSocketAddress addr);
protected abstract void disconnect(Serializable groupid, InetSocketAddress addr); protected abstract CompletableFuture<Void> disconnect(Serializable groupid, InetSocketAddress addr);
//-------------------------------------------------------------------------------- //--------------------------------------------------------------------------------
protected List<String> remoteOnlineRemoteAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid) { final void connect(Serializable groupid, String engineid) {
if (remoteNode == null) return null;
try {
return remoteNode.getOnlineRemoteAddresses(targetAddress, groupid);
} catch (Exception e) {
logger.log(Level.WARNING, "remote " + targetAddress + " websocket getOnlineRemoteAddresses error", e);
return null;
}
}
/**
* 获取在线用户的节点地址列表
*
* @param groupid groupid
*
* @return 地址列表
*/
public Collection<InetSocketAddress> getOnlineNodes(final Serializable groupid) {
return sncpAddressNodes == null ? null : sncpAddressNodes.getCollection(groupid);
}
/**
* 获取在线用户的详细连接信息
*
* @param groupid groupid
*
* @return 地址集合
*/
public Map<InetSocketAddress, List<String>> getOnlineRemoteAddress(final Serializable groupid) {
Collection<InetSocketAddress> nodes = getOnlineNodes(groupid);
if (nodes == null) return null;
final Map<InetSocketAddress, List<String>> map = new HashMap();
for (InetSocketAddress nodeAddress : nodes) {
List<String> list = getOnlineRemoteAddresses(nodeAddress, groupid);
if (list == null) list = new ArrayList();
map.put(nodeAddress, list);
}
return map;
}
final void connect(Serializable groupid, String engineid) {
if (finest) logger.finest(localSncpAddress + " receive websocket connect event (" + groupid + " on " + engineid + ")."); if (finest) logger.finest(localSncpAddress + " receive websocket connect event (" + groupid + " on " + engineid + ").");
Set<String> engineids = localEngines.get(groupid); Set<String> engineids = localEngines.get(groupid);
if (engineids == null) { if (engineids == null) {
@@ -137,126 +97,179 @@ public abstract class WebSocketNode {
engines.put(engine.getEngineid(), engine); engines.put(engine.getEngineid(), engine);
} }
public final int sendMessage(Serializable groupid, boolean recent, Object message, boolean last) { //--------------------------------------------------------------------------------
final Set<String> engineids = localEngines.get(groupid); protected CompletableFuture<List<String>> remoteOnlineRemoteAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid) {
if (finest) logger.finest("websocket want send message {groupid:" + groupid + ", content:'" + message + "'} from locale node to " + engineids); if (remoteNode == null) return CompletableFuture.completedFuture(null);
int rscode = RETCODE_GROUP_EMPTY; try {
if (engineids != null && !engineids.isEmpty()) { return remoteNode.getOnlineRemoteAddresses(targetAddress, groupid);
for (String engineid : engineids) { } catch (Exception e) {
final WebSocketEngine engine = engines.get(engineid); logger.log(Level.WARNING, "remote " + targetAddress + " websocket getOnlineRemoteAddresses error", e);
if (engine != null) { //在本地 return CompletableFuture.completedFuture(null);
final WebSocketGroup group = engine.getWebSocketGroup(groupid); }
if (group == null || group.isEmpty()) { }
engineids.remove(engineid);
if (finest) logger.finest("websocket want send message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + message + "'} but websocket group is empty "); /**
rscode = RETCODE_GROUP_EMPTY; * 获取在线用户的节点地址列表
break; *
* @param groupid groupid
*
* @return 地址列表
*/
public CompletableFuture<Collection<InetSocketAddress>> getOnlineNodes(final Serializable groupid) {
return sncpAddressNodes == null ? CompletableFuture.completedFuture(null) : sncpAddressNodes.getCollectionAsync(groupid);
}
/**
* 获取在线用户的详细连接信息
*
* @param groupid groupid
*
* @return 地址集合
*/
//异步待优化
public CompletableFuture<Map<InetSocketAddress, List<String>>> getOnlineRemoteAddress(final Serializable groupid) {
final CompletableFuture<Map<InetSocketAddress, List<String>>> rs = new CompletableFuture<>();
CompletableFuture< Collection<InetSocketAddress>> nodesFuture = getOnlineNodes(groupid);
if (nodesFuture == null) return CompletableFuture.completedFuture(null);
nodesFuture.whenComplete((nodes, e) -> {
if (e != null) {
rs.completeExceptionally(e);
} else {
final Map<InetSocketAddress, List<String>> map = new HashMap();
for (final InetSocketAddress nodeAddress : nodes) {
List<String> list = getOnlineRemoteAddresses(nodeAddress, groupid).join();
if (list == null) list = new ArrayList();
map.put(nodeAddress, list);
}
rs.complete(map);
}
});
return rs;
}
//异步待优化
public final CompletableFuture<Integer> sendMessage(final Serializable groupid, final boolean recent, final Object message, final boolean last) {
return CompletableFuture.supplyAsync(() -> {
final Set<String> engineids = localEngines.get(groupid);
if (finest) logger.finest("websocket want send message {groupid:" + groupid + ", content:'" + message + "'} from locale node to " + engineids);
int rscode = RETCODE_GROUP_EMPTY;
if (engineids != null && !engineids.isEmpty()) {
for (String engineid : engineids) {
final WebSocketEngine engine = engines.get(engineid);
if (engine != null) { //在本地
final WebSocketGroup group = engine.getWebSocketGroup(groupid);
if (group == null || group.isEmpty()) {
engineids.remove(engineid);
if (finest) logger.finest("websocket want send message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + message + "'} but websocket group is empty ");
rscode = RETCODE_GROUP_EMPTY;
break;
}
rscode = group.send(recent, message, last);
} }
rscode = group.send(recent, message, last);
} }
} }
} if ((recent && rscode == 0) || remoteNode == null || sncpAddressNodes == null) {
if ((recent && rscode == 0) || remoteNode == null || sncpAddressNodes == null) { if (finest) {
if (finest) { if ((recent && rscode == 0)) {
if ((recent && rscode == 0)) { logger.finest("websocket want send recent message success");
logger.finest("websocket want send recent message success"); } else {
} else { logger.finest("websocket remote node is null");
logger.finest("websocket remote node is null"); }
} }
return rscode;
}
//-----------------------发送远程的-----------------------------
Collection<InetSocketAddress> addrs = sncpAddressNodes.getCollection(groupid);
if (finest) logger.finest("websocket found groupid:" + groupid + " on " + addrs);
if (addrs != null && !addrs.isEmpty()) { //对方连接在远程节点(包含本地节点)所以正常情况下addrs不会为空。
if (recent) {
InetSocketAddress one = null;
for (InetSocketAddress addr : addrs) {
one = addr;
}
rscode = remoteNode.sendMessage(one, groupid, recent, message, last).join();
} else {
for (InetSocketAddress addr : addrs) {
if (!addr.equals(localSncpAddress)) {
rscode |= remoteNode.sendMessage(addr, groupid, recent, message, last).join();
}
}
}
} else {
rscode = RETCODE_GROUP_EMPTY;
} }
return rscode; return rscode;
} });
//-----------------------发送远程的-----------------------------
Collection<InetSocketAddress> addrs = sncpAddressNodes.getCollection(groupid);
if (finest) logger.finest("websocket found groupid:" + groupid + " on " + addrs);
if (addrs != null && !addrs.isEmpty()) { //对方连接在远程节点(包含本地节点)所以正常情况下addrs不会为空。
if (recent) {
InetSocketAddress one = null;
for (InetSocketAddress addr : addrs) {
one = addr;
}
rscode = remoteNode.sendMessage(one, groupid, recent, message, last);
} else {
for (InetSocketAddress addr : addrs) {
if (!addr.equals(localSncpAddress)) {
rscode |= remoteNode.sendMessage(addr, groupid, recent, message, last);
}
}
}
} else {
rscode = RETCODE_GROUP_EMPTY;
}
return rscode;
} }
//-------------------------------------------------------------------------------- //--------------------------------------------------------------------------------
public final int sendEachMessage(Serializable groupid, String text) { public final CompletableFuture<Integer> sendEachMessage(Serializable groupid, String text) {
return sendMessage(groupid, false, (Object) text, true); return sendMessage(groupid, false, (Object) text, true);
} }
public final int sendEachMessage(Serializable groupid, String text, boolean last) { public final CompletableFuture<Integer> sendEachMessage(Serializable groupid, String text, boolean last) {
return sendMessage(groupid, false, (Object) text, last); return sendMessage(groupid, false, (Object) text, last);
} }
public final int sendRecentMessage(Serializable groupid, String text) { public final CompletableFuture<Integer> sendRecentMessage(Serializable groupid, String text) {
return sendMessage(groupid, true, (Object) text, true); return sendMessage(groupid, true, (Object) text, true);
} }
public final int sendRecentMessage(Serializable groupid, String text, boolean last) { public final CompletableFuture<Integer> sendRecentMessage(Serializable groupid, String text, boolean last) {
return sendMessage(groupid, true, (Object) text, last); return sendMessage(groupid, true, (Object) text, last);
} }
public final int sendMessage(Serializable groupid, boolean recent, String text) { public final CompletableFuture<Integer> sendMessage(Serializable groupid, boolean recent, String text) {
return sendMessage(groupid, recent, (Object) text, true); return sendMessage(groupid, recent, (Object) text, true);
} }
public final int sendMessage(Serializable groupid, boolean recent, String text, boolean last) { public final CompletableFuture<Integer> sendMessage(Serializable groupid, boolean recent, String text, boolean last) {
return sendMessage(groupid, recent, (Object) text, last); return sendMessage(groupid, recent, (Object) text, last);
} }
//-------------------------------------------------------------------------------- //--------------------------------------------------------------------------------
public final int sendEachMessage(Serializable groupid, byte[] data) { public final CompletableFuture<Integer> sendEachMessage(Serializable groupid, byte[] data) {
return sendMessage(groupid, false, (Object) data, true); return sendMessage(groupid, false, (Object) data, true);
} }
public final int sendEachMessage(Serializable groupid, byte[] data, boolean last) { public final CompletableFuture<Integer> sendEachMessage(Serializable groupid, byte[] data, boolean last) {
return sendMessage(groupid, false, (Object) data, last); return sendMessage(groupid, false, (Object) data, last);
} }
public final int sendRecentMessage(Serializable groupid, byte[] data) { public final CompletableFuture<Integer> sendRecentMessage(Serializable groupid, byte[] data) {
return sendMessage(groupid, true, (Object) data, true); return sendMessage(groupid, true, (Object) data, true);
} }
public final int sendRecentMessage(Serializable groupid, byte[] data, boolean last) { public final CompletableFuture<Integer> sendRecentMessage(Serializable groupid, byte[] data, boolean last) {
return sendMessage(groupid, true, (Object) data, last); return sendMessage(groupid, true, (Object) data, last);
} }
public final int sendMessage(Serializable groupid, boolean recent, byte[] data) { public final CompletableFuture<Integer> sendMessage(Serializable groupid, boolean recent, byte[] data) {
return sendMessage(groupid, recent, data, true); return sendMessage(groupid, recent, data, true);
} }
public final int sendMessage(Serializable groupid, boolean recent, byte[] data, boolean last) { public final CompletableFuture<Integer> sendMessage(Serializable groupid, boolean recent, byte[] data, boolean last) {
return sendMessage(groupid, recent, (Object) data, last); return sendMessage(groupid, recent, (Object) data, last);
} }
//-------------------------------------------------------------------------------- //--------------------------------------------------------------------------------
public final int sendEachMessage(Serializable groupid, Object message) { public final CompletableFuture<Integer> sendEachMessage(Serializable groupid, Object message) {
return sendMessage(groupid, false, message, true); return sendMessage(groupid, false, message, true);
} }
public final int sendEachMessage(Serializable groupid, Object message, boolean last) { public final CompletableFuture<Integer> sendEachMessage(Serializable groupid, Object message, boolean last) {
return sendMessage(groupid, false, message, last); return sendMessage(groupid, false, message, last);
} }
public final int sendRecentMessage(Serializable groupid, Object message) { public final CompletableFuture<Integer> sendRecentMessage(Serializable groupid, Object message) {
return sendMessage(groupid, true, message, true); return sendMessage(groupid, true, message, true);
} }
public final int sendRecentMessage(Serializable groupid, Object message, boolean last) { public final CompletableFuture<Integer> sendRecentMessage(Serializable groupid, Object message, boolean last) {
return sendMessage(groupid, true, message, last); return sendMessage(groupid, true, message, last);
} }
public final int sendMessage(Serializable groupid, boolean recent, Object message) { public final CompletableFuture<Integer> sendMessage(Serializable groupid, boolean recent, Object message) {
return sendMessage(groupid, recent, message, true); return sendMessage(groupid, recent, message, true);
} }

View File

@@ -9,6 +9,7 @@ import static org.redkale.net.http.WebSocket.*;
import java.io.*; import java.io.*;
import java.net.*; import java.net.*;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture;
import org.redkale.net.http.*; import org.redkale.net.http.*;
import org.redkale.util.*; import org.redkale.util.*;
@@ -34,49 +35,55 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
} }
@Override @Override
public List<String> getOnlineRemoteAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid) { public CompletableFuture<List<String>> getOnlineRemoteAddresses(final @RpcTargetAddress InetSocketAddress targetAddress, final Serializable groupid) {
if (localSncpAddress == null || !localSncpAddress.equals(targetAddress)) return remoteOnlineRemoteAddresses(targetAddress, groupid); if (localSncpAddress == null || !localSncpAddress.equals(targetAddress)) return remoteOnlineRemoteAddresses(targetAddress, groupid);
final Set<String> engineids = localEngines.get(groupid); return CompletableFuture.supplyAsync(() -> {
if (engineids == null || engineids.isEmpty()) return null; final Set<String> engineids = localEngines.get(groupid);
final List<String> rs = new ArrayList<>(); if (engineids == null || engineids.isEmpty()) return null;
for (String engineid : engineids) { final List<String> rs = new ArrayList<>();
final WebSocketEngine engine = engines.get(engineid); for (String engineid : engineids) {
if (engine == null) continue; final WebSocketEngine engine = engines.get(engineid);
final WebSocketGroup group = engine.getWebSocketGroup(groupid); if (engine == null) continue;
group.getWebSockets().forEach(x -> rs.add("ws" + Objects.hashCode(x) + '@' + x.getRemoteAddr()));
}
return rs;
}
@Override
public int sendMessage(@RpcTargetAddress InetSocketAddress addr, Serializable groupid, boolean recent, Object message, boolean last) {
final Set<String> engineids = localEngines.get(groupid);
if (engineids == null || engineids.isEmpty()) return RETCODE_GROUP_EMPTY;
int code = RETCODE_GROUP_EMPTY;
for (String engineid : engineids) {
final WebSocketEngine engine = engines.get(engineid);
if (engine != null) { //在本地
final WebSocketGroup group = engine.getWebSocketGroup(groupid); final WebSocketGroup group = engine.getWebSocketGroup(groupid);
if (group == null || group.isEmpty()) { group.getWebSockets().forEach(x -> rs.add("ws" + Objects.hashCode(x) + '@' + x.getRemoteAddr()));
if (finest) logger.finest("receive websocket message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + message + "'} from " + addr + " but send result is " + RETCODE_GROUP_EMPTY);
return RETCODE_GROUP_EMPTY;
}
code = group.send(recent, message, last);
if (finest) logger.finest("websocket node send message (" + message + ") from " + addr + " result is " + code);
} }
} return rs;
return code; });
} }
@Override @Override
public void connect(Serializable groupid, InetSocketAddress addr) { public CompletableFuture<Integer> sendMessage(@RpcTargetAddress InetSocketAddress addr, Serializable groupid, boolean recent, Object message, boolean last) {
sncpAddressNodes.appendSetItem(groupid, addr); return CompletableFuture.supplyAsync(() -> {
final Set<String> engineids = localEngines.get(groupid);
if (engineids == null || engineids.isEmpty()) return RETCODE_GROUP_EMPTY;
int code = RETCODE_GROUP_EMPTY;
for (String engineid : engineids) {
final WebSocketEngine engine = engines.get(engineid);
if (engine != null) { //在本地
final WebSocketGroup group = engine.getWebSocketGroup(groupid);
if (group == null || group.isEmpty()) {
if (finest) logger.finest("receive websocket message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + message + "'} from " + addr + " but send result is " + RETCODE_GROUP_EMPTY);
return RETCODE_GROUP_EMPTY;
}
code = group.send(recent, message, last);
if (finest) logger.finest("websocket node send message (" + message + ") from " + addr + " result is " + code);
}
}
return code;
});
}
@Override
public CompletableFuture<Void> connect(Serializable groupid, InetSocketAddress addr) {
CompletableFuture<Void> future = sncpAddressNodes.appendSetItemAsync(groupid, addr);
if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " connect from " + addr); if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " connect from " + addr);
return future;
} }
@Override @Override
public void disconnect(Serializable groupid, InetSocketAddress addr) { public CompletableFuture<Void> disconnect(Serializable groupid, InetSocketAddress addr) {
sncpAddressNodes.removeSetItem(groupid, addr); CompletableFuture<Void> future = sncpAddressNodes.removeSetItemAsync(groupid, addr);
if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " disconnect from " + addr); if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " disconnect from " + addr);
return future;
} }
} }