diff --git a/src/main/java/org/redkale/net/client/ClientWriteIOThread.java b/src/main/java/org/redkale/net/client/ClientWriteIOThread.java index b3ff2ec88..ed4792a62 100644 --- a/src/main/java/org/redkale/net/client/ClientWriteIOThread.java +++ b/src/main/java/org/redkale/net/client/ClientWriteIOThread.java @@ -62,7 +62,7 @@ public class ClientWriteIOThread extends AsyncIOThread { public void run() { final ByteBuffer buffer = getBufferSupplier().get(); final int capacity = buffer.capacity(); - final ByteArray writeArray = new ByteArray(1024 * 32); + final ByteArray writeArray = new ByteArray(); final Map> map = new HashMap<>(); final ObjectPool listPool = ObjectPool.createUnsafePool(Utility.cpus() * 2, () -> new ArrayList(), null, t -> { t.clear(); @@ -73,34 +73,14 @@ public class ClientWriteIOThread extends AsyncIOThread { try { while ((entry = requestQueue.take()) != null) { map.clear(); - { - Serializable reqid = entry.request.getRequestid(); - if (reqid == null) { - entry.conn.responseQueue.offer(entry); - } else { - entry.conn.responseMap.put(reqid, entry); - } - } - if (entry.conn.pauseWriting.get()) { - if (entry.conn.pauseResuming.get()) { - try { - synchronized (entry.conn.pauseRequests) { - entry.conn.pauseRequests.wait(3_000); - } - } catch (InterruptedException ie) { - } - } - entry.conn.pauseRequests.add(entry); - } else { - map.computeIfAbsent(entry.conn, c -> listPool.get()).add(entry); - } - while ((entry = requestQueue.poll()) != null) { + if (!entry.isDone()) { Serializable reqid = entry.request.getRequestid(); if (reqid == null) { entry.conn.responseQueue.offer(entry); } else { entry.conn.responseMap.put(reqid, entry); } + if (entry.conn.pauseWriting.get()) { if (entry.conn.pauseResuming.get()) { try { @@ -115,6 +95,29 @@ public class ClientWriteIOThread extends AsyncIOThread { map.computeIfAbsent(entry.conn, c -> listPool.get()).add(entry); } } + while ((entry = requestQueue.poll()) != null) { + if (!entry.isDone()) { + Serializable reqid = entry.request.getRequestid(); + if (reqid == null) { + entry.conn.responseQueue.offer(entry); + } else { + entry.conn.responseMap.put(reqid, entry); + } + if (entry.conn.pauseWriting.get()) { + if (entry.conn.pauseResuming.get()) { + try { + synchronized (entry.conn.pauseRequests) { + entry.conn.pauseRequests.wait(3_000); + } + } catch (InterruptedException ie) { + } + } + entry.conn.pauseRequests.add(entry); + } else { + map.computeIfAbsent(entry.conn, c -> listPool.get()).add(entry); + } + } + } map.forEach((conn, list) -> { writeArray.clear(); int i = -1; diff --git a/src/main/java/org/redkale/net/http/HttpServer.java b/src/main/java/org/redkale/net/http/HttpServer.java index f7dfd40fc..1b4c3b9a2 100644 --- a/src/main/java/org/redkale/net/http/HttpServer.java +++ b/src/main/java/org/redkale/net/http/HttpServer.java @@ -43,6 +43,10 @@ public class HttpServer extends Server safeBufferPool; + //配置 APP_EXECUTOR资源为null + //RESNAME_APP_EXECUTOR + protected ExecutorService workExecutor; + public HttpServer() { this(null, System.currentTimeMillis(), ResourceFactory.create()); } @@ -53,6 +57,7 @@ public class HttpServer extends Server + * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.8.0 + */ +public class WebSocketFuture extends CompletableFuture implements Runnable { + + WebSocketPacket packet; + + WebSocket websocket; + + WorkThread workThread; + + ScheduledFuture timeout; + + WebSocketFuture(WorkThread workThread, WebSocket websocket, WebSocketPacket packet) { + super(); + Objects.requireNonNull(workThread); + this.workThread = workThread; + this.websocket = websocket; + this.packet = packet; + } + + void cancelTimeout() { + if (timeout != null) { + timeout.cancel(true); + } + } + + @Override //JDK9+ + public WebSocketFuture newIncompleteFuture() { + WebSocketFuture future = new WebSocketFuture(workThread, websocket, packet); + future.timeout = timeout; + return future; + } + + @Override + public void run() { + TimeoutException ex = new TimeoutException(); + workThread.runWork(() -> completeExceptionally(ex)); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "_" + Objects.hash(this) + "{websocket = " + websocket + ", packet = " + packet + "}"; + } +} diff --git a/src/main/java/org/redkale/net/http/WebSocketPacket.java b/src/main/java/org/redkale/net/http/WebSocketPacket.java index 3a2f6e6e7..54f3c6875 100644 --- a/src/main/java/org/redkale/net/http/WebSocketPacket.java +++ b/src/main/java/org/redkale/net/http/WebSocketPacket.java @@ -8,6 +8,7 @@ package org.redkale.net.http; import java.io.Serializable; import java.nio.charset.StandardCharsets; import org.redkale.net.http.WebSocketPacket.FrameType; +import org.redkale.util.ByteArray; /** * @@ -20,8 +21,6 @@ public final class WebSocketPacket { public static final Object MESSAGE_NIL = new Object(); - static final WebSocketPacket NONE = new WebSocketPacket(); - public static final WebSocketPacket DEFAULT_PING_PACKET = new WebSocketPacket(FrameType.PING, new byte[0]); public static enum MessageType { @@ -86,6 +85,26 @@ public final class WebSocketPacket { this.last = last; } + //消息编码 + public void writeEncode(final ByteArray array) { + final byte opcode = (byte) (type.getValue() | 0x80); + final byte[] content = getPayload(); + final int len = content.length; + if (len <= 0x7D) { //125 + array.put(opcode); + array.put((byte) len); + } else if (len <= 0xFFFF) { // 65535 + array.put(opcode); + array.put((byte) 0x7E); //126 + array.putChar((char) len); + } else { + array.put(opcode); + array.put((byte) 0x7F); //127 + array.putLong(len); + } + array.put(content); + } + public byte[] getPayload() { return payload; } diff --git a/src/main/java/org/redkale/net/http/WebSocketServlet.java b/src/main/java/org/redkale/net/http/WebSocketServlet.java index 4aadeb8a4..f80d0a52b 100644 --- a/src/main/java/org/redkale/net/http/WebSocketServlet.java +++ b/src/main/java/org/redkale/net/http/WebSocketServlet.java @@ -80,17 +80,17 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl //同RestWebSocket.single protected boolean single = true; //是否单用户单连接 - //同RestWebSocket.liveinterval - protected int liveinterval = DEFAILT_LIVEINTERVAL; + //同RestWebSocket.liveInterval + protected int liveInterval = DEFAILT_LIVEINTERVAL; - //同RestWebSocket.wsmaxconns - protected int wsmaxconns = 0; + //同RestWebSocket.wsMaxConns + protected int wsMaxConns = 0; - //同RestWebSocket.wsthreads - protected int wsthreads = 0; + //同RestWebSocket.wsThreads + protected int wsThreads = 0; - //同RestWebSocket.wsmaxbody - protected int wsmaxbody = 32 * 1024; + //同RestWebSocket.wsMaxBody + protected int wsMaxBody = 32 * 1024; //同RestWebSocket.anyuser protected boolean anyuser = false; @@ -124,7 +124,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl protected ResourceFactory resourceFactory; protected WebSocketServlet() { - Type msgtype = String.class; + Type msgType = String.class; try { for (Method method : this.getClass().getDeclaredMethods()) { if (!method.getName().equals("createWebSocket")) { @@ -135,17 +135,17 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl } Type rt = TypeToken.getGenericType(method.getGenericReturnType(), this.getClass()); if (rt instanceof ParameterizedType) { - msgtype = ((ParameterizedType) rt).getActualTypeArguments()[1]; + msgType = ((ParameterizedType) rt).getActualTypeArguments()[1]; } - if (msgtype == Object.class) { - msgtype = String.class; + if (msgType == Object.class) { + msgType = String.class; } break; } } catch (Exception e) { logger.warning(this.getClass().getName() + " not designate text message type on createWebSocket Method"); } - this.messageRestType = msgtype; + this.messageRestType = msgType; } @Override @@ -198,7 +198,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl } //存在WebSocketServlet,则此WebSocketNode必须是本地模式Service this.webSocketNode.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", - this.single, context, liveinterval, wsmaxconns, wsthreads, wsmaxbody, mergemsg, this.cryptor, this.webSocketNode, this.sendConvert, logger); + this.single, context, liveInterval, wsMaxConns, wsThreads, wsMaxBody, mergemsg, this.cryptor, this.webSocketNode, this.sendConvert, logger); this.webSocketNode.init(conf); this.webSocketNode.localEngine.init(conf); diff --git a/src/main/java/org/redkale/net/http/WebSocketWriteHandler.java b/src/main/java/org/redkale/net/http/WebSocketWriteHandler.java index 13bf9825d..3f09d458d 100644 --- a/src/main/java/org/redkale/net/http/WebSocketWriteHandler.java +++ b/src/main/java/org/redkale/net/http/WebSocketWriteHandler.java @@ -27,9 +27,9 @@ public class WebSocketWriteHandler implements CompletionHandler { protected final ByteArray writeArray = new ByteArray(); - protected final List> respList = new ArrayList(); + protected final List> respList = new ArrayList(); - protected final ConcurrentLinkedDeque> requestQueue = new ConcurrentLinkedDeque(); + protected final ConcurrentLinkedDeque> requestQueue = new ConcurrentLinkedDeque(); public WebSocketWriteHandler(HttpContext context, WebSocket webSocket) { this.context = context; @@ -37,7 +37,7 @@ public class WebSocketWriteHandler implements CompletionHandler { } public CompletableFuture send(WebSocketPacket... packets) { - WebSocketFuture future = new WebSocketFuture<>(packets); + InnerWebSocketFuture future = new InnerWebSocketFuture<>(packets); if (writePending.compareAndSet(false, true)) { respList.clear(); respList.add(future); @@ -55,12 +55,12 @@ public class WebSocketWriteHandler implements CompletionHandler { @Override public void completed(Integer result, Void attachment) { webSocket.lastSendTime = System.currentTimeMillis(); - for (WebSocketFuture future : respList) { + for (InnerWebSocketFuture future : respList) { future.complete(0); } respList.clear(); writeArray.clear(); - WebSocketFuture req; + InnerWebSocketFuture req; while ((req = requestQueue.poll()) != null) { respList.add(req); for (WebSocketPacket p : req.packets) { @@ -78,12 +78,12 @@ public class WebSocketWriteHandler implements CompletionHandler { @Override public void failed(Throwable exc, Void attachment) { - WebSocketFuture req; + InnerWebSocketFuture req; try { while ((req = requestQueue.poll()) != null) { req.completeExceptionally(exc); } - for (WebSocketFuture future : respList) { + for (InnerWebSocketFuture future : respList) { future.completeExceptionally(exc); } respList.clear(); @@ -116,15 +116,15 @@ public class WebSocketWriteHandler implements CompletionHandler { array.put(content); } - protected static class WebSocketFuture extends CompletableFuture { + protected static class InnerWebSocketFuture extends CompletableFuture { protected WebSocketPacket[] packets; - public WebSocketFuture() { + public InnerWebSocketFuture() { super(); } - public WebSocketFuture(WebSocketPacket... packets) { + public InnerWebSocketFuture(WebSocketPacket... packets) { super(); this.packets = packets; } diff --git a/src/main/java/org/redkale/net/http/WebSocketWriteIOThread.java b/src/main/java/org/redkale/net/http/WebSocketWriteIOThread.java new file mode 100644 index 000000000..b4421460b --- /dev/null +++ b/src/main/java/org/redkale/net/http/WebSocketWriteIOThread.java @@ -0,0 +1,100 @@ +/* + * + */ +package org.redkale.net.http; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.CompletionHandler; +import java.util.concurrent.*; +import org.redkale.net.AsyncIOThread; +import org.redkale.util.*; + +/** + * WebSocket连接的IO写线程 + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.8.0 + */ +public class WebSocketWriteIOThread extends AsyncIOThread { + + private final BlockingDeque requestQueue = new LinkedBlockingDeque<>(); + + private final ScheduledThreadPoolExecutor timeoutExecutor; + + public WebSocketWriteIOThread(ScheduledThreadPoolExecutor timeoutExecutor, ThreadGroup g, String name, int index, int threads, + ExecutorService workExecutor, ObjectPool safeBufferPool) throws IOException { + super(g, name, index, threads, workExecutor, safeBufferPool); + this.timeoutExecutor = timeoutExecutor; + } + + public CompletableFuture offerRequest(WebSocket websocket, WebSocketPacket packet) { + WebSocketFuture future = new WebSocketFuture(this, websocket, packet); + int ts = websocket._channel.getWriteTimeoutSeconds(); + if (ts > 0) { + future.timeout = timeoutExecutor.schedule(future, ts, TimeUnit.SECONDS); + } + requestQueue.offer(future); + return future; + } + + @Override + public void run() { + final ByteBuffer buffer = getBufferSupplier().get(); + final int capacity = buffer.capacity(); + final ByteArray writeArray = new ByteArray(); + while (!isClosed()) { + WebSocketFuture entry; + try { + while ((entry = requestQueue.take()) != null) { + if (!entry.isDone()) { + writeArray.clear(); + entry.packet.writeEncode(writeArray); + if (writeArray.length() > 0) { + if (writeArray.length() <= capacity) { + buffer.clear(); + buffer.put(writeArray.content(), 0, writeArray.length()); + buffer.flip(); + entry.websocket._channel.write(buffer, entry, writeHandler); + } else { + entry.websocket._channel.write(writeArray, entry, writeHandler); + } + } + } + } + } catch (InterruptedException e) { + } + } + } + + protected final CompletionHandler writeHandler = new CompletionHandler() { + + @Override + public void completed(Integer result, WebSocketFuture attachment) { + attachment.cancelTimeout(); + attachment.workThread = null; + attachment.websocket = null; + attachment.packet = null; + runWork(() -> { + attachment.complete(0); + }); + } + + @Override + public void failed(Throwable exc, WebSocketFuture attachment) { + attachment.cancelTimeout(); + attachment.websocket.close(); + attachment.workThread = null; + attachment.websocket = null; + attachment.packet = null; + runWork(() -> { + attachment.completeExceptionally(exc); + }); + } + }; + +}