diff --git a/src/org/redkale/net/http/WebSocket.java b/src/org/redkale/net/http/WebSocket.java index 2a7087b13..89c904394 100644 --- a/src/org/redkale/net/http/WebSocket.java +++ b/src/org/redkale/net/http/WebSocket.java @@ -15,21 +15,21 @@ import org.redkale.net.*; /** * 一个WebSocket连接对应一个WebSocket实体,即一个WebSocket会绑定一个TCP连接。 * WebSocket 有两种模式: - * 1) 普通模式: 协议上符合HTML5规范, 其流程顺序如下: - * 1.1 onOpen 如果方法返回null,则视为WebSocket的连接不合法,框架会强制关闭WebSocket连接;通常用于判断登录态。 - * 1.2 createGroupid 如果方法返回null,则视为WebSocket的连接不合法,框架会强制关闭WebSocket连接;通常用于判断用户权限是否符合。 - * 1.3 onConnected WebSocket成功连接后在准备接收数据前回调此方法。 - * 1.4 onMessage/onFragment+ WebSocket接收到消息后回调此消息类方法。 - * 1.5 onClose WebSocket被关闭后回调此方法。 + * 1) 普通模式: 协议上符合HTML5规范, 其流程顺序如下: + * 1.1 onOpen 如果方法返回null,则视为WebSocket的连接不合法,框架会强制关闭WebSocket连接;通常用于判断登录态。 + * 1.2 createGroupid 如果方法返回null,则视为WebSocket的连接不合法,框架会强制关闭WebSocket连接;通常用于判断用户权限是否符合。 + * 1.3 onConnected WebSocket成功连接后在准备接收数据前回调此方法。 + * 1.4 onMessage/onFragment+ WebSocket接收到消息后回调此消息类方法。 + * 1.5 onClose WebSocket被关闭后回调此方法。 * - * 此模式下 以上方法都应该被重载。 + * 此模式下 以上方法都应该被重载。 * - * 2) 原始二进制模式: 此模式有别于HTML5规范,可以视为原始的TCP连接。通常用于音频视频通讯场景。期流程顺序如下: - * 2.1 onOpen 如果方法返回null,则视为WebSocket的连接不合法,框架会强制关闭WebSocket连接;通常用于判断登录态。 - * 2.2 createGroupid 如果方法返回null,则视为WebSocket的连接不合法,框架会强制关闭WebSocket连接;通常用于判断用户权限是否符合。 - * 2.3 onRead WebSocket成功连接后回调此方法, 由此方法处理原始的TCP连接, 同时业务代码去控制WebSocket的关闭。 + * 2) 原始二进制模式: 此模式有别于HTML5规范,可以视为原始的TCP连接。通常用于音频视频通讯场景。期流程顺序如下: + * 2.1 onOpen 如果方法返回null,则视为WebSocket的连接不合法,框架会强制关闭WebSocket连接;通常用于判断登录态。 + * 2.2 createGroupid 如果方法返回null,则视为WebSocket的连接不合法,框架会强制关闭WebSocket连接;通常用于判断用户权限是否符合。 + * 2.3 onRead WebSocket成功连接后回调此方法, 由此方法处理原始的TCP连接, 同时业务代码去控制WebSocket的关闭。 * - * 此模式下 以上方法都应该被重载。 + * 此模式下 以上方法都应该被重载。 *

* * @see http://www.redkale.org @@ -67,6 +67,10 @@ public abstract class WebSocket { Serializable _groupid; //不可能为空 + SocketAddress _remoteAddress;//不可能为空 + + String _remoteAddr;//不可能为空 + private final long createtime = System.currentTimeMillis(); private final Map attributes = new ConcurrentHashMap<>(); @@ -260,11 +264,27 @@ public abstract class WebSocket { if (_engine.node == null) return RETCODE_NODESERVICE_NULL; return _engine.node.sendMessage(groupid, recent, data, last); } - + + /** + * 获取在线用户的节点地址列表 + * + * @param groupid + * @return + */ protected final Collection getOnlineNodes(Serializable groupid) { return _engine.node.getOnlineNodes(groupid); } - + + /** + * 获取在线用户的详细连接信息 + * + * @param groupid + * @return + */ + protected final Map> getOnlineRemoteAddress(Serializable groupid) { + return _engine.node.getOnlineRemoteAddress(groupid); + } + /** * 获取当前WebSocket下的属性 *

@@ -316,6 +336,24 @@ public abstract class WebSocket { return _sessionid; } + /** + * 获取客户端直接地址, 当WebSocket连接是由代理服务器转发的,则该值固定为代理服务器的IP地址 + * + * @return + */ + public final SocketAddress getRemoteAddress() { + return _remoteAddress; + } + + /** + * 获取客户端真实地址 + * + * @return + */ + public final String getRemoteAddr() { + return _remoteAddr; + } + //------------------------------------------------------------------- /** * 获取当前WebSocket所属的WebSocketGroup, 不会为null diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index 3a207be3a..70253a90a 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -54,7 +54,8 @@ public abstract class WebSocketNode { }); }); } - + protected abstract List getOnlineRemoteAddresses(@DynTargetAddress InetSocketAddress targetAddress, Serializable groupid); + protected abstract int sendMessage(@DynTargetAddress InetSocketAddress targetAddress, Serializable groupid, boolean recent, Serializable message, boolean last); protected abstract void connect(Serializable groupid, InetSocketAddress addr); @@ -63,15 +64,33 @@ public abstract class WebSocketNode { //-------------------------------------------------------------------------------- /** - * 获取在线的节点地址列表 + * 获取在线用户的节点地址列表 * * @param groupid * @return */ - public Collection getOnlineNodes(Serializable groupid) { + public Collection getOnlineNodes(final Serializable groupid) { return source.getCollection(groupid); } - + + /** + * 获取在线用户的详细连接信息 + * + * @param groupid + * @return + */ + public Map> getOnlineRemoteAddress(final Serializable groupid) { + Collection nodes = getOnlineNodes(groupid); + if(nodes == null) return null; + final Map> map = new HashMap(); + for(InetSocketAddress nodeAddress : nodes) { + List list = getOnlineRemoteAddresses(nodeAddress, groupid); + if(list == null) list = new ArrayList(); + map.put(nodeAddress, list); + } + return map; + } + public final void connect(Serializable groupid, String engineid) { if (finest) logger.finest(localSncpAddress + " receive websocket connect event (" + groupid + " on " + engineid + ")."); Set engineids = localNodes.get(groupid); diff --git a/src/org/redkale/net/http/WebSocketServlet.java b/src/org/redkale/net/http/WebSocketServlet.java index ea3bb1ce3..c8c30d769 100644 --- a/src/org/redkale/net/http/WebSocketServlet.java +++ b/src/org/redkale/net/http/WebSocketServlet.java @@ -98,6 +98,8 @@ public abstract class WebSocketServlet extends HttpServlet { } final WebSocket webSocket = this.createWebSocket(); webSocket._engine = engine; + webSocket._remoteAddress = request.getRemoteAddress(); + webSocket._remoteAddr = request.getRemoteAddr(); Serializable sessionid = webSocket.onOpen(request); if (sessionid == null) { if (debug) logger.finest("WebSocket connect abort, Not found sessionid. request=" + request); diff --git a/src/org/redkale/service/CacheSourceService.java b/src/org/redkale/service/CacheSourceService.java index 23a7f0d1e..fab58a24d 100644 --- a/src/org/redkale/service/CacheSourceService.java +++ b/src/org/redkale/service/CacheSourceService.java @@ -431,7 +431,7 @@ public class CacheSourceService implem } @ConstructorProperties({"cacheType", "expireSeconds", "lastAccessed", "key", "value"}) - protected CacheEntry(CacheEntryType cacheType, int expireSeconds, int lastAccessed, K key, T value) { + public CacheEntry(CacheEntryType cacheType, int expireSeconds, int lastAccessed, K key, T value) { this.cacheType = cacheType; this.expireSeconds = expireSeconds; this.lastAccessed = lastAccessed; diff --git a/src/org/redkale/service/WebSocketNodeService.java b/src/org/redkale/service/WebSocketNodeService.java index 77b027f04..bbb340db2 100644 --- a/src/org/redkale/service/WebSocketNodeService.java +++ b/src/org/redkale/service/WebSocketNodeService.java @@ -30,6 +30,21 @@ public class WebSocketNodeService extends WebSocketNode implements Service { super.destroy(conf); } + @Override + public List getOnlineRemoteAddresses(@DynTargetAddress InetSocketAddress targetAddress, Serializable groupid) { + if (localSncpAddress != null && !localSncpAddress.equals(targetAddress)) return null; + final Set engineids = localNodes.get(groupid); + if (engineids == null || engineids.isEmpty()) return null; + final List rs = new ArrayList<>(); + for (String engineid : engineids) { + final WebSocketEngine engine = engines.get(engineid); + if (engine == null) continue; + final WebSocketGroup group = engine.getWebSocketGroup(groupid); + group.getWebSockets().forEach(x -> rs.add(x.getRemoteAddr())); + } + return rs; + } + @Override public int sendMessage(@DynTargetAddress InetSocketAddress addr, Serializable groupid, boolean recent, Serializable message, boolean last) { final Set engineids = localNodes.get(groupid);