From bb8462af2a7c1a83dff50cc6edb11ce18d16d3a2 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Sat, 13 May 2017 18:17:36 +0800 Subject: [PATCH] --- src/org/redkale/net/http/WebSocket.java | 48 ++-- src/org/redkale/net/http/WebSocketNode.java | 225 +++++++++--------- .../redkale/service/WebSocketNodeService.java | 73 +++--- 3 files changed, 183 insertions(+), 163 deletions(-) diff --git a/src/org/redkale/net/http/WebSocket.java b/src/org/redkale/net/http/WebSocket.java index 3dfaeb50b..7a65fd055 100644 --- a/src/org/redkale/net/http/WebSocket.java +++ b/src/org/redkale/net/http/WebSocket.java @@ -9,7 +9,7 @@ import org.redkale.net.http.WebSocketPacket.FrameType; import java.io.*; import java.net.*; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.*; import org.redkale.convert.json.JsonConvert; import org.redkale.net.*; import org.redkale.util.Comment; @@ -200,7 +200,7 @@ public abstract class WebSocket { * * @return 为0表示成功, 其他值表示异常 */ - public final int sendEachMessage(Serializable groupid, String text) { + public final CompletableFuture sendEachMessage(Serializable groupid, String text) { return sendEachMessage(groupid, text, true); } @@ -212,7 +212,7 @@ public abstract class WebSocket { * * @return 为0表示成功, 其他值表示异常 */ - public final int sendEachMessage(Serializable groupid, byte[] data) { + public final CompletableFuture sendEachMessage(Serializable groupid, byte[] data) { return sendEachMessage(groupid, data, true); } @@ -224,7 +224,7 @@ public abstract class WebSocket { * * @return 为0表示成功, 其他值表示异常 */ - public final int sendEachMessage(Serializable groupid, Object message) { + public final CompletableFuture sendEachMessage(Serializable groupid, Object message) { return sendEachMessage(groupid, message, true); } @@ -237,7 +237,7 @@ public abstract class WebSocket { * * @return 为0表示成功, 其他值表示异常 */ - public final int sendEachMessage(Serializable groupid, String text, boolean last) { + public final CompletableFuture sendEachMessage(Serializable groupid, String text, boolean last) { return sendMessage(groupid, false, text, last); } @@ -250,7 +250,7 @@ public abstract class WebSocket { * * @return 为0表示成功, 其他值表示异常 */ - public final int sendEachMessage(Serializable groupid, byte[] data, boolean last) { + public final CompletableFuture sendEachMessage(Serializable groupid, byte[] data, boolean last) { return sendMessage(groupid, false, data, last); } @@ -263,7 +263,7 @@ public abstract class WebSocket { * * @return 为0表示成功, 其他值表示异常 */ - public final int sendEachMessage(Serializable groupid, Object message, boolean last) { + public final CompletableFuture sendEachMessage(Serializable groupid, Object message, boolean last) { return sendMessage(groupid, false, message, last); } @@ -275,7 +275,7 @@ public abstract class WebSocket { * * @return 为0表示成功, 其他值表示异常 */ - public final int sendRecentMessage(Serializable groupid, String text) { + public final CompletableFuture sendRecentMessage(Serializable groupid, String text) { return sendRecentMessage(groupid, text, true); } @@ -287,7 +287,7 @@ public abstract class WebSocket { * * @return 为0表示成功, 其他值表示异常 */ - public final int sendRecentMessage(Serializable groupid, byte[] data) { + public final CompletableFuture sendRecentMessage(Serializable groupid, byte[] data) { return sendRecentMessage(groupid, data, true); } @@ -299,7 +299,7 @@ public abstract class WebSocket { * * @return 为0表示成功, 其他值表示异常 */ - public final int sendRecentMessage(Serializable groupid, Object message) { + public final CompletableFuture sendRecentMessage(Serializable groupid, Object message) { return sendMessage(groupid, true, message, true); } @@ -312,7 +312,7 @@ public abstract class WebSocket { * * @return 为0表示成功, 其他值表示异常 */ - public final int sendRecentMessage(Serializable groupid, String text, boolean last) { + public final CompletableFuture sendRecentMessage(Serializable groupid, String text, boolean last) { return sendMessage(groupid, true, text, last); } @@ -325,7 +325,7 @@ public abstract class WebSocket { * * @return 为0表示成功, 其他值表示异常 */ - public final int sendRecentMessage(Serializable groupid, byte[] data, boolean last) { + public final CompletableFuture sendRecentMessage(Serializable groupid, byte[] data, boolean last) { return sendMessage(groupid, true, data, last); } @@ -338,27 +338,27 @@ public abstract class WebSocket { * * @return 为0表示成功, 其他值表示异常 */ - public final int sendRecentMessage(Serializable groupid, Object message, boolean last) { + public final CompletableFuture sendRecentMessage(Serializable groupid, Object message, boolean last) { return sendMessage(groupid, true, message, last); } - private int sendMessage(Serializable groupid, boolean recent, String text, boolean last) { - if (_engine.node == null) return RETCODE_NODESERVICE_NULL; - int rs = _engine.node.sendMessage(groupid, recent, text, last); + private CompletableFuture sendMessage(Serializable groupid, boolean recent, String text, boolean last) { + if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL); + CompletableFuture rs = _engine.node.sendMessage(groupid, recent, text, last); if (_engine.finest) _engine.logger.finest("wsgroupid:" + groupid + " " + (recent ? "recent " : "") + "send websocket result is " + rs + " on " + this + " by message(" + text + ")"); return rs; } - private int sendMessage(Serializable groupid, boolean recent, byte[] data, boolean last) { - if (_engine.node == null) return RETCODE_NODESERVICE_NULL; - int rs = _engine.node.sendMessage(groupid, recent, data, last); + private CompletableFuture sendMessage(Serializable groupid, boolean recent, byte[] data, boolean last) { + if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL); + CompletableFuture rs = _engine.node.sendMessage(groupid, recent, data, last); if (_engine.finest) _engine.logger.finest("wsgroupid:" + groupid + " " + (recent ? "recent " : "") + "send websocket result is " + rs + " on " + this + " by message(byte[" + data.length + "])"); return rs; } - private int sendMessage(Serializable groupid, boolean recent, Object message, boolean last) { - if (_engine.node == null) return RETCODE_NODESERVICE_NULL; - int rs = _engine.node.sendMessage(groupid, recent, message, last); + private CompletableFuture sendMessage(Serializable groupid, boolean recent, Object message, boolean last) { + if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL); + CompletableFuture rs = _engine.node.sendMessage(groupid, recent, message, last); if (_engine.finest) _engine.logger.finest("wsgroupid:" + groupid + " " + (recent ? "recent " : "") + "send websocket result is " + rs + " on " + this + " by message(" + _jsonConvert.convertTo(message) + ")"); return rs; } @@ -370,7 +370,7 @@ public abstract class WebSocket { * * @return 地址列表 */ - protected final Collection getOnlineNodes(Serializable groupid) { + protected final CompletableFuture> getOnlineNodes(Serializable groupid) { return _engine.node.getOnlineNodes(groupid); } @@ -381,7 +381,7 @@ public abstract class WebSocket { * * @return 地址集合 */ - protected final Map> getOnlineRemoteAddress(Serializable groupid) { + protected final CompletableFuture>> getOnlineRemoteAddress(Serializable groupid) { return _engine.node.getOnlineRemoteAddress(groupid); } diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index a168501c6..00e40ea02 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -62,56 +62,16 @@ public abstract class WebSocketNode { }); } - protected abstract List getOnlineRemoteAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid); + protected abstract CompletableFuture> getOnlineRemoteAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid); - protected abstract int sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid, boolean recent, Object message, boolean last); + protected abstract CompletableFuture sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid, boolean recent, Object message, boolean last); - protected abstract void connect(Serializable groupid, InetSocketAddress addr); + protected abstract CompletableFuture connect(Serializable groupid, InetSocketAddress addr); - protected abstract void disconnect(Serializable groupid, InetSocketAddress addr); + protected abstract CompletableFuture disconnect(Serializable groupid, InetSocketAddress addr); //-------------------------------------------------------------------------------- - protected List remoteOnlineRemoteAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid) { - if (remoteNode == null) return null; - try { - return remoteNode.getOnlineRemoteAddresses(targetAddress, groupid); - } catch (Exception e) { - logger.log(Level.WARNING, "remote " + targetAddress + " websocket getOnlineRemoteAddresses error", e); - return null; - } - } - - /** - * 获取在线用户的节点地址列表 - * - * @param groupid groupid - * - * @return 地址列表 - */ - public Collection getOnlineNodes(final Serializable groupid) { - return sncpAddressNodes == null ? null : sncpAddressNodes.getCollection(groupid); - } - - /** - * 获取在线用户的详细连接信息 - * - * @param groupid groupid - * - * @return 地址集合 - */ - public Map> getOnlineRemoteAddress(final Serializable groupid) { - Collection nodes = getOnlineNodes(groupid); - if (nodes == null) return null; - final Map> map = new HashMap(); - for (InetSocketAddress nodeAddress : nodes) { - List list = getOnlineRemoteAddresses(nodeAddress, groupid); - if (list == null) list = new ArrayList(); - map.put(nodeAddress, list); - } - return map; - } - - final void connect(Serializable groupid, String engineid) { + final void connect(Serializable groupid, String engineid) { if (finest) logger.finest(localSncpAddress + " receive websocket connect event (" + groupid + " on " + engineid + ")."); Set engineids = localEngines.get(groupid); if (engineids == null) { @@ -137,126 +97,179 @@ public abstract class WebSocketNode { engines.put(engine.getEngineid(), engine); } - public final int sendMessage(Serializable groupid, boolean recent, Object message, boolean last) { - final Set engineids = localEngines.get(groupid); - if (finest) logger.finest("websocket want send message {groupid:" + groupid + ", content:'" + message + "'} from locale node to " + engineids); - int rscode = RETCODE_GROUP_EMPTY; - if (engineids != null && !engineids.isEmpty()) { - for (String engineid : engineids) { - final WebSocketEngine engine = engines.get(engineid); - if (engine != null) { //在本地 - final WebSocketGroup group = engine.getWebSocketGroup(groupid); - if (group == null || group.isEmpty()) { - engineids.remove(engineid); - if (finest) logger.finest("websocket want send message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + message + "'} but websocket group is empty "); - rscode = RETCODE_GROUP_EMPTY; - break; + //-------------------------------------------------------------------------------- + protected CompletableFuture> remoteOnlineRemoteAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid) { + if (remoteNode == null) return CompletableFuture.completedFuture(null); + try { + return remoteNode.getOnlineRemoteAddresses(targetAddress, groupid); + } catch (Exception e) { + logger.log(Level.WARNING, "remote " + targetAddress + " websocket getOnlineRemoteAddresses error", e); + return CompletableFuture.completedFuture(null); + } + } + + /** + * 获取在线用户的节点地址列表 + * + * @param groupid groupid + * + * @return 地址列表 + */ + public CompletableFuture> getOnlineNodes(final Serializable groupid) { + return sncpAddressNodes == null ? CompletableFuture.completedFuture(null) : sncpAddressNodes.getCollectionAsync(groupid); + } + + /** + * 获取在线用户的详细连接信息 + * + * @param groupid groupid + * + * @return 地址集合 + */ + //异步待优化 + public CompletableFuture>> getOnlineRemoteAddress(final Serializable groupid) { + final CompletableFuture>> rs = new CompletableFuture<>(); + CompletableFuture< Collection> nodesFuture = getOnlineNodes(groupid); + if (nodesFuture == null) return CompletableFuture.completedFuture(null); + nodesFuture.whenComplete((nodes, e) -> { + if (e != null) { + rs.completeExceptionally(e); + } else { + final Map> map = new HashMap(); + for (final InetSocketAddress nodeAddress : nodes) { + List list = getOnlineRemoteAddresses(nodeAddress, groupid).join(); + if (list == null) list = new ArrayList(); + map.put(nodeAddress, list); + } + rs.complete(map); + } + }); + return rs; + } + + //异步待优化 + public final CompletableFuture sendMessage(final Serializable groupid, final boolean recent, final Object message, final boolean last) { + return CompletableFuture.supplyAsync(() -> { + final Set engineids = localEngines.get(groupid); + if (finest) logger.finest("websocket want send message {groupid:" + groupid + ", content:'" + message + "'} from locale node to " + engineids); + int rscode = RETCODE_GROUP_EMPTY; + if (engineids != null && !engineids.isEmpty()) { + for (String engineid : engineids) { + final WebSocketEngine engine = engines.get(engineid); + if (engine != null) { //在本地 + final WebSocketGroup group = engine.getWebSocketGroup(groupid); + if (group == null || group.isEmpty()) { + engineids.remove(engineid); + if (finest) logger.finest("websocket want send message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + message + "'} but websocket group is empty "); + rscode = RETCODE_GROUP_EMPTY; + break; + } + rscode = group.send(recent, message, last); } - rscode = group.send(recent, message, last); } } - } - if ((recent && rscode == 0) || remoteNode == null || sncpAddressNodes == null) { - if (finest) { - if ((recent && rscode == 0)) { - logger.finest("websocket want send recent message success"); - } else { - logger.finest("websocket remote node is null"); + if ((recent && rscode == 0) || remoteNode == null || sncpAddressNodes == null) { + if (finest) { + if ((recent && rscode == 0)) { + logger.finest("websocket want send recent message success"); + } else { + logger.finest("websocket remote node is null"); + } } + return rscode; + } + //-----------------------发送远程的----------------------------- + Collection addrs = sncpAddressNodes.getCollection(groupid); + if (finest) logger.finest("websocket found groupid:" + groupid + " on " + addrs); + if (addrs != null && !addrs.isEmpty()) { //对方连接在远程节点(包含本地节点),所以正常情况下addrs不会为空。 + if (recent) { + InetSocketAddress one = null; + for (InetSocketAddress addr : addrs) { + one = addr; + } + rscode = remoteNode.sendMessage(one, groupid, recent, message, last).join(); + } else { + for (InetSocketAddress addr : addrs) { + if (!addr.equals(localSncpAddress)) { + rscode |= remoteNode.sendMessage(addr, groupid, recent, message, last).join(); + } + } + } + } else { + rscode = RETCODE_GROUP_EMPTY; } return rscode; - } - //-----------------------发送远程的----------------------------- - Collection addrs = sncpAddressNodes.getCollection(groupid); - if (finest) logger.finest("websocket found groupid:" + groupid + " on " + addrs); - if (addrs != null && !addrs.isEmpty()) { //对方连接在远程节点(包含本地节点),所以正常情况下addrs不会为空。 - if (recent) { - InetSocketAddress one = null; - for (InetSocketAddress addr : addrs) { - one = addr; - } - rscode = remoteNode.sendMessage(one, groupid, recent, message, last); - } else { - for (InetSocketAddress addr : addrs) { - if (!addr.equals(localSncpAddress)) { - rscode |= remoteNode.sendMessage(addr, groupid, recent, message, last); - } - } - } - } else { - rscode = RETCODE_GROUP_EMPTY; - } - return rscode; + }); } //-------------------------------------------------------------------------------- - public final int sendEachMessage(Serializable groupid, String text) { + public final CompletableFuture sendEachMessage(Serializable groupid, String text) { return sendMessage(groupid, false, (Object) text, true); } - public final int sendEachMessage(Serializable groupid, String text, boolean last) { + public final CompletableFuture sendEachMessage(Serializable groupid, String text, boolean last) { return sendMessage(groupid, false, (Object) text, last); } - public final int sendRecentMessage(Serializable groupid, String text) { + public final CompletableFuture sendRecentMessage(Serializable groupid, String text) { return sendMessage(groupid, true, (Object) text, true); } - public final int sendRecentMessage(Serializable groupid, String text, boolean last) { + public final CompletableFuture sendRecentMessage(Serializable groupid, String text, boolean last) { return sendMessage(groupid, true, (Object) text, last); } - public final int sendMessage(Serializable groupid, boolean recent, String text) { + public final CompletableFuture sendMessage(Serializable groupid, boolean recent, String text) { return sendMessage(groupid, recent, (Object) text, true); } - public final int sendMessage(Serializable groupid, boolean recent, String text, boolean last) { + public final CompletableFuture sendMessage(Serializable groupid, boolean recent, String text, boolean last) { return sendMessage(groupid, recent, (Object) text, last); } //-------------------------------------------------------------------------------- - public final int sendEachMessage(Serializable groupid, byte[] data) { + public final CompletableFuture sendEachMessage(Serializable groupid, byte[] data) { return sendMessage(groupid, false, (Object) data, true); } - public final int sendEachMessage(Serializable groupid, byte[] data, boolean last) { + public final CompletableFuture sendEachMessage(Serializable groupid, byte[] data, boolean last) { return sendMessage(groupid, false, (Object) data, last); } - public final int sendRecentMessage(Serializable groupid, byte[] data) { + public final CompletableFuture sendRecentMessage(Serializable groupid, byte[] data) { return sendMessage(groupid, true, (Object) data, true); } - public final int sendRecentMessage(Serializable groupid, byte[] data, boolean last) { + public final CompletableFuture sendRecentMessage(Serializable groupid, byte[] data, boolean last) { return sendMessage(groupid, true, (Object) data, last); } - public final int sendMessage(Serializable groupid, boolean recent, byte[] data) { + public final CompletableFuture sendMessage(Serializable groupid, boolean recent, byte[] data) { return sendMessage(groupid, recent, data, true); } - public final int sendMessage(Serializable groupid, boolean recent, byte[] data, boolean last) { + public final CompletableFuture sendMessage(Serializable groupid, boolean recent, byte[] data, boolean last) { return sendMessage(groupid, recent, (Object) data, last); } //-------------------------------------------------------------------------------- - public final int sendEachMessage(Serializable groupid, Object message) { + public final CompletableFuture sendEachMessage(Serializable groupid, Object message) { return sendMessage(groupid, false, message, true); } - public final int sendEachMessage(Serializable groupid, Object message, boolean last) { + public final CompletableFuture sendEachMessage(Serializable groupid, Object message, boolean last) { return sendMessage(groupid, false, message, last); } - public final int sendRecentMessage(Serializable groupid, Object message) { + public final CompletableFuture sendRecentMessage(Serializable groupid, Object message) { return sendMessage(groupid, true, message, true); } - public final int sendRecentMessage(Serializable groupid, Object message, boolean last) { + public final CompletableFuture sendRecentMessage(Serializable groupid, Object message, boolean last) { return sendMessage(groupid, true, message, last); } - public final int sendMessage(Serializable groupid, boolean recent, Object message) { + public final CompletableFuture sendMessage(Serializable groupid, boolean recent, Object message) { return sendMessage(groupid, recent, message, true); } diff --git a/src/org/redkale/service/WebSocketNodeService.java b/src/org/redkale/service/WebSocketNodeService.java index d90fdda81..6100fd554 100644 --- a/src/org/redkale/service/WebSocketNodeService.java +++ b/src/org/redkale/service/WebSocketNodeService.java @@ -9,6 +9,7 @@ import static org.redkale.net.http.WebSocket.*; import java.io.*; import java.net.*; import java.util.*; +import java.util.concurrent.CompletableFuture; import org.redkale.net.http.*; import org.redkale.util.*; @@ -34,49 +35,55 @@ public class WebSocketNodeService extends WebSocketNode implements Service { } @Override - public List getOnlineRemoteAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid) { + public CompletableFuture> getOnlineRemoteAddresses(final @RpcTargetAddress InetSocketAddress targetAddress, final Serializable groupid) { if (localSncpAddress == null || !localSncpAddress.equals(targetAddress)) return remoteOnlineRemoteAddresses(targetAddress, groupid); - final Set engineids = localEngines.get(groupid); - if (engineids == null || engineids.isEmpty()) return null; - final List rs = new ArrayList<>(); - for (String engineid : engineids) { - final WebSocketEngine engine = engines.get(engineid); - if (engine == null) continue; - final WebSocketGroup group = engine.getWebSocketGroup(groupid); - group.getWebSockets().forEach(x -> rs.add("ws" + Objects.hashCode(x) + '@' + x.getRemoteAddr())); - } - return rs; - } - - @Override - public int sendMessage(@RpcTargetAddress InetSocketAddress addr, Serializable groupid, boolean recent, Object message, boolean last) { - final Set engineids = localEngines.get(groupid); - if (engineids == null || engineids.isEmpty()) return RETCODE_GROUP_EMPTY; - int code = RETCODE_GROUP_EMPTY; - for (String engineid : engineids) { - final WebSocketEngine engine = engines.get(engineid); - if (engine != null) { //在本地 + return CompletableFuture.supplyAsync(() -> { + final Set engineids = localEngines.get(groupid); + if (engineids == null || engineids.isEmpty()) return null; + final List rs = new ArrayList<>(); + for (String engineid : engineids) { + final WebSocketEngine engine = engines.get(engineid); + if (engine == null) continue; final WebSocketGroup group = engine.getWebSocketGroup(groupid); - if (group == null || group.isEmpty()) { - if (finest) logger.finest("receive websocket message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + message + "'} from " + addr + " but send result is " + RETCODE_GROUP_EMPTY); - return RETCODE_GROUP_EMPTY; - } - code = group.send(recent, message, last); - if (finest) logger.finest("websocket node send message (" + message + ") from " + addr + " result is " + code); + group.getWebSockets().forEach(x -> rs.add("ws" + Objects.hashCode(x) + '@' + x.getRemoteAddr())); } - } - return code; + return rs; + }); } @Override - public void connect(Serializable groupid, InetSocketAddress addr) { - sncpAddressNodes.appendSetItem(groupid, addr); + public CompletableFuture sendMessage(@RpcTargetAddress InetSocketAddress addr, Serializable groupid, boolean recent, Object message, boolean last) { + return CompletableFuture.supplyAsync(() -> { + final Set engineids = localEngines.get(groupid); + if (engineids == null || engineids.isEmpty()) return RETCODE_GROUP_EMPTY; + int code = RETCODE_GROUP_EMPTY; + for (String engineid : engineids) { + final WebSocketEngine engine = engines.get(engineid); + if (engine != null) { //在本地 + final WebSocketGroup group = engine.getWebSocketGroup(groupid); + if (group == null || group.isEmpty()) { + if (finest) logger.finest("receive websocket message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + message + "'} from " + addr + " but send result is " + RETCODE_GROUP_EMPTY); + return RETCODE_GROUP_EMPTY; + } + code = group.send(recent, message, last); + if (finest) logger.finest("websocket node send message (" + message + ") from " + addr + " result is " + code); + } + } + return code; + }); + } + + @Override + public CompletableFuture connect(Serializable groupid, InetSocketAddress addr) { + CompletableFuture future = sncpAddressNodes.appendSetItemAsync(groupid, addr); if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " connect from " + addr); + return future; } @Override - public void disconnect(Serializable groupid, InetSocketAddress addr) { - sncpAddressNodes.removeSetItem(groupid, addr); + public CompletableFuture disconnect(Serializable groupid, InetSocketAddress addr) { + CompletableFuture future = sncpAddressNodes.removeSetItemAsync(groupid, addr); if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " disconnect from " + addr); + return future; } }