WebSocket优化

This commit is contained in:
redkale
2023-04-06 21:23:27 +08:00
parent 91d7f51678
commit 7286531bc5
9 changed files with 54 additions and 245 deletions

View File

@@ -10,7 +10,6 @@ import java.nio.charset.Charset;
import java.security.SecureRandom;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.redkale.annotation.ConstructorParameters;
import org.redkale.asm.*;
import static org.redkale.asm.Opcodes.*;
@@ -49,8 +48,6 @@ public class HttpContext extends Context {
//不带通配符的mapping url的缓存对象
final Map<ByteArray, String>[] uriPathCaches = new Map[100];
Function<WebSocket, WebSocketWriteIOThread> webSocketWriterIOThreadFunc;
public HttpContext(HttpContextConfig config) {
super(config);
this.remoteAddrHeader = config.remoteAddrHeader;
@@ -94,14 +91,6 @@ public class HttpContext extends Context {
super.updateWriteIOThread(conn, ioWriteThread);
}
protected void updateWebSocketWriteIOThread(WebSocket webSocket) {
if (webSocketWriterIOThreadFunc != null) {
WebSocketWriteIOThread writeIOThread = webSocketWriterIOThreadFunc.apply(webSocket);
updateWriteIOThread(webSocket._channel, writeIOThread);
webSocket._writeIOThread = writeIOThread;
}
}
protected String createSessionid() {
byte[] bytes = new byte[16];
random.nextBytes(bytes);

View File

@@ -45,8 +45,6 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
private final ReentrantLock addLock = new ReentrantLock();
private WebSocketAsyncGroup asyncGroup;
//配置<executor threads="0"> APP_EXECUTOR资源为null
//RESNAME_APP_EXECUTOR
protected ExecutorService workExecutor;
@@ -84,9 +82,6 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
this.dateScheduler.shutdownNow();
this.dateScheduler = null;
}
if (asyncGroup != null) {
asyncGroup.close();
}
if (context.rpcAuthenticator != null) {
context.rpcAuthenticator.destroy(context.rpcAuthenticatorConfig);
}
@@ -566,25 +561,7 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
throw new HttpException("init HttpRpcAuthenticator(" + impl + ") error", e);
}
}
HttpContext rs = new HttpContext(contextConfig);
if (false) { //暂不使用WebSocketAsyncGroup模式
rs.webSocketWriterIOThreadFunc = ws -> {
if (asyncGroup == null) {
groupLock.lock();
try {
if (asyncGroup == null) {
WebSocketAsyncGroup g = new WebSocketAsyncGroup("Redkale-HTTP:" + address.getPort() + "-WebSocketWriteIOThread-%s", workExecutor, safeBufferPool);
g.start();
asyncGroup = g;
}
} finally {
groupLock.unlock();
}
}
return (WebSocketWriteIOThread) asyncGroup.nextWriteIOThread();
};
}
return rs;
return new HttpContext(contextConfig);
}
@Override

View File

