修复进程关闭时WebSocket没有执行onClose方法的BUG

This commit is contained in:
Redkale
2018-07-24 20:11:02 +08:00
parent 69a0071e17
commit e88c4fa2e3
4 changed files with 15 additions and 10 deletions

View File

@@ -802,7 +802,10 @@ public abstract class WebSocket<G extends Serializable, T> {
* 显式地关闭WebSocket * 显式地关闭WebSocket
*/ */
public final void close() { public final void close() {
if (this._runner != null) this._runner.closeRunner(CLOSECODE_FORCED, "user close"); if (this._runner != null) {
CompletableFuture<Void> future = this._runner.closeRunner(CLOSECODE_FORCED, "user close");
if (future != null) future.join();
}
} }
/** /**

View File

@@ -144,12 +144,12 @@ public class WebSocketEngine {
} }
@Comment("从WebSocketEngine删除指定WebSocket") @Comment("从WebSocketEngine删除指定WebSocket")
void removeThenClose(WebSocket socket) { CompletableFuture<Void> removeThenClose(WebSocket socket) {
Serializable userid = socket._userid; Serializable userid = socket._userid;
if (single) { if (single) {
currconns.decrementAndGet(); currconns.decrementAndGet();
websockets.remove(userid); websockets.remove(userid);
if (node != null) node.disconnect(userid); if (node != null) return node.disconnect(userid);
} else { //非线程安全, 在常规场景中无需锁 } else { //非线程安全, 在常规场景中无需锁
List<WebSocket> list = websockets2.get(userid); List<WebSocket> list = websockets2.get(userid);
if (list != null) { if (list != null) {
@@ -157,10 +157,11 @@ public class WebSocketEngine {
list.remove(socket); list.remove(socket);
if (list.isEmpty()) { if (list.isEmpty()) {
websockets2.remove(userid); websockets2.remove(userid);
if (node != null) node.disconnect(userid); if (node != null) return node.disconnect(userid);
} }
} }
} }
return null;
} }
@Comment("更改WebSocket的userid") @Comment("更改WebSocket的userid")

View File

@@ -80,7 +80,7 @@ public abstract class WebSocketNode {
public final void postDestroy(AnyValue conf) { public final void postDestroy(AnyValue conf) {
if (this.localEngine == null) return; if (this.localEngine == null) return;
//关掉所有本地本地WebSocket //关掉所有本地本地WebSocket
this.localEngine.getLocalWebSockets().forEach(g -> disconnect(g.getUserid()).join()); this.localEngine.getLocalWebSockets().forEach(g -> g.close());
if (sncpNodeAddresses != null && localSncpAddress != null) { if (sncpNodeAddresses != null && localSncpAddress != null) {
sncpNodeAddresses.removeSetItem(SOURCE_SNCP_ADDRS_KEY, localSncpAddress); sncpNodeAddresses.removeSetItem(SOURCE_SNCP_ADDRS_KEY, localSncpAddress);
} }

View File

@@ -74,7 +74,7 @@ class WebSocketRunner implements Runnable {
@Override @Override
public void completed(Integer count, Void attachment1) { public void completed(Integer count, Void attachment1) {
if (count < 1) { if (count < 1) {
if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner(userid="+webSocket.getUserid()+") abort on read buffer count, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds"); if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner(userid=" + webSocket.getUserid() + ") abort on read buffer count, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds");
closeRunner(0, "read buffer count is " + count); closeRunner(0, "read buffer count is " + count);
return; return;
} }
@@ -296,16 +296,17 @@ class WebSocketRunner implements Runnable {
return closed; return closed;
} }
public void closeRunner(int code, String reason) { public CompletableFuture<Void> closeRunner(int code, String reason) {
if (closed) return; if (closed) return null;
synchronized (this) { synchronized (this) {
if (closed) return; if (closed) return null;
closed = true; closed = true;
channel.dispose(); channel.dispose();
context.offerBuffer(readBuffer); context.offerBuffer(readBuffer);
readBuffer = null; readBuffer = null;
engine.removeThenClose(webSocket); CompletableFuture<Void> future = engine.removeThenClose(webSocket);
webSocket.onClose(code, reason); webSocket.onClose(code, reason);
return future;
} }
} }