This commit is contained in:
Redkale
2017-05-18 14:22:44 +08:00
parent 39ba0f86f6
commit ba928b389b
4 changed files with 19 additions and 31 deletions

View File

@@ -81,7 +81,9 @@ public abstract class WebSocket {
private final long createtime = System.currentTimeMillis();
private Map<String, Object> attributes = new HashMap<>(); //非线程安全
private Map<String, Object> attributes = new HashMap<>(); //非线程安全
protected final long websocketid = System.nanoTime(); //唯一ID
protected WebSocket() {
}
@@ -363,28 +365,6 @@ public abstract class WebSocket {
return rs;
}
/**
* 获取指定groupid在线用户的节点地址列表
*
* @param groupid groupid
*
* @return 地址列表
*/
protected final CompletableFuture<Collection<InetSocketAddress>> getOnlineNodes(Serializable groupid) {
return _engine.node.getOnlineNodes(groupid);
}
/**
* 获取指定groupid在线用户的详细连接信息
*
* @param groupid groupid
*
* @return 地址集合
*/
protected final CompletableFuture<Map<InetSocketAddress, List<String>>> getOnlineRemoteAddress(Serializable groupid) {
return _engine.node.getOnlineRemoteAddress(groupid);
}
/**
* 获取当前WebSocket下的属性非线程安全
*
@@ -407,7 +387,7 @@ public abstract class WebSocket {
* @return 属性值
*/
public final <T> T removeAttribute(String name) {
return attributes == null ? null : (T) attributes.remove(name);
return attributes == null ? null : (T) attributes.remove(name);
}
/**
@@ -417,7 +397,7 @@ public abstract class WebSocket {
* @param value 属性值
*/
public final void setAttribute(String name, Object value) {
if(attributes == null) attributes = new HashMap<>();
if (attributes == null) attributes = new HashMap<>();
attributes.put(name, value);
}

View File

@@ -38,6 +38,7 @@ public abstract class WebSocketNode {
protected WebSocketNode remoteNode;
//存放所有用户分布在节点上的队列信息,Set<InetSocketAddress> 为 sncpnode 的集合, key: groupid
//包含 localSncpAddress
//如果不是分布式(没有SNCP)sncpAddressNodes 将不会被用到
@Resource(name = "$")
protected CacheSource<Serializable, InetSocketAddress> sncpAddressNodes;
@@ -96,18 +97,24 @@ public abstract class WebSocketNode {
}
/**
* 获取在线用户的节点地址列表
* 获取用户在线的SNCP节点地址列表不是分布式则返回空列表<br>
* InetSocketAddress 为 SNCP节点地址
*
* @param groupid groupid
*
* @return 地址列表
*/
public CompletableFuture<Collection<InetSocketAddress>> getOnlineNodes(final Serializable groupid) {
return sncpAddressNodes == null ? CompletableFuture.completedFuture(null) : sncpAddressNodes.getCollectionAsync(groupid);
if (this.sncpAddressNodes != null) return this.sncpAddressNodes.getCollectionAsync(groupid);
List<InetSocketAddress> rs = new ArrayList<>();
if (this.localSncpAddress != null) rs.add(this.localSncpAddress);
return CompletableFuture.completedFuture(rs);
}
/**
* 获取在线用户的详细连接信息
* 获取在线用户的详细连接信息 <br>
* Map.key 为 SNCP节点地址
* Map.value 为 用户客户端的IP
*
* @param groupid groupid
*
@@ -116,7 +123,7 @@ public abstract class WebSocketNode {
//异步待优化
public CompletableFuture<Map<InetSocketAddress, List<String>>> getOnlineRemoteAddress(final Serializable groupid) {
final CompletableFuture<Map<InetSocketAddress, List<String>>> rs = new CompletableFuture<>();
CompletableFuture< Collection<InetSocketAddress>> nodesFuture = getOnlineNodes(groupid);
CompletableFuture<Collection<InetSocketAddress>> nodesFuture = getOnlineNodes(groupid);
if (nodesFuture == null) return CompletableFuture.completedFuture(null);
nodesFuture.whenComplete((nodes, e) -> {
if (e != null) {

View File

@@ -55,7 +55,6 @@ public class WebSocketRunner implements Runnable {
this.webSocket = webSocket;
this.channel = channel;
this.wsbinary = wsbinary;
webSocket._runner = this;
this.coder.logger = context.getLogger();
this.coder.debugable = false;//context.getLogger().isLoggable(Level.FINEST);
this.readBuffer = context.pollBuffer();

View File

@@ -144,7 +144,9 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
}
webSocket._groupid = groupid;
WebSocketServlet.this.node.localEngine.add(webSocket);
context.runAsync(new WebSocketRunner(context, webSocket, response.removeChannel(), wsbinary));
WebSocketRunner runner = new WebSocketRunner(context, webSocket, response.removeChannel(), wsbinary);
webSocket._runner = runner;
context.runAsync(runner);
response.finish(true);
}