@@ -12,7 +12,6 @@ import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.*;
import java.util.logging.*;
import java.util.stream.Stream;
import java.util.zip.*;
@@ -88,8 +87,6 @@ public abstract class WebSocket<G extends Serializable, T> {
WebSocketWriteHandler _writeHandler;
WebSocketWriteIOThread _writeIOThread;
InetSocketAddress _sncpAddress; //分布式下不可为空
AsyncConnection _channel;//不可能为空
@@ -721,42 +718,6 @@ public abstract class WebSocket<G extends Serializable, T> {
return _engine.getLocalWebSockets();
}
/**
* 获取ByteBuffer生成器
*
* @return Supplier
*/
protected Supplier<ByteBuffer> getReadBufferSupplier() {
return this._channel.getReadBufferSupplier();
}
/**
* 获取ByteBuffer回收器
*
* @return Consumer
*/
protected Consumer<ByteBuffer> getReadBufferConsumer() {
return this._channel.getReadBufferConsumer();
}
/**
* 获取ByteBuffer生成器
*
* @return Supplier
*/
protected Supplier<ByteBuffer> getWriteBufferSupplier() {
return this._channel.getWriteBufferSupplier();
}
/**
* 获取ByteBuffer回收器
*
* @return Consumer
*/
protected Consumer<ByteBuffer> getWriteBufferConsumer() {
return this._channel.getWriteBufferConsumer();
}
//-------------------------------------------------------------------
/**
* 返回sessionid, null表示连接不合法或异常,默认实现是request.sessionid(true),通常需要重写该方法
@@ -964,6 +925,12 @@ public abstract class WebSocket<G extends Serializable, T> {
}
CompletableFuture<Void> future = _engine.removeLocalThenDisconnect(this);
_channel.dispose();
if (_readHandler != null) {
_readHandler.byteArrayPool.accept(_readHandler.halfFrameBytes);
}
if (_writeHandler != null) {
_writeHandler.byteArrayPool.accept(_writeHandler.writeArray);
}
CompletableFuture closeFuture = onClose(code, reason);
if (closeFuture == null) {
return future;

View File

@@ -1,35 +0,0 @@
/*
*
*/
package org.redkale.net.http;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import org.redkale.net.*;
import org.redkale.util.ByteBufferPool;
/**
* WebSocket只写版的AsyncIOGroup <br>
* 只会用到ioWriteThread
*
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.8.0
*/
@Deprecated(since = "2.8.0")
class WebSocketAsyncGroup extends AsyncIOGroup {
public WebSocketAsyncGroup(String threadNameFormat, ExecutorService workExecutor, ByteBufferPool safeBufferPool) {
super(threadNameFormat, workExecutor, safeBufferPool);
}
@Override
protected AsyncIOThread createAsyncIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ByteBufferPool safeBufferPool) throws IOException {
return new WebSocketWriteIOThread(this.timeoutExecutor, g, name, index, threads, workExecutor, safeBufferPool);
}
}

View File

