优化WebSocket的写操作

This commit is contained in:
Redkale
2023-01-14 22:33:05 +08:00
parent 58e206d0e9
commit 55ddf7c417
10 changed files with 134 additions and 48 deletions

View File

@@ -48,15 +48,15 @@ public class AsyncIOGroup extends AsyncGroup {
private final AtomicInteger writeIndex = new AtomicInteger();
//创建数
final LongAdder connCreateCounter = new LongAdder();
protected final LongAdder connCreateCounter = new LongAdder();
//在线数
final LongAdder connLivingCounter = new LongAdder();
protected final LongAdder connLivingCounter = new LongAdder();
//关闭数
final LongAdder connClosedCounter = new LongAdder();
protected final LongAdder connClosedCounter = new LongAdder();
private ScheduledThreadPoolExecutor timeoutExecutor;
protected final ScheduledThreadPoolExecutor timeoutExecutor;
public AsyncIOGroup(final int bufferCapacity, final int bufferPoolSize) {
this(true, "Redkale-AnonymousClient-IOThread-%s", null, bufferCapacity, bufferPoolSize);
@@ -73,36 +73,50 @@ public class AsyncIOGroup extends AsyncGroup {
}));
}
@SuppressWarnings("OverridableMethodCallInConstructor")
public AsyncIOGroup(boolean client, String threadNameFormat, ExecutorService workExecutor, final int bufferCapacity, ObjectPool<ByteBuffer> safeBufferPool) {
this.bufferCapacity = bufferCapacity;
final int threads = Utility.cpus();
this.ioReadThreads = new AsyncIOThread[threads];
this.ioWriteThreads = new AsyncIOThread[threads];
final ThreadGroup g = new ThreadGroup(String.format(threadNameFormat, "Group"));
this.timeoutExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, (Runnable r) -> {
Thread t = new Thread(r, String.format(threadNameFormat, "Timeout"));
t.setDaemon(true);
return t;
});
try {
for (int i = 0; i < threads; i++) {
String indexfix = WorkThread.formatIndex(threads, i + 1);
if (client) {
this.ioReadThreads[i] = new ClientReadIOThread(g, String.format(threadNameFormat, "Read-" + indexfix), i, threads, workExecutor, safeBufferPool);
this.ioWriteThreads[i] = new ClientWriteIOThread(g, String.format(threadNameFormat, "Write-" + indexfix), i, threads, workExecutor, safeBufferPool);
this.ioReadThreads[i] = createClientReadIOThread(g, String.format(threadNameFormat, "Read-" + indexfix), i, threads, workExecutor, safeBufferPool);
this.ioWriteThreads[i] = createClientWriteIOThread(g, String.format(threadNameFormat, "Write-" + indexfix), i, threads, workExecutor, safeBufferPool);
} else {
this.ioReadThreads[i] = new AsyncIOThread(g, String.format(threadNameFormat, indexfix), i, threads, workExecutor, safeBufferPool);
this.ioReadThreads[i] = createAsyncIOThread(g, String.format(threadNameFormat, indexfix), i, threads, workExecutor, safeBufferPool);
this.ioWriteThreads[i] = this.ioReadThreads[i];
}
}
if (client) {
this.connectThread = new ClientReadIOThread(g, String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, safeBufferPool);
this.connectThread = createClientReadIOThread(g, String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, safeBufferPool);
} else {
this.connectThread = null;
}
} catch (IOException e) {
throw new RuntimeException(e);
}
this.timeoutExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, (Runnable r) -> {
Thread t = new Thread(r, String.format(threadNameFormat, "Timeout"));
t.setDaemon(true);
return t;
});
}
protected AsyncIOThread createAsyncIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ObjectPool<ByteBuffer> safeBufferPool) throws IOException {
return new AsyncIOThread(g, name, index, threads, workExecutor, safeBufferPool);
}
protected AsyncIOThread createClientReadIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ObjectPool<ByteBuffer> safeBufferPool) throws IOException {
return new ClientReadIOThread(g, name, index, threads, workExecutor, safeBufferPool);
}
protected AsyncIOThread createClientWriteIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ObjectPool<ByteBuffer> safeBufferPool) throws IOException {
return new ClientWriteIOThread(g, name, index, threads, workExecutor, safeBufferPool);
}
@Override

View File

