This commit is contained in:
Redkale
2017-05-17 18:00:15 +08:00
parent bb87774ba9
commit 804b4dc07d
3 changed files with 19 additions and 21 deletions

View File

@@ -37,23 +37,23 @@ public abstract class WebSocketNode {
protected WebSocketNode remoteNode; protected WebSocketNode remoteNode;
//存放所有用户分布在节点上的队列信息,Set<InetSocketAddress> 为 sncpnode 的集合, key: groupid //存放所有用户分布在节点上的队列信息,Set<InetSocketAddress> 为 sncpnode 的集合, key: groupid
//如果不是分布式(没有SNCP)则InetSocketAddress为当前WebSocketServlet的监听地址
@Resource(name = "$") @Resource(name = "$")
protected CacheSource<Serializable, InetSocketAddress> sncpAddressNodes; protected CacheSource<Serializable, InetSocketAddress> sncpAddressNodes;
//当前节点的本地WebSocketEngine //当前节点的本地WebSocketEngine
protected WebSocketEngine _localEngine; protected WebSocketEngine localEngine;
public void init(AnyValue conf) { public void init(AnyValue conf) {
} }
public void destroy(AnyValue conf) { public void destroy(AnyValue conf) {
} }
public final void postDestroy(AnyValue conf) { public final void postDestroy(AnyValue conf) {
if (this._localEngine == null) return; if (this.localEngine == null) return;
this._localEngine.getWebSocketGroups().forEach(g -> disconnect(g.getGroupid())); //关掉所有本地本地WebSocket
this.localEngine.getWebSocketGroups().forEach(g -> disconnect(g.getGroupid()));
} }
protected abstract CompletableFuture<List<String>> getOnlineRemoteAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid); protected abstract CompletableFuture<List<String>> getOnlineRemoteAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid);
@@ -66,12 +66,12 @@ public abstract class WebSocketNode {
//-------------------------------------------------------------------------------- //--------------------------------------------------------------------------------
final CompletableFuture<Void> connect(final Serializable groupid) { final CompletableFuture<Void> connect(final Serializable groupid) {
if (finest) logger.finest(localSncpAddress + " receive websocket connect event (" + groupid + " on " + this._localEngine.getEngineid() + ")."); if (finest) logger.finest(localSncpAddress + " receive websocket connect event (" + groupid + " on " + this.localEngine.getEngineid() + ").");
return connect(groupid, localSncpAddress); return connect(groupid, localSncpAddress);
} }
final CompletableFuture<Void> disconnect(Serializable groupid) { final CompletableFuture<Void> disconnect(Serializable groupid) {
if (finest) logger.finest(localSncpAddress + " receive websocket disconnect event (" + groupid + " on " + this._localEngine.getEngineid() + ")."); if (finest) logger.finest(localSncpAddress + " receive websocket disconnect event (" + groupid + " on " + this.localEngine.getEngineid() + ").");
return disconnect(groupid, localSncpAddress); return disconnect(groupid, localSncpAddress);
} }
@@ -149,7 +149,7 @@ public abstract class WebSocketNode {
return CompletableFuture.supplyAsync(() -> { return CompletableFuture.supplyAsync(() -> {
if (finest) logger.finest("websocket want send message {groupid:" + groupid + ", content:'" + message + "'} from locale node to locale engine"); if (finest) logger.finest("websocket want send message {groupid:" + groupid + ", content:'" + message + "'} from locale node to locale engine");
int rscode = RETCODE_GROUP_EMPTY; int rscode = RETCODE_GROUP_EMPTY;
WebSocketGroup group = this._localEngine == null ? null : this._localEngine.getWebSocketGroup(groupid); WebSocketGroup group = this.localEngine == null ? null : this.localEngine.getWebSocketGroup(groupid);
if (group != null) rscode = group.send(recent, message, last); if (group != null) rscode = group.send(recent, message, last);
if ((recent && rscode == 0) || remoteNode == null || sncpAddressNodes == null) { //没有其他远程的WebSocket连接 if ((recent && rscode == 0) || remoteNode == null || sncpAddressNodes == null) { //没有其他远程的WebSocket连接
if (finest) { if (finest) {

View File

@@ -69,27 +69,25 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
@Resource(name = "$") @Resource(name = "$")
protected WebSocketNode node; protected WebSocketNode node;
protected WebSocketEngine engine;
@Override @Override
final void preInit(HttpContext context, AnyValue conf) { final void preInit(HttpContext context, AnyValue conf) {
InetSocketAddress addr = context.getServerAddress(); InetSocketAddress addr = context.getServerAddress();
this.engine = new WebSocketEngine(addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", this.node, logger);
if (this.node == null) this.node = createWebSocketNode(); if (this.node == null) this.node = createWebSocketNode();
if (this.node == null) { //没有部署SNCP即不是分布式 if (this.node == null) { //没有部署SNCP即不是分布式
this.node = new WebSocketNodeService(); this.node = new WebSocketNodeService();
if (logger.isLoggable(Level.WARNING)) logger.warning("Not found WebSocketNode, create a default value for " + getClass().getName()); if (logger.isLoggable(Level.WARNING)) logger.warning("Not found WebSocketNode, create a default value for " + getClass().getName());
} }
this.node._localEngine = engine; //存在WebSocketServlet则此WebSocketNode必须是本地模式Service //存在WebSocketServlet则此WebSocketNode必须是本地模式Service
this.node.localEngine = new WebSocketEngine(addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", this.node, logger);
this.node.init(conf); this.node.init(conf);
this.engine.init(conf); this.node.localEngine.init(conf);
} }
@Override @Override
final void postDestroy(HttpContext context, AnyValue conf) { final void postDestroy(HttpContext context, AnyValue conf) {
this.node.postDestroy(conf); this.node.postDestroy(conf);
super.destroy(context, conf); super.destroy(context, conf);
this.engine.destroy(conf); this.node.localEngine.destroy(conf);
} }
@Override @Override
@@ -112,7 +110,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
return; return;
} }
final WebSocket webSocket = this.createWebSocket(); final WebSocket webSocket = this.createWebSocket();
webSocket._engine = engine; webSocket._engine = this.node.localEngine;
webSocket._jsonConvert = jsonConvert; webSocket._jsonConvert = jsonConvert;
webSocket._remoteAddress = request.getRemoteAddress(); webSocket._remoteAddress = request.getRemoteAddress();
webSocket._remoteAddr = request.getRemoteAddr(); webSocket._remoteAddr = request.getRemoteAddr();
@@ -145,7 +143,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
return; return;
} }
webSocket._groupid = groupid; webSocket._groupid = groupid;
engine.add(webSocket); WebSocketServlet.this.node.localEngine.add(webSocket);
context.runAsync(new WebSocketRunner(context, webSocket, response.removeChannel(), wsbinary)); context.runAsync(new WebSocketRunner(context, webSocket, response.removeChannel(), wsbinary));
response.finish(true); response.finish(true);
} }

View File

@@ -37,10 +37,10 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
@Override @Override
public CompletableFuture<List<String>> getOnlineRemoteAddresses(final @RpcTargetAddress InetSocketAddress targetAddress, final Serializable groupid) { public CompletableFuture<List<String>> getOnlineRemoteAddresses(final @RpcTargetAddress InetSocketAddress targetAddress, final Serializable groupid) {
if (localSncpAddress == null || !localSncpAddress.equals(targetAddress)) return remoteOnlineRemoteAddresses(targetAddress, groupid); if (localSncpAddress == null || !localSncpAddress.equals(targetAddress)) return remoteOnlineRemoteAddresses(targetAddress, groupid);
if (this._localEngine == null) return CompletableFuture.completedFuture(new ArrayList<>()); if (this.localEngine == null) return CompletableFuture.completedFuture(new ArrayList<>());
return CompletableFuture.supplyAsync(() -> { return CompletableFuture.supplyAsync(() -> {
final List<String> rs = new ArrayList<>(); final List<String> rs = new ArrayList<>();
final WebSocketGroup group = this._localEngine.getWebSocketGroup(groupid); final WebSocketGroup group = this.localEngine.getWebSocketGroup(groupid);
if (group != null) group.getWebSockets().forEach(x -> rs.add("ws" + Objects.hashCode(x) + '@' + x.getRemoteAddr())); if (group != null) group.getWebSockets().forEach(x -> rs.add("ws" + Objects.hashCode(x) + '@' + x.getRemoteAddr()));
return rs; return rs;
}); });
@@ -49,10 +49,10 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
@Override @Override
public CompletableFuture<Integer> sendMessage(@RpcTargetAddress InetSocketAddress addr, Serializable groupid, boolean recent, Object message, boolean last) { public CompletableFuture<Integer> sendMessage(@RpcTargetAddress InetSocketAddress addr, Serializable groupid, boolean recent, Object message, boolean last) {
return CompletableFuture.supplyAsync(() -> { return CompletableFuture.supplyAsync(() -> {
if (this._localEngine == null) return RETCODE_GROUP_EMPTY; if (this.localEngine == null) return RETCODE_GROUP_EMPTY;
final WebSocketGroup group = this._localEngine.getWebSocketGroup(groupid); final WebSocketGroup group = this.localEngine.getWebSocketGroup(groupid);
if (group == null || group.isEmpty()) { if (group == null || group.isEmpty()) {
if (finest) logger.finest("receive websocket message {engineid:'" + this._localEngine.getEngineid() + "', groupid:" + groupid + ", content:'" + message + "'} from " + addr + " but send result is " + RETCODE_GROUP_EMPTY); if (finest) logger.finest("receive websocket message {engineid:'" + this.localEngine.getEngineid() + "', groupid:" + groupid + ", content:'" + message + "'} from " + addr + " but send result is " + RETCODE_GROUP_EMPTY);
return RETCODE_GROUP_EMPTY; return RETCODE_GROUP_EMPTY;
} }
int code = group.send(recent, message, last); int code = group.send(recent, message, last);