@@ -16,7 +16,7 @@ import org.redkale.convert.Convert;
import org.redkale.net.AsyncIOThread;
import static org.redkale.net.http.WebSocket.*;
import org.redkale.net.http.WebSocketPacket.FrameType;
import org.redkale.util.ByteArray;
import org.redkale.util.*;
/**
*
@@ -38,7 +38,9 @@ public class WebSocketReadHandler implements CompletionHandler<Integer, ByteBuff
protected FrameType currSeriesMergeMessageType;
protected final ByteArray halfFrameBytes = new ByteArray();
protected final ObjectPool<ByteArray> byteArrayPool;
protected final ByteArray halfFrameBytes;
protected byte halfFrameOpcode;
@@ -52,10 +54,12 @@ public class WebSocketReadHandler implements CompletionHandler<Integer, ByteBuff
protected AsyncIOThread ioReadThread;
public WebSocketReadHandler(HttpContext context, WebSocket webSocket, BiConsumer<WebSocket, Object> messageConsumer) {
public WebSocketReadHandler(HttpContext context, WebSocket webSocket, ObjectPool<ByteArray> byteArrayPool, BiConsumer<WebSocket, Object> messageConsumer) {
this.context = context;
this.restMessageConsumer = messageConsumer;
this.webSocket = webSocket;
this.byteArrayPool = byteArrayPool;
this.restMessageConsumer = messageConsumer;
this.halfFrameBytes = byteArrayPool.get();
this.ioReadThread = webSocket._channel.getReadIOThread();
this.logger = context.getLogger();
}

View File

@@ -70,6 +70,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
private final BiConsumer<WebSocket, Object> restMessageConsumer = createRestOnMessageConsumer();
private final ObjectPool<ByteArray> byteArrayPool = ObjectPool.createSafePool(1000, () -> new ByteArray(), null, ByteArray::recycle);
protected Type messageRestType; //RestWebSocket时会被修改
//同RestWebSocket.single
@@ -284,9 +286,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
@Override
public void completed(Integer result, Void attachment) {
webSocket._readHandler = new WebSocketReadHandler(response.getContext(), webSocket, restMessageConsumer);
webSocket._writeHandler = new WebSocketWriteHandler(response.getContext(), webSocket);
//response.getContext().updateWebSocketWriteIOThread(webSocket);
webSocket._readHandler = new WebSocketReadHandler(response.getContext(), webSocket, byteArrayPool, restMessageConsumer);
webSocket._writeHandler = new WebSocketWriteHandler(response.getContext(), webSocket, byteArrayPool);
Runnable createUseridHandler = () -> {
CompletableFuture<Serializable> userFuture = webSocket.createUserid();

View File

@@ -11,7 +11,7 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import static org.redkale.net.http.WebSocket.*;
import org.redkale.util.ByteArray;
import org.redkale.util.*;
/**
*
@@ -25,27 +25,32 @@ public class WebSocketWriteHandler implements CompletionHandler<Integer, Void> {
protected final AtomicBoolean writePending = new AtomicBoolean();
protected final ByteArray writeArray = new ByteArray();
protected final ObjectPool<ByteArray> byteArrayPool;
protected final List<InnerWebSocketFuture<Integer>> respList = new ArrayList();
protected final ByteArray writeArray;
protected final ConcurrentLinkedDeque<InnerWebSocketFuture<Integer>> requestQueue = new ConcurrentLinkedDeque();
protected final List<WebSocketFuture<Integer>> respList = new ArrayList();
public WebSocketWriteHandler(HttpContext context, WebSocket webSocket) {
protected final ConcurrentLinkedDeque<WebSocketFuture<Integer>> requestQueue = new ConcurrentLinkedDeque();
public WebSocketWriteHandler(HttpContext context, WebSocket webSocket, ObjectPool<ByteArray> byteArrayPool) {
this.context = context;
this.webSocket = webSocket;
this.byteArrayPool = byteArrayPool;
this.writeArray = byteArrayPool.get();
}
public CompletableFuture<Integer> send(WebSocketPacket... packets) {
InnerWebSocketFuture<Integer> future = new InnerWebSocketFuture<>(packets);
WebSocketFuture<Integer> future = new WebSocketFuture<>(packets);
if (writePending.compareAndSet(false, true)) {
respList.clear();
respList.add(future);
writeArray.clear();
ByteArray array = this.writeArray;
array.clear();
for (WebSocketPacket p : packets) {
writeEncode(p);
writeEncode(array, p);
}
webSocket._channel.write(writeArray, this);
webSocket._channel.write(array, this);
} else {
requestQueue.offer(future);
}
@@ -55,35 +60,36 @@ public class WebSocketWriteHandler implements CompletionHandler<Integer, Void> {
@Override
public void completed(Integer result, Void attachment) {
webSocket.lastSendTime = System.currentTimeMillis();
for (InnerWebSocketFuture<Integer> future : respList) {
for (WebSocketFuture<Integer> future : respList) {
future.complete(0);
}
respList.clear();
writeArray.clear();
InnerWebSocketFuture req;
ByteArray array = this.writeArray;
array.clear();
WebSocketFuture req;
while ((req = requestQueue.poll()) != null) {
respList.add(req);
for (WebSocketPacket p : req.packets) {
writeEncode(p);
writeEncode(array, p);
}
}
if (writeArray.isEmpty()) {
if (array.isEmpty()) {
if (!writePending.compareAndSet(true, false)) {
completed(0, attachment);
}
} else {
webSocket._channel.write(writeArray, this);
webSocket._channel.write(array, this);
}
}
@Override
public void failed(Throwable exc, Void attachment) {
InnerWebSocketFuture req;
WebSocketFuture req;
try {
while ((req = requestQueue.poll()) != null) {
req.completeExceptionally(exc);
}
for (InnerWebSocketFuture<Integer> future : respList) {
for (WebSocketFuture<Integer> future : respList) {
future.completeExceptionally(exc);
}
respList.clear();
@@ -96,8 +102,7 @@ public class WebSocketWriteHandler implements CompletionHandler<Integer, Void> {
}
//消息编码
protected void writeEncode(final WebSocketPacket packet) {
final ByteArray array = writeArray;
protected void writeEncode(final ByteArray array, final WebSocketPacket packet) {
final byte opcode = (byte) (packet.type.getValue() | 0x80);
final byte[] content = packet.getPayload();
final int len = content.length;
@@ -116,15 +121,15 @@ public class WebSocketWriteHandler implements CompletionHandler<Integer, Void> {
array.put(content);
}
protected static class InnerWebSocketFuture<T> extends CompletableFuture<T> {
protected static class WebSocketFuture<T> extends CompletableFuture<T> {
protected WebSocketPacket[] packets;
public InnerWebSocketFuture() {
public WebSocketFuture() {
super();
}
public InnerWebSocketFuture(WebSocketPacket... packets) {
public WebSocketFuture(WebSocketPacket... packets) {
super();
this.packets = packets;
}

View File

@@ -1,107 +0,0 @@
/*
*
*/
package org.redkale.net.http;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.util.Objects;
import java.util.concurrent.*;
import org.redkale.net.AsyncIOThread;
import org.redkale.util.*;
/**
* WebSocket连接的IO写线程
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.8.0
*/
@Deprecated(since = "2.8.0")
public class WebSocketWriteIOThread extends AsyncIOThread {
private final ScheduledExecutorService timeoutExecutor;
private final BlockingDeque<WebSocketFuture> requestQueue = new LinkedBlockingDeque<>();
public WebSocketWriteIOThread(ScheduledExecutorService timeoutExecutor, ThreadGroup g, String name, int index, int threads,
ExecutorService workExecutor, ByteBufferPool safeBufferPool) throws IOException {
super(g, name, index, threads, workExecutor, safeBufferPool);
Objects.requireNonNull(timeoutExecutor);
this.timeoutExecutor = timeoutExecutor;
}
public CompletableFuture<Integer> send(WebSocket websocket, WebSocketPacket... packets) {
Objects.requireNonNull(websocket);
Objects.requireNonNull(packets);
WebSocketFuture future = new WebSocketFuture(this, websocket, packets);
int wts = websocket._channel.getWriteTimeoutSeconds();
if (wts > 0) {
future.timeout = timeoutExecutor.schedule(future, wts, TimeUnit.SECONDS);
}
requestQueue.offer(future);
return future;
}
@Override
public void run() {
final ByteBuffer buffer = getBufferSupplier().get();
final int capacity = buffer.capacity();
final ByteArray writeArray = new ByteArray();
while (!isClosed()) {
WebSocketFuture entry;
try {
while ((entry = requestQueue.take()) != null) {
if (!entry.isDone()) {
writeArray.clear();
for (WebSocketPacket packet : entry.packets) {
packet.writeEncode(writeArray);
}
if (writeArray.length() > 0) {
if (writeArray.length() <= capacity) {
buffer.clear();
buffer.put(writeArray.content(), 0, writeArray.length());
buffer.flip();
entry.websocket._channel.write(buffer, entry, writeHandler);
} else {
entry.websocket._channel.write(writeArray, entry, writeHandler);
}
}
}
}
} catch (InterruptedException e) {
}
}
}
protected final CompletionHandler<Integer, WebSocketFuture> writeHandler = new CompletionHandler<Integer, WebSocketFuture>() {
@Override
public void completed(Integer result, WebSocketFuture attachment) {
attachment.cancelTimeout();
attachment.workThread = null;
attachment.websocket = null;
attachment.packets = null;
runWork(() -> {
attachment.complete(0);
});
}
@Override
public void failed(Throwable exc, WebSocketFuture attachment) {
attachment.cancelTimeout();
attachment.websocket.close();
attachment.workThread = null;
attachment.websocket = null;
attachment.packets = null;
runWork(() -> {
attachment.completeExceptionally(exc);
});
}
};
}

View File

@@ -52,6 +52,14 @@ public final class ByteArray implements ByteTuple {
return this;
}
public boolean recycle() {
this.count = 0;
if (this.content != null && this.content.length > 1024 * 32) {
this.content = new byte[1024];
}
return true;
}
/**
* 设置count的新位置
*