@@ -8,6 +8,7 @@ package org.redkale.net.http;
import java.nio.channels.CompletionHandler;
import java.security.SecureRandom;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.redkale.annotation.ConstructorParameters;
import org.redkale.asm.*;
import static org.redkale.asm.Opcodes.*;
@@ -43,6 +44,8 @@ public class HttpContext extends Context {
//所有Servlet方法都不需要读取http-headerlazyHeaders=true
protected boolean lazyHeaders; //存在动态改值
Function<WebSocket, WebSocketWriteIOThread> webSocketWriterIOThreadFunc;
// protected RequestURINode[] uriCacheNodes;
public HttpContext(HttpContextConfig config) {
super(config);
@@ -79,6 +82,12 @@ public class HttpContext extends Context {
super.updateWriteIOThread(conn, ioWriteThread);
}
protected void updateWebSocketWriteIOThread(WebSocket webSocket) {
WebSocketWriteIOThread writeIOThread = webSocketWriterIOThreadFunc.apply(webSocket);
updateWriteIOThread(webSocket._channel, writeIOThread);
webSocket._writeIOThread = writeIOThread;
}
protected String createSessionid() {
byte[] bytes = new byte[16];
random.nextBytes(bytes);

View File

@@ -43,6 +43,10 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
private ObjectPool<ByteBuffer> safeBufferPool;
private final Object groupLock = new Object();
private WebSocketAsyncGroup asyncGroup;
//配置<executor threads="0"> APP_EXECUTOR资源为null
//RESNAME_APP_EXECUTOR
protected ExecutorService workExecutor;
@@ -80,6 +84,9 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
this.dateScheduler.shutdownNow();
this.dateScheduler = null;
}
if (asyncGroup != null) {
asyncGroup.close();
}
if (context.rpcAuthenticator != null) {
context.rpcAuthenticator.destroy(context.rpcAuthenticatorConfig);
}
@@ -536,7 +543,20 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
throw new HttpException("init HttpRpcAuthenticator(" + impl + ") error", e);
}
}
return new HttpContext(contextConfig);
HttpContext rs = new HttpContext(contextConfig);
rs.webSocketWriterIOThreadFunc = ws -> {
if (asyncGroup == null) {
synchronized (groupLock) {
if (asyncGroup == null) {
WebSocketAsyncGroup g = new WebSocketAsyncGroup("Redkale-HTTP:" + address.getPort() + "-WebSocketWriteIOThread-%s", workExecutor, bufferCapacity, safeBufferPool);
g.start();
asyncGroup = g;
}
}
}
return (WebSocketWriteIOThread) asyncGroup.nextWriteIOThread();
};
return rs;
}
@Override

View File

