diff --git a/src/main/java/org/redkale/net/http/HttpServer.java b/src/main/java/org/redkale/net/http/HttpServer.java index 4e1a205a2..08c466c88 100644 --- a/src/main/java/org/redkale/net/http/HttpServer.java +++ b/src/main/java/org/redkale/net/http/HttpServer.java @@ -567,21 +567,23 @@ public class HttpServer extends Server { - if (asyncGroup == null) { - groupLock.lock(); - try { - if (asyncGroup == null) { - WebSocketAsyncGroup g = new WebSocketAsyncGroup("Redkale-HTTP:" + address.getPort() + "-WebSocketWriteIOThread-%s", workExecutor, safeBufferPool); - g.start(); - asyncGroup = g; + if (false) { //暂不使用WebSocketAsyncGroup模式 + rs.webSocketWriterIOThreadFunc = ws -> { + if (asyncGroup == null) { + groupLock.lock(); + try { + if (asyncGroup == null) { + WebSocketAsyncGroup g = new WebSocketAsyncGroup("Redkale-HTTP:" + address.getPort() + "-WebSocketWriteIOThread-%s", workExecutor, safeBufferPool); + g.start(); + asyncGroup = g; + } + } finally { + groupLock.unlock(); } - } finally { - groupLock.unlock(); } - } - return (WebSocketWriteIOThread) asyncGroup.nextWriteIOThread(); - }; + return (WebSocketWriteIOThread) asyncGroup.nextWriteIOThread(); + }; + } return rs; } diff --git a/src/main/java/org/redkale/net/http/WebSocket.java b/src/main/java/org/redkale/net/http/WebSocket.java index 27a7e9092..3ef1b71b1 100644 --- a/src/main/java/org/redkale/net/http/WebSocket.java +++ b/src/main/java/org/redkale/net/http/WebSocket.java @@ -86,7 +86,8 @@ public abstract class WebSocket { WebSocketReadHandler _readHandler; - //WebSocketWriteHandler _writeHandler; + WebSocketWriteHandler _writeHandler; + WebSocketWriteIOThread _writeIOThread; InetSocketAddress _sncpAddress; //分布式下不可为空 @@ -239,14 +240,14 @@ public abstract class WebSocket { * @return 0表示成功, 非0表示错误码 */ CompletableFuture sendPacket(WebSocketPacket packet) { - if (this._writeIOThread == null) { + if (this._writeHandler == null) { //if (this._writeIOThread == null) { if (delayPackets == null) { delayPackets = new ArrayList<>(); } delayPackets.add(packet); return CompletableFuture.completedFuture(RETCODE_DEAYSEND); } - CompletableFuture rs = this._writeIOThread.send(this, packet); + CompletableFuture rs = this._writeHandler.send(packet); //this._writeIOThread.send(this, packet); if (_engine.logger.isLoggable(Level.FINER) && packet != WebSocketPacket.DEFAULT_PING_PACKET) { _engine.logger.finer("userid:" + getUserid() + " send websocket message(" + packet + ")" + " on " + this); } diff --git a/src/main/java/org/redkale/net/http/WebSocketAsyncGroup.java b/src/main/java/org/redkale/net/http/WebSocketAsyncGroup.java index d61952b71..beef66204 100644 --- a/src/main/java/org/redkale/net/http/WebSocketAsyncGroup.java +++ b/src/main/java/org/redkale/net/http/WebSocketAsyncGroup.java @@ -20,6 +20,7 @@ import org.redkale.util.ByteBufferPool; * * @since 2.8.0 */ +@Deprecated(since = "2.8.0") class WebSocketAsyncGroup extends AsyncIOGroup { public WebSocketAsyncGroup(String threadNameFormat, ExecutorService workExecutor, ByteBufferPool safeBufferPool) { diff --git a/src/main/java/org/redkale/net/http/WebSocketServlet.java b/src/main/java/org/redkale/net/http/WebSocketServlet.java index 86eff98d6..ecbe55fd9 100644 --- a/src/main/java/org/redkale/net/http/WebSocketServlet.java +++ b/src/main/java/org/redkale/net/http/WebSocketServlet.java @@ -285,7 +285,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl @Override public void completed(Integer result, Void attachment) { webSocket._readHandler = new WebSocketReadHandler(response.getContext(), webSocket, restMessageConsumer); - //webSocket._writeHandler = new WebSocketWriteHandler(response.getContext(), webSocket); + webSocket._writeHandler = new WebSocketWriteHandler(response.getContext(), webSocket); response.getContext().updateWebSocketWriteIOThread(webSocket); Runnable createUseridHandler = () -> { @@ -349,7 +349,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl if (webSocket.delayPackets != null) { //存在待发送的消息 List delayPackets = webSocket.delayPackets; webSocket.delayPackets = null; - CompletableFuture cf = webSocket._writeIOThread.send(webSocket, delayPackets.toArray(new WebSocketPacket[delayPackets.size()])); + //CompletableFuture cf = webSocket._writeIOThread.send(webSocket, delayPackets.toArray(new WebSocketPacket[delayPackets.size()])); + CompletableFuture cf = webSocket._writeHandler.send(delayPackets.toArray(new WebSocketPacket[delayPackets.size()])); cf.whenComplete((Integer v, Throwable t) -> { if (userid == null || t != null) { if (t != null) { @@ -368,7 +369,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl if (webSocket.delayPackets != null) { //存在待发送的消息 List delayPackets = webSocket.delayPackets; webSocket.delayPackets = null; - CompletableFuture cf = webSocket._writeIOThread.send(webSocket, delayPackets.toArray(new WebSocketPacket[delayPackets.size()])); + //CompletableFuture cf = webSocket._writeIOThread.send(webSocket, delayPackets.toArray(new WebSocketPacket[delayPackets.size()])); + CompletableFuture cf = webSocket._writeHandler.send(delayPackets.toArray(new WebSocketPacket[delayPackets.size()])); cf.whenComplete((Integer v, Throwable t) -> { if (sessionid == null || t != null) { if (t != null) { diff --git a/src/main/java/org/redkale/net/http/WebSocketWriteHandler.java b/src/main/java/org/redkale/net/http/WebSocketWriteHandler.java index e08edfa8c..58586083a 100644 --- a/src/main/java/org/redkale/net/http/WebSocketWriteHandler.java +++ b/src/main/java/org/redkale/net/http/WebSocketWriteHandler.java @@ -17,7 +17,6 @@ import org.redkale.util.ByteArray; * * @author zhangjx */ -@Deprecated(since = "2.8.0") public class WebSocketWriteHandler implements CompletionHandler { protected final HttpContext context; diff --git a/src/main/java/org/redkale/net/http/WebSocketWriteIOThread.java b/src/main/java/org/redkale/net/http/WebSocketWriteIOThread.java index d1769a394..76077061c 100644 --- a/src/main/java/org/redkale/net/http/WebSocketWriteIOThread.java +++ b/src/main/java/org/redkale/net/http/WebSocketWriteIOThread.java @@ -21,6 +21,7 @@ import org.redkale.util.*; * * @since 2.8.0 */ +@Deprecated(since = "2.8.0") public class WebSocketWriteIOThread extends AsyncIOThread { private final ScheduledThreadPoolExecutor timeoutExecutor;