增加WebSocketWriteIOThread

This commit is contained in:
Redkale
2023-01-14 21:04:44 +08:00
parent dc0849d82a
commit 58e206d0e9
7 changed files with 237 additions and 49 deletions

View File

@@ -62,7 +62,7 @@ public class ClientWriteIOThread extends AsyncIOThread {
public void run() { public void run() {
final ByteBuffer buffer = getBufferSupplier().get(); final ByteBuffer buffer = getBufferSupplier().get();
final int capacity = buffer.capacity(); final int capacity = buffer.capacity();
final ByteArray writeArray = new ByteArray(1024 * 32); final ByteArray writeArray = new ByteArray();
final Map<ClientConnection, List<ClientFuture>> map = new HashMap<>(); final Map<ClientConnection, List<ClientFuture>> map = new HashMap<>();
final ObjectPool<List> listPool = ObjectPool.createUnsafePool(Utility.cpus() * 2, () -> new ArrayList(), null, t -> { final ObjectPool<List> listPool = ObjectPool.createUnsafePool(Utility.cpus() * 2, () -> new ArrayList(), null, t -> {
t.clear(); t.clear();
@@ -73,34 +73,14 @@ public class ClientWriteIOThread extends AsyncIOThread {
try { try {
while ((entry = requestQueue.take()) != null) { while ((entry = requestQueue.take()) != null) {
map.clear(); map.clear();
{ if (!entry.isDone()) {
Serializable reqid = entry.request.getRequestid();
if (reqid == null) {
entry.conn.responseQueue.offer(entry);
} else {
entry.conn.responseMap.put(reqid, entry);
}
}
if (entry.conn.pauseWriting.get()) {
if (entry.conn.pauseResuming.get()) {
try {
synchronized (entry.conn.pauseRequests) {
entry.conn.pauseRequests.wait(3_000);
}
} catch (InterruptedException ie) {
}
}
entry.conn.pauseRequests.add(entry);
} else {
map.computeIfAbsent(entry.conn, c -> listPool.get()).add(entry);
}
while ((entry = requestQueue.poll()) != null) {
Serializable reqid = entry.request.getRequestid(); Serializable reqid = entry.request.getRequestid();
if (reqid == null) { if (reqid == null) {
entry.conn.responseQueue.offer(entry); entry.conn.responseQueue.offer(entry);
} else { } else {
entry.conn.responseMap.put(reqid, entry); entry.conn.responseMap.put(reqid, entry);
} }
if (entry.conn.pauseWriting.get()) { if (entry.conn.pauseWriting.get()) {
if (entry.conn.pauseResuming.get()) { if (entry.conn.pauseResuming.get()) {
try { try {
@@ -115,6 +95,29 @@ public class ClientWriteIOThread extends AsyncIOThread {
map.computeIfAbsent(entry.conn, c -> listPool.get()).add(entry); map.computeIfAbsent(entry.conn, c -> listPool.get()).add(entry);
} }
} }
while ((entry = requestQueue.poll()) != null) {
if (!entry.isDone()) {
Serializable reqid = entry.request.getRequestid();
if (reqid == null) {
entry.conn.responseQueue.offer(entry);
} else {
entry.conn.responseMap.put(reqid, entry);
}
if (entry.conn.pauseWriting.get()) {
if (entry.conn.pauseResuming.get()) {
try {
synchronized (entry.conn.pauseRequests) {
entry.conn.pauseRequests.wait(3_000);
}
} catch (InterruptedException ie) {
}
}
entry.conn.pauseRequests.add(entry);
} else {
map.computeIfAbsent(entry.conn, c -> listPool.get()).add(entry);
}
}
}
map.forEach((conn, list) -> { map.forEach((conn, list) -> {
writeArray.clear(); writeArray.clear();
int i = -1; int i = -1;

View File

@@ -43,6 +43,10 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
private ObjectPool<ByteBuffer> safeBufferPool; private ObjectPool<ByteBuffer> safeBufferPool;
//配置<executor threads="0"> APP_EXECUTOR资源为null
//RESNAME_APP_EXECUTOR
protected ExecutorService workExecutor;
public HttpServer() { public HttpServer() {
this(null, System.currentTimeMillis(), ResourceFactory.create()); this(null, System.currentTimeMillis(), ResourceFactory.create());
} }
@@ -53,6 +57,7 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
public HttpServer(Application application, long serverStartTime, ResourceFactory resourceFactory) { public HttpServer(Application application, long serverStartTime, ResourceFactory resourceFactory) {
super(application, serverStartTime, "TCP", resourceFactory, new HttpDispatcherServlet()); super(application, serverStartTime, "TCP", resourceFactory, new HttpDispatcherServlet());
this.workExecutor = application == null ? null : application.getWorkExecutor();
} }
@Override @Override

View File

@@ -0,0 +1,61 @@
/*
*
*/
package org.redkale.net.http;
import java.util.Objects;
import java.util.concurrent.*;
import org.redkale.net.WorkThread;
/**
* WebSocket连接的IO写线程
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.8.0
*/
public class WebSocketFuture extends CompletableFuture<Integer> implements Runnable {
WebSocketPacket packet;
WebSocket websocket;
WorkThread workThread;
ScheduledFuture timeout;
WebSocketFuture(WorkThread workThread, WebSocket websocket, WebSocketPacket packet) {
super();
Objects.requireNonNull(workThread);
this.workThread = workThread;
this.websocket = websocket;
this.packet = packet;
}
void cancelTimeout() {
if (timeout != null) {
timeout.cancel(true);
}
}
@Override //JDK9+
public WebSocketFuture newIncompleteFuture() {
WebSocketFuture future = new WebSocketFuture(workThread, websocket, packet);
future.timeout = timeout;
return future;
}
@Override
public void run() {
TimeoutException ex = new TimeoutException();
workThread.runWork(() -> completeExceptionally(ex));
}
@Override
public String toString() {
return getClass().getSimpleName() + "_" + Objects.hash(this) + "{websocket = " + websocket + ", packet = " + packet + "}";
}
}

View File

@@ -8,6 +8,7 @@ package org.redkale.net.http;
import java.io.Serializable; import java.io.Serializable;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import org.redkale.net.http.WebSocketPacket.FrameType; import org.redkale.net.http.WebSocketPacket.FrameType;
import org.redkale.util.ByteArray;
/** /**
* *
@@ -20,8 +21,6 @@ public final class WebSocketPacket {
public static final Object MESSAGE_NIL = new Object(); public static final Object MESSAGE_NIL = new Object();
static final WebSocketPacket NONE = new WebSocketPacket();
public static final WebSocketPacket DEFAULT_PING_PACKET = new WebSocketPacket(FrameType.PING, new byte[0]); public static final WebSocketPacket DEFAULT_PING_PACKET = new WebSocketPacket(FrameType.PING, new byte[0]);
public static enum MessageType { public static enum MessageType {
@@ -86,6 +85,26 @@ public final class WebSocketPacket {
this.last = last; this.last = last;
} }
//消息编码
public void writeEncode(final ByteArray array) {
final byte opcode = (byte) (type.getValue() | 0x80);
final byte[] content = getPayload();
final int len = content.length;
if (len <= 0x7D) { //125
array.put(opcode);
array.put((byte) len);
} else if (len <= 0xFFFF) { // 65535
array.put(opcode);
array.put((byte) 0x7E); //126
array.putChar((char) len);
} else {
array.put(opcode);
array.put((byte) 0x7F); //127
array.putLong(len);
}
array.put(content);
}
public byte[] getPayload() { public byte[] getPayload() {
return payload; return payload;
} }

View File

@@ -80,17 +80,17 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
//同RestWebSocket.single //同RestWebSocket.single
protected boolean single = true; //是否单用户单连接 protected boolean single = true; //是否单用户单连接
//同RestWebSocket.liveinterval //同RestWebSocket.liveInterval
protected int liveinterval = DEFAILT_LIVEINTERVAL; protected int liveInterval = DEFAILT_LIVEINTERVAL;
//同RestWebSocket.wsmaxconns //同RestWebSocket.wsMaxConns
protected int wsmaxconns = 0; protected int wsMaxConns = 0;
//同RestWebSocket.wsthreads //同RestWebSocket.wsThreads
protected int wsthreads = 0; protected int wsThreads = 0;
//同RestWebSocket.wsmaxbody //同RestWebSocket.wsMaxBody
protected int wsmaxbody = 32 * 1024; protected int wsMaxBody = 32 * 1024;
//同RestWebSocket.anyuser //同RestWebSocket.anyuser
protected boolean anyuser = false; protected boolean anyuser = false;
@@ -124,7 +124,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
protected ResourceFactory resourceFactory; protected ResourceFactory resourceFactory;
protected WebSocketServlet() { protected WebSocketServlet() {
Type msgtype = String.class; Type msgType = String.class;
try { try {
for (Method method : this.getClass().getDeclaredMethods()) { for (Method method : this.getClass().getDeclaredMethods()) {
if (!method.getName().equals("createWebSocket")) { if (!method.getName().equals("createWebSocket")) {
@@ -135,17 +135,17 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
} }
Type rt = TypeToken.getGenericType(method.getGenericReturnType(), this.getClass()); Type rt = TypeToken.getGenericType(method.getGenericReturnType(), this.getClass());
if (rt instanceof ParameterizedType) { if (rt instanceof ParameterizedType) {
msgtype = ((ParameterizedType) rt).getActualTypeArguments()[1]; msgType = ((ParameterizedType) rt).getActualTypeArguments()[1];
} }
if (msgtype == Object.class) { if (msgType == Object.class) {
msgtype = String.class; msgType = String.class;
} }
break; break;
} }
} catch (Exception e) { } catch (Exception e) {
logger.warning(this.getClass().getName() + " not designate text message type on createWebSocket Method"); logger.warning(this.getClass().getName() + " not designate text message type on createWebSocket Method");
} }
this.messageRestType = msgtype; this.messageRestType = msgType;
} }
@Override @Override
@@ -198,7 +198,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
} }
//存在WebSocketServlet则此WebSocketNode必须是本地模式Service //存在WebSocketServlet则此WebSocketNode必须是本地模式Service
this.webSocketNode.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", this.webSocketNode.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]",
this.single, context, liveinterval, wsmaxconns, wsthreads, wsmaxbody, mergemsg, this.cryptor, this.webSocketNode, this.sendConvert, logger); this.single, context, liveInterval, wsMaxConns, wsThreads, wsMaxBody, mergemsg, this.cryptor, this.webSocketNode, this.sendConvert, logger);
this.webSocketNode.init(conf); this.webSocketNode.init(conf);
this.webSocketNode.localEngine.init(conf); this.webSocketNode.localEngine.init(conf);

View File

@@ -27,9 +27,9 @@ public class WebSocketWriteHandler implements CompletionHandler<Integer, Void> {
protected final ByteArray writeArray = new ByteArray(); protected final ByteArray writeArray = new ByteArray();
protected final List<WebSocketFuture<Integer>> respList = new ArrayList(); protected final List<InnerWebSocketFuture<Integer>> respList = new ArrayList();
protected final ConcurrentLinkedDeque<WebSocketFuture<Integer>> requestQueue = new ConcurrentLinkedDeque(); protected final ConcurrentLinkedDeque<InnerWebSocketFuture<Integer>> requestQueue = new ConcurrentLinkedDeque();
public WebSocketWriteHandler(HttpContext context, WebSocket webSocket) { public WebSocketWriteHandler(HttpContext context, WebSocket webSocket) {
this.context = context; this.context = context;
@@ -37,7 +37,7 @@ public class WebSocketWriteHandler implements CompletionHandler<Integer, Void> {
} }
public CompletableFuture<Integer> send(WebSocketPacket... packets) { public CompletableFuture<Integer> send(WebSocketPacket... packets) {
WebSocketFuture<Integer> future = new WebSocketFuture<>(packets); InnerWebSocketFuture<Integer> future = new InnerWebSocketFuture<>(packets);
if (writePending.compareAndSet(false, true)) { if (writePending.compareAndSet(false, true)) {
respList.clear(); respList.clear();
respList.add(future); respList.add(future);
@@ -55,12 +55,12 @@ public class WebSocketWriteHandler implements CompletionHandler<Integer, Void> {
@Override @Override
public void completed(Integer result, Void attachment) { public void completed(Integer result, Void attachment) {
webSocket.lastSendTime = System.currentTimeMillis(); webSocket.lastSendTime = System.currentTimeMillis();
for (WebSocketFuture<Integer> future : respList) { for (InnerWebSocketFuture<Integer> future : respList) {
future.complete(0); future.complete(0);
} }
respList.clear(); respList.clear();
writeArray.clear(); writeArray.clear();
WebSocketFuture req; InnerWebSocketFuture req;
while ((req = requestQueue.poll()) != null) { while ((req = requestQueue.poll()) != null) {
respList.add(req); respList.add(req);
for (WebSocketPacket p : req.packets) { for (WebSocketPacket p : req.packets) {
@@ -78,12 +78,12 @@ public class WebSocketWriteHandler implements CompletionHandler<Integer, Void> {
@Override @Override
public void failed(Throwable exc, Void attachment) { public void failed(Throwable exc, Void attachment) {
WebSocketFuture req; InnerWebSocketFuture req;
try { try {
while ((req = requestQueue.poll()) != null) { while ((req = requestQueue.poll()) != null) {
req.completeExceptionally(exc); req.completeExceptionally(exc);
} }
for (WebSocketFuture<Integer> future : respList) { for (InnerWebSocketFuture<Integer> future : respList) {
future.completeExceptionally(exc); future.completeExceptionally(exc);
} }
respList.clear(); respList.clear();
@@ -116,15 +116,15 @@ public class WebSocketWriteHandler implements CompletionHandler<Integer, Void> {
array.put(content); array.put(content);
} }
protected static class WebSocketFuture<T> extends CompletableFuture<T> { protected static class InnerWebSocketFuture<T> extends CompletableFuture<T> {
protected WebSocketPacket[] packets; protected WebSocketPacket[] packets;
public WebSocketFuture() { public InnerWebSocketFuture() {
super(); super();
} }
public WebSocketFuture(WebSocketPacket... packets) { public InnerWebSocketFuture(WebSocketPacket... packets) {
super(); super();
this.packets = packets; this.packets = packets;
} }

View File

@@ -0,0 +1,100 @@
/*
*
*/
package org.redkale.net.http;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.*;
import org.redkale.net.AsyncIOThread;
import org.redkale.util.*;
/**
* WebSocket连接的IO写线程
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.8.0
*/
public class WebSocketWriteIOThread extends AsyncIOThread {
private final BlockingDeque<WebSocketFuture> requestQueue = new LinkedBlockingDeque<>();
private final ScheduledThreadPoolExecutor timeoutExecutor;
public WebSocketWriteIOThread(ScheduledThreadPoolExecutor timeoutExecutor, ThreadGroup g, String name, int index, int threads,
ExecutorService workExecutor, ObjectPool<ByteBuffer> safeBufferPool) throws IOException {
super(g, name, index, threads, workExecutor, safeBufferPool);
this.timeoutExecutor = timeoutExecutor;
}
public CompletableFuture<Integer> offerRequest(WebSocket websocket, WebSocketPacket packet) {
WebSocketFuture future = new WebSocketFuture(this, websocket, packet);
int ts = websocket._channel.getWriteTimeoutSeconds();
if (ts > 0) {
future.timeout = timeoutExecutor.schedule(future, ts, TimeUnit.SECONDS);
}
requestQueue.offer(future);
return future;
}
@Override
public void run() {
final ByteBuffer buffer = getBufferSupplier().get();
final int capacity = buffer.capacity();
final ByteArray writeArray = new ByteArray();
while (!isClosed()) {
WebSocketFuture entry;
try {
while ((entry = requestQueue.take()) != null) {
if (!entry.isDone()) {
writeArray.clear();
entry.packet.writeEncode(writeArray);
if (writeArray.length() > 0) {
if (writeArray.length() <= capacity) {
buffer.clear();
buffer.put(writeArray.content(), 0, writeArray.length());
buffer.flip();
entry.websocket._channel.write(buffer, entry, writeHandler);
} else {
entry.websocket._channel.write(writeArray, entry, writeHandler);
}
}
}
}
} catch (InterruptedException e) {
}
}
}
protected final CompletionHandler<Integer, WebSocketFuture> writeHandler = new CompletionHandler<Integer, WebSocketFuture>() {
@Override
public void completed(Integer result, WebSocketFuture attachment) {
attachment.cancelTimeout();
attachment.workThread = null;
attachment.websocket = null;
attachment.packet = null;
runWork(() -> {
attachment.complete(0);
});
}
@Override
public void failed(Throwable exc, WebSocketFuture attachment) {
attachment.cancelTimeout();
attachment.websocket.close();
attachment.workThread = null;
attachment.websocket = null;
attachment.packet = null;
runWork(() -> {
attachment.completeExceptionally(exc);
});
}
};
}