From e6150b3469d8ac3b3374616e56d802beeff4b4bc Mon Sep 17 00:00:00 2001 From: Redkale Date: Sat, 14 Jan 2023 14:44:46 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96WebSocket?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/org/redkale/net/Response.java | 32 ++-- src/main/java/org/redkale/net/WorkThread.java | 24 +++ .../org/redkale/net/http/HttpContext.java | 2 +- .../org/redkale/net/http/HttpResponse.java | 2 +- .../java/org/redkale/net/http/WebSocket.java | 4 +- .../org/redkale/net/http/WebSocketEngine.java | 32 ++-- .../net/http/WebSocketReadHandler.java | 83 ++++++----- .../redkale/net/http/WebSocketServlet.java | 137 +++++++++++------- .../org/redkale/util/ThreadHashExecutor.java | 4 + .../test/websocket/ChatWebSocketServlet.java | 7 +- 10 files changed, 202 insertions(+), 125 deletions(-) diff --git a/src/main/java/org/redkale/net/Response.java b/src/main/java/org/redkale/net/Response.java index 9eb836ff3..f6e4e33e6 100644 --- a/src/main/java/org/redkale/net/Response.java +++ b/src/main/java/org/redkale/net/Response.java @@ -47,7 +47,7 @@ public abstract class Response> { protected Servlet> servlet; - protected final ByteBuffer writeBuffer; + private final ByteBuffer writeBuffer; private final CompletionHandler finishBytesHandler = new CompletionHandler() { @@ -377,23 +377,29 @@ public abstract class Response> { } protected void send(final ByteTuple array, final CompletionHandler handler) { - this.channel.write(array, new CompletionHandler() { + ByteBuffer buffer = this.writeBuffer; + if (buffer != null && buffer.capacity() >= array.length()) { + buffer.clear(); + buffer.put(array.content(), array.offset(), array.length()); + buffer.flip(); + this.channel.write(buffer, buffer, new CompletionHandler() { - @Override - public void completed(Integer result, Void attachment) { - if (handler != null) { - handler.completed(result, attachment); + @Override + public void completed(Integer result, ByteBuffer attachment) { + attachment.clear(); + handler.completed(result, null); } - } - @Override - public void failed(Throwable exc, Void attachment) { - if (handler != null) { - handler.failed(exc, attachment); + @Override + public void failed(Throwable exc, ByteBuffer attachment) { + attachment.clear(); + handler.failed(exc, null); } - } - }); + }); + } else { + this.channel.write(array, handler); + } } protected void send(final ByteBuffer buffer, final A attachment, final CompletionHandler handler) { diff --git a/src/main/java/org/redkale/net/WorkThread.java b/src/main/java/org/redkale/net/WorkThread.java index f5c2d90a6..ba2f33435 100644 --- a/src/main/java/org/redkale/net/WorkThread.java +++ b/src/main/java/org/redkale/net/WorkThread.java @@ -129,6 +129,30 @@ public class WorkThread extends Thread implements Executor { } } + public void runWork(int hash, Runnable command) { + if (hashExecutor == null) { + if (workExecutor == null) { + command.run(); + } else { + workExecutor.execute(command); + } + } else { + hashExecutor.execute(hash, command); + } + } + + public void runWork(java.io.Serializable hash, Runnable command) { + if (hashExecutor == null) { + if (workExecutor == null) { + command.run(); + } else { + workExecutor.execute(command); + } + } else { + hashExecutor.execute(hash, command); + } + } + public void runAsync(Runnable command) { if (workExecutor == null) { ForkJoinPool.commonPool().execute(command); diff --git a/src/main/java/org/redkale/net/http/HttpContext.java b/src/main/java/org/redkale/net/http/HttpContext.java index 0fe52fac4..de962b97a 100644 --- a/src/main/java/org/redkale/net/http/HttpContext.java +++ b/src/main/java/org/redkale/net/http/HttpContext.java @@ -9,8 +9,8 @@ import java.nio.channels.CompletionHandler; import java.security.SecureRandom; import java.util.concurrent.ConcurrentHashMap; import org.redkale.annotation.ConstructorParameters; -import static org.redkale.asm.Opcodes.*; import org.redkale.asm.*; +import static org.redkale.asm.Opcodes.*; import org.redkale.net.Context; import org.redkale.util.*; diff --git a/src/main/java/org/redkale/net/http/HttpResponse.java b/src/main/java/org/redkale/net/http/HttpResponse.java index 1a9e49847..80bd6ae1c 100644 --- a/src/main/java/org/redkale/net/http/HttpResponse.java +++ b/src/main/java/org/redkale/net/http/HttpResponse.java @@ -1197,7 +1197,7 @@ public class HttpResponse extends Response { this.contentLength = buffer == null ? 0 : buffer.remaining(); } createHeader(); - if (buffer == null) { + if (buffer == null) { //只发header super.send(headerArray, handler); } else { ByteBuffer headbuf = channel.pollWriteBuffer(); diff --git a/src/main/java/org/redkale/net/http/WebSocket.java b/src/main/java/org/redkale/net/http/WebSocket.java index 632041902..86c8c07ba 100644 --- a/src/main/java/org/redkale/net/http/WebSocket.java +++ b/src/main/java/org/redkale/net/http/WebSocket.java @@ -827,11 +827,11 @@ public abstract class WebSocket { * 接收到消息前的拦截方法, ping/pong不在其内
* 注意:处理完后需要调用 messageEvent.run() 才能响应onMessage * - * @param restmapping Rest的方法名,没有则为空字符串 + * @param restMapping Rest的方法名,没有则为空字符串 * @param param onMessage方法的参数 * @param messageEvent onMessage事件 */ - public void preOnMessage(String restmapping, WebSocketParam param, Runnable messageEvent) { + public void preOnMessage(String restMapping, WebSocketParam param, Runnable messageEvent) { messageEvent.run(); } diff --git a/src/main/java/org/redkale/net/http/WebSocketEngine.java b/src/main/java/org/redkale/net/http/WebSocketEngine.java index 02aeb9b59..c0aa2c4e7 100644 --- a/src/main/java/org/redkale/net/http/WebSocketEngine.java +++ b/src/main/java/org/redkale/net/http/WebSocketEngine.java @@ -56,7 +56,7 @@ public class WebSocketEngine { private final Map> websockets2 = new ConcurrentHashMap<>(); @Comment("当前连接数") - protected final AtomicInteger currconns = new AtomicInteger(); + protected final AtomicInteger currConns = new AtomicInteger(); @Comment("用于PING的定时器") private ScheduledThreadPoolExecutor scheduler; @@ -65,7 +65,7 @@ public class WebSocketEngine { protected final Logger logger; @Comment("PING的间隔秒数") - protected int liveinterval; + protected int liveInterval; @Comment("最大连接数, 为0表示无限制") protected int wsMaxConns; @@ -82,14 +82,14 @@ public class WebSocketEngine { @Comment("加密解密器") protected Cryptor cryptor; - protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveinterval, int wsMaxConns, + protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveInterval, int wsMaxConns, int wsThreads, int wsMaxBody, boolean mergeMode, Cryptor cryptor, WebSocketNode node, Convert sendConvert, Logger logger) { this.engineid = engineid; this.single = single; this.context = context; this.sendConvert = sendConvert; this.node = node; - this.liveinterval = liveinterval; + this.liveInterval = liveInterval; this.wsMaxConns = wsMaxConns; this.wsThreads = wsThreads; this.wsMaxBody = wsMaxBody; @@ -104,8 +104,8 @@ public class WebSocketEngine { if (conf != null && conf.getAnyValue("properties") != null) { props = conf.getAnyValue("properties"); } - this.liveinterval = props == null ? (liveinterval < 0 ? DEFAILT_LIVEINTERVAL : liveinterval) : props.getIntValue(WEBPARAM__LIVEINTERVAL, (liveinterval < 0 ? DEFAILT_LIVEINTERVAL : liveinterval)); - if (liveinterval <= 0) { + this.liveInterval = props == null ? (liveInterval < 0 ? DEFAILT_LIVEINTERVAL : liveInterval) : props.getIntValue(WEBPARAM__LIVEINTERVAL, (liveInterval < 0 ? DEFAILT_LIVEINTERVAL : liveInterval)); + if (liveInterval <= 0) { return; } if (props != null) { @@ -125,18 +125,18 @@ public class WebSocketEngine { t.setDaemon(true); return t; }); - long delay = (liveinterval - System.currentTimeMillis() / 1000 % liveinterval) + index * 5; - final int intervalms = liveinterval * 1000; + long delay = (liveInterval - System.currentTimeMillis() / 1000 % liveInterval) + index * 5; + final int intervalms = liveInterval * 1000; scheduler.scheduleWithFixedDelay(() -> { try { long now = System.currentTimeMillis(); getLocalWebSockets().stream().filter(x -> ((now - x.getLastReadTime()) > intervalms && (now - x.getLastSendTime()) > intervalms)).forEach(x -> x.sendPing()); } catch (Throwable t) { - logger.log(Level.SEVERE, "WebSocketEngine schedule(interval=" + liveinterval + "s) ping error", t); + logger.log(Level.SEVERE, "WebSocketEngine schedule(interval=" + liveInterval + "s) ping error", t); } - }, delay, liveinterval, TimeUnit.SECONDS); + }, delay, liveInterval, TimeUnit.SECONDS); if (logger.isLoggable(Level.FINEST)) { - logger.finest(this.getClass().getSimpleName() + "(" + engineid + ")" + " start keeplive(wsmaxconns:" + wsMaxConns + ", delay:" + delay + "s, interval:" + liveinterval + "s) scheduler executor"); + logger.finest(this.getClass().getSimpleName() + "(" + engineid + ")" + " start keeplive(wsmaxconns:" + wsMaxConns + ", delay:" + delay + "s, interval:" + liveInterval + "s) scheduler executor"); } } @@ -149,7 +149,7 @@ public class WebSocketEngine { @Comment("添加WebSocket") CompletableFuture addLocal(WebSocket socket) { if (single) { - currconns.incrementAndGet(); + currConns.incrementAndGet(); websockets.put(socket._userid, socket); } else { //非线程安全, 在常规场景中无需锁 List list = websockets2.get(socket._userid); @@ -157,7 +157,7 @@ public class WebSocketEngine { list = new CopyOnWriteArrayList<>(); websockets2.put(socket._userid, list); } - currconns.incrementAndGet(); + currConns.incrementAndGet(); list.add(socket); } if (node != null) { @@ -173,7 +173,7 @@ public class WebSocketEngine { return null; //尚未登录成功 } if (single) { - currconns.decrementAndGet(); + currConns.decrementAndGet(); websockets.remove(userid); if (node != null) { return node.disconnect(userid); @@ -181,7 +181,7 @@ public class WebSocketEngine { } else { //非线程安全, 在常规场景中无需锁 List list = websockets2.get(userid); if (list != null) { - currconns.decrementAndGet(); + currConns.decrementAndGet(); list.remove(socket); if (list.isEmpty()) { websockets2.remove(userid); @@ -476,7 +476,7 @@ public class WebSocketEngine { if (this.wsMaxConns < 1) { return false; } - return currconns.get() >= this.wsMaxConns; + return currConns.get() >= this.wsMaxConns; } @Comment("获取所有连接") diff --git a/src/main/java/org/redkale/net/http/WebSocketReadHandler.java b/src/main/java/org/redkale/net/http/WebSocketReadHandler.java index 18034e7ef..283cf2400 100644 --- a/src/main/java/org/redkale/net/http/WebSocketReadHandler.java +++ b/src/main/java/org/redkale/net/http/WebSocketReadHandler.java @@ -13,6 +13,7 @@ import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; import java.util.logging.*; import org.redkale.convert.Convert; +import org.redkale.net.AsyncIOThread; import static org.redkale.net.http.WebSocket.*; import org.redkale.net.http.WebSocketPacket.FrameType; import org.redkale.util.ByteArray; @@ -31,8 +32,6 @@ public class WebSocketReadHandler implements CompletionHandler currPackets = new ArrayList<>(); protected ByteArray currSeriesMergeMessageBytes; @@ -51,21 +50,23 @@ public class WebSocketReadHandler implements CompletionHandler messageConsumer) { this.context = context; this.restMessageConsumer = messageConsumer; this.webSocket = webSocket; + this.ioReadThread = webSocket._channel.getReadIOThread(); this.logger = context.getLogger(); - this.debug = context.getLogger().isLoggable(Level.FINEST); } public void startRead() { CompletableFuture connectFuture = webSocket.onConnected(); if (connectFuture == null) { - webSocket._channel.read(this); + webSocket._channel.readInIOThread(this); } else { connectFuture.whenComplete((r, t) -> { - webSocket._channel.read(this); + webSocket._channel.readInIOThread(this); }); } } @@ -96,6 +97,7 @@ public class WebSocketReadHandler implements CompletionHandler 6) { logger.log(Level.FINEST, "read websocket message's length = " + realbuf.remaining()); } @@ -290,6 +292,7 @@ public class WebSocketReadHandler implements CompletionHandler { + try { + Convert convert = webSocket.getTextConvert(); + if (restMessageConsumer != null && convert != null) { //主要供RestWebSocket使用 + restMessageConsumer.accept(webSocket, convert.convertFrom(webSocket._messageRestType, packet.getPayload())); + } else { + webSocket.onMessage(packet.getPayload() == null ? null : new String(packet.getPayload(), StandardCharsets.UTF_8), packet.last); + } + } catch (Throwable e) { + logger.log(Level.SEVERE, "WebSocket onTextMessage error (" + packet + ")", e); } - } catch (Throwable e) { - logger.log(Level.SEVERE, "WebSocket onTextMessage error (" + packet + ")", e); - } + }); } else if (packet.type == FrameType.BINARY) { - try { - Convert convert = webSocket.getBinaryConvert(); - if (restMessageConsumer != null && convert != null) { //主要供RestWebSocket使用 - restMessageConsumer.accept(webSocket, convert.convertFrom(webSocket._messageRestType, packet.getPayload())); - } else { - webSocket.onMessage(packet.getPayload(), packet.last); + ioReadThread.runWork(webSocket._userid, () -> { + try { + Convert convert = webSocket.getBinaryConvert(); + if (restMessageConsumer != null && convert != null) { //主要供RestWebSocket使用 + restMessageConsumer.accept(webSocket, convert.convertFrom(webSocket._messageRestType, packet.getPayload())); + } else { + webSocket.onMessage(packet.getPayload(), packet.last); + } + } catch (Throwable e) { + logger.log(Level.SEVERE, "WebSocket onBinaryMessage error (" + packet + ")", e); } - } catch (Throwable e) { - logger.log(Level.SEVERE, "WebSocket onBinaryMessage error (" + packet + ")", e); - } + }); } else if (packet.type == FrameType.PING) { - try { - webSocket.onPing(packet.getPayload()); - } catch (Exception e) { - logger.log(Level.SEVERE, "WebSocket onPing error (" + packet + ")", e); - } + ioReadThread.runWork(webSocket._userid, () -> { + try { + webSocket.onPing(packet.getPayload()); + } catch (Exception e) { + logger.log(Level.SEVERE, "WebSocket onPing error (" + packet + ")", e); + } + }); } else if (packet.type == FrameType.PONG) { - try { - //if (debug) logger.log(Level.FINEST, "WebSocket onMessage by PONG FrameType : " + packet); - webSocket.onPong(packet.getPayload()); - } catch (Exception e) { - logger.log(Level.SEVERE, "WebSocket(" + webSocket + ") onPong error (" + packet + ")", e); - } + ioReadThread.runWork(webSocket._userid, () -> { + try { + //if (debug) logger.log(Level.FINEST, "WebSocket onMessage by PONG FrameType : " + packet); + webSocket.onPong(packet.getPayload()); + } catch (Exception e) { + logger.log(Level.SEVERE, "WebSocket(" + webSocket + ") onPong error (" + packet + ")", e); + } + }); } else if (packet.type == FrameType.CLOSE) { webSocket.initiateClosed = true; if (debug) { @@ -360,7 +371,7 @@ public class WebSocketReadHandler implements CompletionHandler 0) continue; + if (!method.getName().equals("createWebSocket")) { + continue; + } + if (method.getParameterCount() > 0) { + continue; + } Type rt = TypeToken.getGenericType(method.getGenericReturnType(), this.getClass()); if (rt instanceof ParameterizedType) { msgtype = ((ParameterizedType) rt).getActualTypeArguments()[1]; } - if (msgtype == Object.class) msgtype = String.class; + if (msgtype == Object.class) { + msgtype = String.class; + } break; } } catch (Exception e) { @@ -144,19 +150,33 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl @Override final void preInit(Application application, HttpContext context, AnyValue conf) { - if (this.textConvert == null) this.textConvert = jsonConvert; - if (this.sendConvert == null) this.sendConvert = jsonConvert; - InetSocketAddress addr = context.getServerAddress(); - if (this.node == null) this.node = createWebSocketNode(); - if (this.node == null) { //没有部署SNCP,即不是分布式 - this.node = new WebSocketNodeService(); - if (logger.isLoggable(Level.WARNING)) logger.warning("Not found WebSocketNode, create a default value for " + getClass().getName()); + if (this.textConvert == null) { + this.textConvert = jsonConvert; + } + if (this.sendConvert == null) { + this.sendConvert = jsonConvert; + } + InetSocketAddress addr = context.getServerAddress(); + if (this.webSocketNode == null) { + this.webSocketNode = createWebSocketNode(); + } + if (this.webSocketNode == null) { //没有部署SNCP,即不是分布式 + this.webSocketNode = new WebSocketNodeService(); + if (logger.isLoggable(Level.WARNING)) { + logger.warning("Not found WebSocketNode, create a default value for " + getClass().getName()); + } + } + if (this.webSocketNode.sendConvert == null) { + this.webSocketNode.sendConvert = this.sendConvert; + } + if (this.messageAgent != null) { + this.webSocketNode.messageAgent = this.messageAgent; } - if (this.node.sendConvert == null) this.node.sendConvert = this.sendConvert; - if (this.messageAgent != null) this.node.messageAgent = this.messageAgent; { AnyValue props = conf; - if (conf != null && conf.getAnyValue("properties") != null) props = conf.getAnyValue("properties"); + if (conf != null && conf.getAnyValue("properties") != null) { + props = conf.getAnyValue("properties"); + } if (props != null) { String cryptorClass = props.getValue(WEBPARAM__CRYPTOR); if (cryptorClass != null && !cryptorClass.isEmpty()) { @@ -164,28 +184,34 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl Class clazz = Thread.currentThread().getContextClassLoader().loadClass(cryptorClass); this.cryptor = (Cryptor) clazz.getDeclaredConstructor().newInstance(); RedkaleClassLoader.putReflectionDeclaredConstructors(clazz, cryptorClass); - if (resourceFactory != null && this.cryptor != null) resourceFactory.inject(this.cryptor); + if (resourceFactory != null && this.cryptor != null) { + resourceFactory.inject(this.cryptor); + } } catch (Exception e) { throw new HttpException(e); } } } } - if (application != null && application.isCompileMode()) return; + if (application != null && application.isCompileMode()) { + return; + } //存在WebSocketServlet,则此WebSocketNode必须是本地模式Service - this.node.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", - this.single, context, liveinterval, wsmaxconns, wsthreads, wsmaxbody, mergemsg, this.cryptor, this.node, this.sendConvert, logger); - this.node.init(conf); - this.node.localEngine.init(conf); + this.webSocketNode.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", + this.single, context, liveinterval, wsmaxconns, wsthreads, wsmaxbody, mergemsg, this.cryptor, this.webSocketNode, this.sendConvert, logger); + this.webSocketNode.init(conf); + this.webSocketNode.localEngine.init(conf); } @Override final void postDestroy(Application application, HttpContext context, AnyValue conf) { - this.node.postDestroy(conf); + this.webSocketNode.postDestroy(conf); super.destroy(context, conf); - if (application != null && application.isCompileMode()) return; - this.node.localEngine.destroy(conf); + if (application != null && application.isCompileMode()) { + return; + } + this.webSocketNode.localEngine.destroy(conf); } @Override @@ -193,27 +219,31 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl return this.getClass().getSimpleName().replace("_Dyn", "").toLowerCase().replaceAll("websocket.*$", "").replaceAll("servlet.*$", ""); } - @Override + @Override //在IOThread中执行 public final void execute(final HttpRequest request, final HttpResponse response) throws IOException { final boolean debug = logger.isLoggable(Level.FINEST); if (!request.isWebSocket()) { - if (debug) logger.finest("WebSocket connect abort, (Not GET Method) or (Connection != Upgrade) or (Upgrade != websocket). request=" + request); + if (debug) { + logger.log(Level.FINEST, "WebSocket connect abort, (Not GET Method) or (Connection != Upgrade) or (Upgrade != websocket). request=" + request); + } response.finish(true); return; } final String key = request.getHeader("Sec-WebSocket-Key"); if (key == null) { - if (debug) logger.finest("WebSocket connect abort, Not found Sec-WebSocket-Key header. request=" + request); + if (debug) { + logger.log(Level.FINEST, "WebSocket connect abort, Not found Sec-WebSocket-Key header. request=" + request); + } response.finish(true); return; } - if (this.node.localEngine.isLocalConnLimited()) { - if (debug) logger.finest("WebSocket connections limit, wsmaxconns=" + this.node.localEngine.getLocalWsMaxConns()); + if (this.webSocketNode.localEngine.isLocalConnLimited()) { + logger.log(Level.WARNING, "WebSocket connections limit, wsmaxconns=" + this.webSocketNode.localEngine.getLocalWsMaxConns()); response.finish(true); return; } final WebSocket webSocket = this.createWebSocket(); - webSocket._engine = this.node.localEngine; + webSocket._engine = this.webSocketNode.localEngine; webSocket._channel = response.getChannel(); webSocket._messageRestType = this.messageRestType; webSocket._textConvert = textConvert; @@ -221,7 +251,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl webSocket._sendConvert = sendConvert; webSocket._remoteAddress = request.getRemoteAddress(); webSocket._remoteAddr = request.getRemoteAddr(); - webSocket._sncpAddress = this.node.localSncpAddress; + webSocket._sncpAddress = this.webSocketNode.localSncpAddress; if (this.permessageDeflate && request.getHeader("Sec-WebSocket-Extensions", "").contains("permessage-deflate")) { webSocket.deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true); webSocket.inflater = new Inflater(true); @@ -229,13 +259,17 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl initRestWebSocket(webSocket); CompletableFuture sessionFuture = webSocket.onOpen(request); if (sessionFuture == null) { - if (debug) logger.finest("WebSocket connect abort, Not found sessionid. request=" + request); + if (debug) { + logger.log(Level.FINEST, "WebSocket connect abort, Not found sessionid. request=" + request); + } response.finish(true); return; } BiConsumer sessionConsumer = (sessionid, ex) -> { if ((sessionid == null && webSocket.delayPackets == null) || ex != null) { - if (debug || ex != null) logger.log(ex == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Not found sessionid or occur error. request=" + request, ex); + if (debug || ex != null) { + logger.log(ex == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Not found sessionid or occur error. request=" + request, ex); + } response.finish(true); return; } @@ -250,7 +284,9 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl response.setHeader("Connection", "Upgrade"); response.addHeader("Upgrade", "websocket"); response.addHeader("Sec-WebSocket-Accept", Base64.getEncoder().encodeToString(bytes)); - if (webSocket.deflater != null) response.addHeader("Sec-WebSocket-Extensions", "permessage-deflate"); + if (webSocket.deflater != null) { + response.addHeader("Sec-WebSocket-Extensions", "permessage-deflate"); + } response.sendBody((ByteBuffer) null, new CompletionHandler() { @@ -261,30 +297,31 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl Runnable createUseridHandler = () -> { CompletableFuture userFuture = webSocket.createUserid(); if (userFuture == null) { - if (debug) logger.finest("WebSocket connect abort, Create userid abort. request = " + request); + if (debug) { + logger.log(Level.FINEST, "WebSocket connect abort, Create userid abort. request = " + request); + } response.finish(true); return; } userFuture.whenComplete((userid, ex2) -> { if ((userid == null && webSocket.delayPackets == null) || ex2 != null) { - if (debug || ex2 != null) logger.log(ex2 == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Create userid abort. request = " + request, ex2); + if (debug || ex2 != null) { + logger.log(ex2 == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Create userid abort. request = " + request, ex2); + } response.finish(true); return; } Runnable runHandler = () -> { webSocket._userid = userid; if (single && !anyuser) { - WebSocketServlet.this.node.existsWebSocket(userid).whenComplete((rs, nex) -> { + webSocketNode.existsWebSocket(userid).whenComplete((rs, nex) -> { if (rs) { CompletableFuture rcFuture = webSocket.onSingleRepeatConnect(); Consumer task = (oldkilled) -> { if (oldkilled) { - WebSocketServlet.this.node.localEngine.addLocal(webSocket); + webSocketNode.localEngine.addLocal(webSocket); response.removeChannel(); webSocket._readHandler.startRead(); -// WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer); -// webSocket._runner = runner; -// runner.run(); //context.runAsync(runner); response.finish(true); } else { //关闭新连接 response.finish(true); @@ -302,22 +339,16 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl }); } } else { - WebSocketServlet.this.node.localEngine.addLocal(webSocket); + webSocketNode.localEngine.addLocal(webSocket); response.removeChannel(); webSocket._readHandler.startRead(); -// WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer); -// webSocket._runner = runner; -// runner.run(); //context.runAsync(runner); response.finish(true); } }); } else { - WebSocketServlet.this.node.localEngine.addLocal(webSocket); + webSocketNode.localEngine.addLocal(webSocket); response.removeChannel(); webSocket._readHandler.startRead(); -// WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer); -// webSocket._runner = runner; -// runner.run(); //context.runAsync(runner); response.finish(true); } }; @@ -327,7 +358,9 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl CompletableFuture cf = webSocket._writeHandler.send(delayPackets.toArray(new WebSocketPacket[delayPackets.size()])); cf.whenComplete((Integer v, Throwable t) -> { if (userid == null || t != null) { - if (t != null) logger.log(Level.FINEST, "WebSocket connect abort, Response send delayPackets abort. request = " + request, t); + if (t != null) { + logger.log(Level.FINEST, "WebSocket connect abort, Response send delayPackets abort. request = " + request, t); + } response.finish(true); } else { runHandler.run(); @@ -344,7 +377,9 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl CompletableFuture cf = webSocket._writeHandler.send(delayPackets.toArray(new WebSocketPacket[delayPackets.size()])); cf.whenComplete((Integer v, Throwable t) -> { if (sessionid == null || t != null) { - if (t != null) logger.log(Level.FINEST, "WebSocket connect abort, Response send delayPackets abort. request = " + request, t); + if (t != null) { + logger.log(Level.FINEST, "WebSocket connect abort, Response send delayPackets abort. request = " + request, t); + } response.finish(true); } else { createUseridHandler.run(); @@ -367,7 +402,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl if (workThread == null || workThread == Thread.currentThread()) { sessionConsumer.accept(sessionid, ex); } else { - workThread.execute(() -> sessionConsumer.accept(sessionid, ex)); + workThread.runWork(() -> sessionConsumer.accept(sessionid, ex)); } }); } diff --git a/src/main/java/org/redkale/util/ThreadHashExecutor.java b/src/main/java/org/redkale/util/ThreadHashExecutor.java index 809ef4e54..d6e4f3172 100644 --- a/src/main/java/org/redkale/util/ThreadHashExecutor.java +++ b/src/main/java/org/redkale/util/ThreadHashExecutor.java @@ -89,6 +89,10 @@ public class ThreadHashExecutor extends AbstractExecutorService { hashExecutor(hash).execute(command); } + public void execute(java.io.Serializable hash, Runnable command) { + hashExecutor(hash == null ? 0 : hash.hashCode()).execute(command); + } + @Override public Future submit(Runnable task) { return hashExecutor(0).submit(task); diff --git a/src/test/java/org/redkale/test/websocket/ChatWebSocketServlet.java b/src/test/java/org/redkale/test/websocket/ChatWebSocketServlet.java index e1402ed1d..4e5a01b65 100644 --- a/src/test/java/org/redkale/test/websocket/ChatWebSocketServlet.java +++ b/src/test/java/org/redkale/test/websocket/ChatWebSocketServlet.java @@ -5,14 +5,11 @@ */ package org.redkale.test.websocket; -import org.redkale.net.http.WebServlet; -import org.redkale.net.http.WebSocketServlet; -import org.redkale.net.http.WebSocket; import java.util.concurrent.CompletableFuture; import org.redkale.annotation.Resource; import org.redkale.net.http.*; import org.redkale.test.rest.*; -import org.redkale.util.*; +import org.redkale.util.AnyValue; /** * @@ -26,7 +23,7 @@ public class ChatWebSocketServlet extends WebSocketServlet { @Override public void init(HttpContext context, AnyValue conf) { - System.out.println("本实例的WebSocketNode: " + super.node); + System.out.println("本实例的WebSocketNode: " + super.webSocketNode); } @Override