WebSocket增加forceCloseWebSocket系列方法
This commit is contained in:
@@ -37,6 +37,9 @@ import org.redkale.util.Comment;
|
|||||||
*/
|
*/
|
||||||
public abstract class WebSocket<G extends Serializable, T> {
|
public abstract class WebSocket<G extends Serializable, T> {
|
||||||
|
|
||||||
|
@Comment("强制关闭结果码")
|
||||||
|
public static final int CLOSECODE_FORCED = 1;
|
||||||
|
|
||||||
@Comment("消息不合法")
|
@Comment("消息不合法")
|
||||||
public static final int RETCODE_SEND_ILLPACKET = 1 << 1; //2
|
public static final int RETCODE_SEND_ILLPACKET = 1 << 1; //2
|
||||||
|
|
||||||
@@ -367,6 +370,18 @@ public abstract class WebSocket<G extends Serializable, T> {
|
|||||||
return _engine.node.getRpcNodeWebSocketAddresses(userid);
|
return _engine.node.getRpcNodeWebSocketAddresses(userid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 强制关闭用户的所有WebSocket
|
||||||
|
*
|
||||||
|
* @param userid Serializable
|
||||||
|
*
|
||||||
|
* @return int
|
||||||
|
*/
|
||||||
|
@Comment("强制关闭用户的所有WebSocket")
|
||||||
|
public CompletableFuture<Integer> forceCloseWebSocket(Serializable userid) {
|
||||||
|
return _engine.node.forceCloseWebSocket(userid);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 获取当前WebSocket下的属性,非线程安全
|
* 获取当前WebSocket下的属性,非线程安全
|
||||||
*
|
*
|
||||||
@@ -612,7 +627,16 @@ public abstract class WebSocket<G extends Serializable, T> {
|
|||||||
* 显式地关闭WebSocket
|
* 显式地关闭WebSocket
|
||||||
*/
|
*/
|
||||||
public final void close() {
|
public final void close() {
|
||||||
if (this._runner != null) this._runner.closeRunner();
|
if (this._runner != null) this._runner.closeRunner(CLOSECODE_FORCED);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 是否关闭
|
||||||
|
*
|
||||||
|
* @return boolean
|
||||||
|
*/
|
||||||
|
public final boolean isClosed() {
|
||||||
|
return this._runner != null ? this._runner.closed : true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -132,6 +132,23 @@ public class WebSocketEngine {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Comment("强制关闭本地用户的WebSocket")
|
||||||
|
public int forceCloseLocalWebSocket(Serializable userid) {
|
||||||
|
if (single) {
|
||||||
|
WebSocket ws = websockets.get(userid);
|
||||||
|
if (ws == null) return 0;
|
||||||
|
ws.close();
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
List<WebSocket> list = websockets2.get(userid);
|
||||||
|
if (list == null || list.isEmpty()) return 0;
|
||||||
|
List<WebSocket> list2 = new ArrayList<>(list);
|
||||||
|
for (WebSocket ws : list2) {
|
||||||
|
ws.close();
|
||||||
|
}
|
||||||
|
return list2.size();
|
||||||
|
}
|
||||||
|
|
||||||
@Comment("给所有连接用户发送消息")
|
@Comment("给所有连接用户发送消息")
|
||||||
public CompletableFuture<Integer> broadcastMessage(final Object message, final boolean last) {
|
public CompletableFuture<Integer> broadcastMessage(final Object message, final boolean last) {
|
||||||
return broadcastMessage(null, message, last);
|
return broadcastMessage(null, message, last);
|
||||||
|
|||||||
@@ -79,6 +79,8 @@ public abstract class WebSocketNode {
|
|||||||
|
|
||||||
protected abstract CompletableFuture<Void> disconnect(Serializable userid, InetSocketAddress addr);
|
protected abstract CompletableFuture<Void> disconnect(Serializable userid, InetSocketAddress addr);
|
||||||
|
|
||||||
|
protected abstract CompletableFuture<Integer> forceCloseWebSocket(Serializable userid, InetSocketAddress addr);
|
||||||
|
|
||||||
//--------------------------------------------------------------------------------
|
//--------------------------------------------------------------------------------
|
||||||
final CompletableFuture<Void> connect(final Serializable userid) {
|
final CompletableFuture<Void> connect(final Serializable userid) {
|
||||||
if (finest) logger.finest(localSncpAddress + " receive websocket connect event (" + userid + " on " + this.localEngine.getEngineid() + ").");
|
if (finest) logger.finest(localSncpAddress + " receive websocket connect event (" + userid + " on " + this.localEngine.getEngineid() + ").");
|
||||||
@@ -178,6 +180,37 @@ public abstract class WebSocketNode {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 强制关闭用户WebSocket
|
||||||
|
*
|
||||||
|
* @param userid Serializable
|
||||||
|
*
|
||||||
|
* @return int
|
||||||
|
*/
|
||||||
|
public final CompletableFuture<Integer> forceCloseWebSocket(final Serializable userid) {
|
||||||
|
CompletableFuture<Integer> localFuture = null;
|
||||||
|
if (this.localEngine != null) localFuture = CompletableFuture.completedFuture(localEngine.forceCloseLocalWebSocket(userid));
|
||||||
|
if (this.sncpNodeAddresses == null || this.remoteNode == null) {
|
||||||
|
if (finest) logger.finest("websocket remote node is null");
|
||||||
|
//没有CacheSource就不会有分布式节点
|
||||||
|
return localFuture;
|
||||||
|
}
|
||||||
|
//远程节点关闭
|
||||||
|
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(userid);
|
||||||
|
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
|
||||||
|
if (finest) logger.finest("websocket found userid:" + userid + " on " + addrs);
|
||||||
|
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0);
|
||||||
|
CompletableFuture<Integer> future = null;
|
||||||
|
for (InetSocketAddress addr : addrs) {
|
||||||
|
if (addr == null || addr.equals(localSncpAddress)) continue;
|
||||||
|
future = future == null ? remoteNode.forceCloseWebSocket(userid, addr)
|
||||||
|
: future.thenCombine(remoteNode.forceCloseWebSocket(userid, addr), (a, b) -> a + b);
|
||||||
|
}
|
||||||
|
return future == null ? CompletableFuture.completedFuture(0) : future;
|
||||||
|
});
|
||||||
|
return localFuture.thenCombine(remoteFuture, (a, b) -> a + b);
|
||||||
|
}
|
||||||
|
|
||||||
//--------------------------------------------------------------------------------
|
//--------------------------------------------------------------------------------
|
||||||
/**
|
/**
|
||||||
* 获取本地的WebSocketEngine,没有则返回null
|
* 获取本地的WebSocketEngine,没有则返回null
|
||||||
|
|||||||
@@ -39,7 +39,7 @@ class WebSocketRunner implements Runnable {
|
|||||||
|
|
||||||
private ByteBuffer readBuffer;
|
private ByteBuffer readBuffer;
|
||||||
|
|
||||||
protected volatile boolean closed = false;
|
volatile boolean closed = false;
|
||||||
|
|
||||||
private AtomicBoolean writing = new AtomicBoolean();
|
private AtomicBoolean writing = new AtomicBoolean();
|
||||||
|
|
||||||
@@ -75,7 +75,7 @@ class WebSocketRunner implements Runnable {
|
|||||||
@Override
|
@Override
|
||||||
public void completed(Integer count, Void attachment1) {
|
public void completed(Integer count, Void attachment1) {
|
||||||
if (count < 1 && readBuffers.isEmpty()) {
|
if (count < 1 && readBuffers.isEmpty()) {
|
||||||
closeRunner();
|
closeRunner(0);
|
||||||
if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on read buffer count, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds");
|
if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on read buffer count, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -225,7 +225,7 @@ class WebSocketRunner implements Runnable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
closeRunner();
|
closeRunner(0);
|
||||||
if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on read WebSocketPacket, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", t);
|
if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on read WebSocketPacket, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", t);
|
||||||
} finally {
|
} finally {
|
||||||
if (exBuffers != null) {
|
if (exBuffers != null) {
|
||||||
@@ -238,18 +238,18 @@ class WebSocketRunner implements Runnable {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void failed(Throwable exc, Void attachment2) {
|
public void failed(Throwable exc, Void attachment2) {
|
||||||
closeRunner();
|
closeRunner(0);
|
||||||
if (exc != null) {
|
if (exc != null) {
|
||||||
context.getLogger().log(Level.FINEST, "WebSocketRunner read WebSocketPacket failed, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", exc);
|
context.getLogger().log(Level.FINEST, "WebSocketRunner read WebSocketPacket failed, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", exc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
closeRunner();
|
closeRunner(0);
|
||||||
context.getLogger().log(Level.FINEST, "WebSocketRunner abort by AsyncConnection closed");
|
context.getLogger().log(Level.FINEST, "WebSocketRunner abort by AsyncConnection closed");
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
closeRunner();
|
closeRunner(0);
|
||||||
context.getLogger().log(Level.FINEST, "WebSocketRunner abort on read bytes from channel, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", e);
|
context.getLogger().log(Level.FINEST, "WebSocketRunner abort on read bytes from channel, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -315,7 +315,7 @@ class WebSocketRunner implements Runnable {
|
|||||||
channel.write(buffers, buffers, this);
|
channel.write(buffers, buffers, this);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
closeRunner();
|
closeRunner(0);
|
||||||
context.getLogger().log(Level.WARNING, "WebSocket sendMessage abort on rewrite, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", e);
|
context.getLogger().log(Level.WARNING, "WebSocket sendMessage abort on rewrite, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", e);
|
||||||
}
|
}
|
||||||
writing.set(false);
|
writing.set(false);
|
||||||
@@ -324,7 +324,7 @@ class WebSocketRunner implements Runnable {
|
|||||||
@Override
|
@Override
|
||||||
public void failed(Throwable exc, ByteBuffer[] attachments) {
|
public void failed(Throwable exc, ByteBuffer[] attachments) {
|
||||||
writing.set(false);
|
writing.set(false);
|
||||||
closeRunner();
|
closeRunner(0);
|
||||||
if (exc != null) {
|
if (exc != null) {
|
||||||
context.getLogger().log(Level.FINE, "WebSocket sendMessage on CompletionHandler failed, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", exc);
|
context.getLogger().log(Level.FINE, "WebSocket sendMessage on CompletionHandler failed, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", exc);
|
||||||
}
|
}
|
||||||
@@ -332,14 +332,14 @@ class WebSocketRunner implements Runnable {
|
|||||||
});
|
});
|
||||||
} catch (Exception t) {
|
} catch (Exception t) {
|
||||||
writing.set(false);
|
writing.set(false);
|
||||||
closeRunner();
|
closeRunner(0);
|
||||||
context.getLogger().log(Level.FINE, "WebSocket sendMessage abort, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", t);
|
context.getLogger().log(Level.FINE, "WebSocket sendMessage abort, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", t);
|
||||||
futureResult.complete(RETCODE_SENDEXCEPTION);
|
futureResult.complete(RETCODE_SENDEXCEPTION);
|
||||||
}
|
}
|
||||||
return futureResult;
|
return futureResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void closeRunner() {
|
public void closeRunner(int code) {
|
||||||
if (closed) return;
|
if (closed) return;
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
if (closed) return;
|
if (closed) return;
|
||||||
@@ -351,7 +351,7 @@ class WebSocketRunner implements Runnable {
|
|||||||
context.offerBuffer(readBuffer);
|
context.offerBuffer(readBuffer);
|
||||||
readBuffer = null;
|
readBuffer = null;
|
||||||
engine.remove(webSocket);
|
engine.remove(webSocket);
|
||||||
webSocket.onClose(0, null);
|
webSocket.onClose(code, null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -87,4 +87,20 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
|
|||||||
if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " disconnect from " + sncpAddr);
|
if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " disconnect from " + sncpAddr);
|
||||||
return future;
|
return future;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 强制关闭用户的WebSocket
|
||||||
|
*
|
||||||
|
* @param userid String
|
||||||
|
* @param sncpAddr InetSocketAddress
|
||||||
|
*
|
||||||
|
* @return 无返回值
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<Integer> forceCloseWebSocket(Serializable userid, InetSocketAddress sncpAddr) {
|
||||||
|
//不能从sncpNodeAddresses中移除,因为engine.forceCloseWebSocket 会调用到disconnect
|
||||||
|
if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " forceCloseWebSocket from " + sncpAddr);
|
||||||
|
if (localEngine == null) return CompletableFuture.completedFuture(0);
|
||||||
|
return CompletableFuture.completedFuture(localEngine.forceCloseLocalWebSocket(userid));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user