@@ -544,7 +544,7 @@ public final class Rest {
{
av0 = fv.visitAnnotation(resDesc, true);
av0.visit("name", res != null ? res.name() : res2.name());
av0.visit("required", res != null ? res.required() : false);
av0.visit("required", res != null ? res.required() : true);
av0.visitEnd();
}
fv.visitEnd();

View File

@@ -83,10 +83,10 @@ public abstract class WebSocket<G extends Serializable, T> {
WebSocketEngine _engine; //不可能为空
//WebSocketRunner _runner; //不可能为空
WebSocketReadHandler _readHandler;
WebSocketWriteHandler _writeHandler;
//WebSocketWriteHandler _writeHandler;
WebSocketWriteIOThread _writeIOThread;
InetSocketAddress _sncpAddress; //分布式下不可为空
@@ -238,14 +238,14 @@ public abstract class WebSocket<G extends Serializable, T> {
* @return 0表示成功 非0表示错误码
*/
CompletableFuture<Integer> sendPacket(WebSocketPacket packet) {
if (this._writeHandler == null) {
if (this._writeIOThread == null) {
if (delayPackets == null) {
delayPackets = new ArrayList<>();
}
delayPackets.add(packet);
return CompletableFuture.completedFuture(RETCODE_DEAYSEND);
}
CompletableFuture<Integer> rs = this._writeHandler.send(packet);
CompletableFuture<Integer> rs = this._writeIOThread.send(this, packet);
if (_engine.logger.isLoggable(Level.FINER) && packet != WebSocketPacket.DEFAULT_PING_PACKET) {
_engine.logger.finer("userid:" + getUserid() + " send websocket message(" + packet + ")" + " on " + this);
}

View File

@@ -0,0 +1,35 @@
/*
*
*/
package org.redkale.net.http;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import org.redkale.net.*;
import org.redkale.util.ObjectPool;
/**
* WebSocket只写版的AsyncIOGroup <br>
* 只会用到ioWriteThread
*
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.8.0
*/
class WebSocketAsyncGroup extends AsyncIOGroup {
public WebSocketAsyncGroup(String threadNameFormat, ExecutorService workExecutor, int bufferCapacity, ObjectPool<ByteBuffer> safeBufferPool) {
super(false, threadNameFormat, workExecutor, bufferCapacity, safeBufferPool);
}
@Override
protected AsyncIOThread createAsyncIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ObjectPool<ByteBuffer> safeBufferPool) throws IOException {
return new WebSocketWriteIOThread(this.timeoutExecutor, g, name, index, threads, workExecutor, safeBufferPool);
}
}

View File

@@ -3,7 +3,7 @@
*/
package org.redkale.net.http;
import java.util.Objects;
import java.util.*;
import java.util.concurrent.*;
import org.redkale.net.WorkThread;
@@ -19,7 +19,7 @@ import org.redkale.net.WorkThread;
*/
public class WebSocketFuture extends CompletableFuture<Integer> implements Runnable {
WebSocketPacket packet;
WebSocketPacket[] packets;
WebSocket websocket;
@@ -27,12 +27,11 @@ public class WebSocketFuture extends CompletableFuture<Integer> implements Runna
ScheduledFuture timeout;
WebSocketFuture(WorkThread workThread, WebSocket websocket, WebSocketPacket packet) {
WebSocketFuture(WorkThread workThread, WebSocket websocket, WebSocketPacket... packets) {
super();
Objects.requireNonNull(workThread);
this.workThread = workThread;
this.websocket = websocket;
this.packet = packet;
this.packets = packets;
}
void cancelTimeout() {
@@ -43,7 +42,7 @@ public class WebSocketFuture extends CompletableFuture<Integer> implements Runna
@Override //JDK9+
public WebSocketFuture newIncompleteFuture() {
WebSocketFuture future = new WebSocketFuture(workThread, websocket, packet);
WebSocketFuture future = new WebSocketFuture(workThread, websocket, packets);
future.timeout = timeout;
return future;
}
@@ -56,6 +55,6 @@ public class WebSocketFuture extends CompletableFuture<Integer> implements Runna
@Override
public String toString() {
return getClass().getSimpleName() + "_" + Objects.hash(this) + "{websocket = " + websocket + ", packet = " + packet + "}";
return getClass().getSimpleName() + "_" + Objects.hash(this) + "{websocket = " + websocket + ", packets = " + Arrays.toString(packets) + "}";
}
}

View File

@@ -80,17 +80,17 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
//同RestWebSocket.single
protected boolean single = true; //是否单用户单连接
//同RestWebSocket.liveInterval
protected int liveInterval = DEFAILT_LIVEINTERVAL;
//同RestWebSocket.liveinterval
protected int liveinterval = DEFAILT_LIVEINTERVAL;
//同RestWebSocket.wsMaxConns
protected int wsMaxConns = 0;
protected int wsmaxconns = 0;
//同RestWebSocket.wsThreads
protected int wsThreads = 0;
protected int wsthreads = 0;
//同RestWebSocket.wsMaxBody
protected int wsMaxBody = 32 * 1024;
protected int wsmaxbody = 32 * 1024;
//同RestWebSocket.anyuser
protected boolean anyuser = false;
@@ -198,7 +198,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
}
//存在WebSocketServlet则此WebSocketNode必须是本地模式Service
this.webSocketNode.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]",
this.single, context, liveInterval, wsMaxConns, wsThreads, wsMaxBody, mergemsg, this.cryptor, this.webSocketNode, this.sendConvert, logger);
this.single, context, liveinterval, wsmaxconns, wsthreads, wsmaxbody, mergemsg, this.cryptor, this.webSocketNode, this.sendConvert, logger);
this.webSocketNode.init(conf);
this.webSocketNode.localEngine.init(conf);
@@ -293,7 +293,9 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
@Override
public void completed(Integer result, Void attachment) {
webSocket._readHandler = new WebSocketReadHandler(response.getContext(), webSocket, restMessageConsumer);
webSocket._writeHandler = new WebSocketWriteHandler(response.getContext(), webSocket);
//webSocket._writeHandler = new WebSocketWriteHandler(response.getContext(), webSocket);
response.getContext().updateWebSocketWriteIOThread(webSocket);
Runnable createUseridHandler = () -> {
CompletableFuture<Serializable> userFuture = webSocket.createUserid();
if (userFuture == null) {
@@ -355,7 +357,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
if (webSocket.delayPackets != null) { //存在待发送的消息
List<WebSocketPacket> delayPackets = webSocket.delayPackets;
webSocket.delayPackets = null;
CompletableFuture<Integer> cf = webSocket._writeHandler.send(delayPackets.toArray(new WebSocketPacket[delayPackets.size()]));
CompletableFuture<Integer> cf = webSocket._writeIOThread.send(webSocket, delayPackets.toArray(new WebSocketPacket[delayPackets.size()]));
cf.whenComplete((Integer v, Throwable t) -> {
if (userid == null || t != null) {
if (t != null) {
@@ -374,7 +376,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
if (webSocket.delayPackets != null) { //存在待发送的消息
List<WebSocketPacket> delayPackets = webSocket.delayPackets;
webSocket.delayPackets = null;
CompletableFuture<Integer> cf = webSocket._writeHandler.send(delayPackets.toArray(new WebSocketPacket[delayPackets.size()]));
CompletableFuture<Integer> cf = webSocket._writeIOThread.send(webSocket, delayPackets.toArray(new WebSocketPacket[delayPackets.size()]));
cf.whenComplete((Integer v, Throwable t) -> {
if (sessionid == null || t != null) {
if (t != null) {

View File

@@ -5,18 +5,19 @@
*/
package org.redkale.net.http;
import java.nio.channels.CompletionHandler;
import java.util.*;
import java.util.logging.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.nio.channels.CompletionHandler;
import org.redkale.util.ByteArray;
import java.util.logging.Level;
import static org.redkale.net.http.WebSocket.*;
import org.redkale.util.ByteArray;
/**
*
* @author zhangjx
*/
@Deprecated(since = "2.8.0")
public class WebSocketWriteHandler implements CompletionHandler<Integer, Void> {
protected final HttpContext context;

View File

@@ -6,6 +6,7 @@ package org.redkale.net.http;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.util.Objects;
import java.util.concurrent.*;
import org.redkale.net.AsyncIOThread;
import org.redkale.util.*;
@@ -22,21 +23,24 @@ import org.redkale.util.*;
*/
public class WebSocketWriteIOThread extends AsyncIOThread {
private final BlockingDeque<WebSocketFuture> requestQueue = new LinkedBlockingDeque<>();
private final ScheduledThreadPoolExecutor timeoutExecutor;
private final BlockingDeque<WebSocketFuture> requestQueue = new LinkedBlockingDeque<>();
public WebSocketWriteIOThread(ScheduledThreadPoolExecutor timeoutExecutor, ThreadGroup g, String name, int index, int threads,
ExecutorService workExecutor, ObjectPool<ByteBuffer> safeBufferPool) throws IOException {
super(g, name, index, threads, workExecutor, safeBufferPool);
Objects.requireNonNull(timeoutExecutor);
this.timeoutExecutor = timeoutExecutor;
}
public CompletableFuture<Integer> offerRequest(WebSocket websocket, WebSocketPacket packet) {
WebSocketFuture future = new WebSocketFuture(this, websocket, packet);
int ts = websocket._channel.getWriteTimeoutSeconds();
if (ts > 0) {
future.timeout = timeoutExecutor.schedule(future, ts, TimeUnit.SECONDS);
public CompletableFuture<Integer> send(WebSocket websocket, WebSocketPacket... packets) {
Objects.requireNonNull(websocket);
Objects.requireNonNull(packets);
WebSocketFuture future = new WebSocketFuture(this, websocket, packets);
int wts = websocket._channel.getWriteTimeoutSeconds();
if (wts > 0) {
future.timeout = timeoutExecutor.schedule(future, wts, TimeUnit.SECONDS);
}
requestQueue.offer(future);
return future;
@@ -53,7 +57,9 @@ public class WebSocketWriteIOThread extends AsyncIOThread {
while ((entry = requestQueue.take()) != null) {
if (!entry.isDone()) {
writeArray.clear();
entry.packet.writeEncode(writeArray);
for (WebSocketPacket packet : entry.packets) {
packet.writeEncode(writeArray);
}
if (writeArray.length() > 0) {
if (writeArray.length() <= capacity) {
buffer.clear();
@@ -78,7 +84,7 @@ public class WebSocketWriteIOThread extends AsyncIOThread {
attachment.cancelTimeout();
attachment.workThread = null;
attachment.websocket = null;
attachment.packet = null;
attachment.packets = null;
runWork(() -> {
attachment.complete(0);
});
@@ -90,7 +96,7 @@ public class WebSocketWriteIOThread extends AsyncIOThread {
attachment.websocket.close();
attachment.workThread = null;
attachment.websocket = null;
attachment.packet = null;
attachment.packets = null;
runWork(() -> {
attachment.completeExceptionally(exc);
});