WebSocket优化

This commit is contained in:
redkale
2024-10-21 17:10:52 +08:00
parent e2a3560661
commit 5fbe56bfc1
6 changed files with 98 additions and 183 deletions

View File

@@ -68,7 +68,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
protected Object writeAttachment;
protected CompletionHandler<Integer, Object> writeCompletionHandler;
protected CompletionHandler<Integer, ?> writeCompletionHandler;
protected SelectionKey writeKey;
@@ -480,7 +480,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
}
protected void handleWrite(final int totalCount, Throwable t) {
CompletionHandler<Integer, Object> handler = this.writeCompletionHandler;
CompletionHandler handler = this.writeCompletionHandler;
Object attach = this.writeAttachment;
// 清空写参数
this.writeCompletionHandler = null;

View File

@@ -26,40 +26,35 @@ public class PipelinePacket {
protected int tupleLength;
@ConvertColumn(index = 4)
protected CompletionHandler<Integer, Object> handler;
protected CompletionHandler<Integer, ?> handler;
@ConvertColumn(index = 5)
protected Object attach;
public PipelinePacket() {}
public PipelinePacket(ByteTuple data, CompletionHandler<Integer, Object> handler) {
public PipelinePacket(ByteTuple data, CompletionHandler<Integer, ?> handler) {
this(data, handler, null);
}
public PipelinePacket(ByteTuple data, CompletionHandler<Integer, Object> handler, Object attach) {
public PipelinePacket(ByteTuple data, CompletionHandler<Integer, ?> handler, Object attach) {
this(data.content(), data.offset(), data.length(), handler, attach);
}
public PipelinePacket(byte[] tupleBytes, CompletionHandler<Integer, Object> handler) {
public PipelinePacket(byte[] tupleBytes, CompletionHandler<Integer, ?> handler) {
this(tupleBytes, 0, tupleBytes.length, handler, null);
}
public PipelinePacket(byte[] tupleBytes, CompletionHandler<Integer, Object> handler, Object attach) {
public PipelinePacket(byte[] tupleBytes, CompletionHandler<Integer, ?> handler, Object attach) {
this(tupleBytes, 0, tupleBytes.length, handler, attach);
}
public PipelinePacket(
byte[] tupleBytes, int tupleOffset, int tupleLength, CompletionHandler<Integer, Object> handler) {
public PipelinePacket(byte[] tupleBytes, int tupleOffset, int tupleLength, CompletionHandler<Integer, ?> handler) {
this(tupleBytes, tupleOffset, tupleLength, handler, null);
}
public PipelinePacket(
byte[] tupleBytes,
int tupleOffset,
int tupleLength,
CompletionHandler<Integer, Object> handler,
Object attach) {
byte[] tupleBytes, int tupleOffset, int tupleLength, CompletionHandler<Integer, ?> handler, Object attach) {
this.tupleBytes = tupleBytes;
this.tupleOffset = tupleOffset;
this.tupleLength = tupleLength;
@@ -91,11 +86,11 @@ public class PipelinePacket {
this.tupleLength = tupleLength;
}
public CompletionHandler<Integer, Object> getHandler() {
public CompletionHandler<Integer, ?> getHandler() {
return handler;
}
public void setHandler(CompletionHandler<Integer, Object> handler) {
public void setHandler(CompletionHandler<Integer, ?> handler) {
this.handler = handler;
}

View File

@@ -8,6 +8,7 @@ package org.redkale.net.http;
import java.io.Serializable;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.CompletableFuture;
@@ -21,6 +22,7 @@ import org.redkale.annotation.Nonnull;
import org.redkale.convert.Convert;
import org.redkale.net.AsyncConnection;
import org.redkale.net.http.WebSocketPacket.FrameType;
import org.redkale.util.ByteArray;
/**
*
@@ -96,8 +98,6 @@ public abstract class WebSocket<G extends Serializable, T> {
WebSocketReadHandler _readHandler;
WebSocketWriteHandler _writeHandler;
// 分布式下不可为空
InetSocketAddress _sncpAddress;
@@ -273,18 +273,54 @@ public abstract class WebSocket<G extends Serializable, T> {
* @return 0表示成功 非0表示错误码
*/
CompletableFuture<Integer> sendPacket(WebSocketPacket packet) {
if (this._writeHandler == null) { // if (this._writeIOThread == null) {
if (this._readHandler == null) { // if (this._writeIOThread == null) {
if (delayPackets == null) {
delayPackets = new ArrayList<>();
}
delayPackets.add(packet);
return CompletableFuture.completedFuture(RETCODE_DELAYSEND);
}
CompletableFuture<Integer> rs = this._writeHandler.send(packet); // this._writeIOThread.send(this, packet);
return _sendToChannel(packet);
}
/**
* 给自身发送消息体, 包含二进制/文本
*
* @param packet WebSocketPacket
* @return 0表示成功 非0表示错误码
*/
CompletableFuture<Integer> _sendToChannel(WebSocketPacket packet) {
if (_channel == null || closed.get()) {
return CompletableFuture.completedFuture(RETCODE_WSOCKET_CLOSED);
}
WebSocketFuture future = new WebSocketFuture();
_channel.writeInIOThread(packet.encodeToBytes(), future);
if (_engine.logger.isLoggable(Level.FINER) && packet != WebSocketPacket.DEFAULT_PING_PACKET) {
_engine.logger.finer("userid:" + getUserid() + " send websocket message(" + packet + ")" + " on " + this);
}
return rs == null ? CompletableFuture.completedFuture(RETCODE_WSOCKET_CLOSED) : rs;
return future;
}
/**
* 给自身发送消息体, 包含二进制/文本
*
* @param packets WebSocketPacket集合
* @return 0表示成功 非0表示错误码
*/
CompletableFuture<Integer> _sendToChannel(List<WebSocketPacket> packets) {
if (_channel == null || closed.get()) {
return CompletableFuture.completedFuture(RETCODE_WSOCKET_CLOSED);
}
WebSocketFuture future = new WebSocketFuture();
ByteArray array = new ByteArray();
for (WebSocketPacket packet : packets) {
array.put(packet.encodeToBytes());
}
_channel.writeInIOThread(array.toArray(), future);
if (_engine.logger.isLoggable(Level.FINER)) {
_engine.logger.finer("userid:" + getUserid() + " send websocket messages(" + packets + ")" + " on " + this);
}
return future;
}
// ----------------------------------------------------------------
@@ -952,9 +988,6 @@ public abstract class WebSocket<G extends Serializable, T> {
if (_readHandler != null) {
_readHandler.byteArrayPool.accept(_readHandler.halfFrameBytes);
}
if (_writeHandler != null) {
_writeHandler.byteArrayPool.accept(_writeHandler.writeArray);
}
return onClose(code, reason);
};
CompletableFuture<Void> future = _engine.removeLocalThenDisconnect(this);
@@ -979,4 +1012,22 @@ public abstract class WebSocket<G extends Serializable, T> {
public String toString() {
return this.getUserid() + "@" + _remoteAddr + "@" + Objects.hashCode(this);
}
protected class WebSocketFuture extends CompletableFuture<Integer> implements CompletionHandler<Integer, Void> {
public WebSocketFuture() {
super();
}
@Override
public void completed(Integer result, Void attachment) {
super.complete(0);
}
@Override
public void failed(Throwable exc, Void attachment) {
super.completeExceptionally(exc);
kill(RETCODE_SENDEXCEPTION, "websocket send message failed on CompletionHandler");
}
}
}

View File

@@ -9,7 +9,6 @@ import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import org.redkale.convert.ConvertColumn;
import org.redkale.net.http.WebSocketPacket.FrameType;
import org.redkale.util.ByteArray;
/**
* 详情见: https://redkale.org
@@ -94,23 +93,39 @@ public final class WebSocketPacket {
}
// 消息编码
public void writeEncode(final ByteArray array) {
public byte[] encodeToBytes() {
final byte opcode = (byte) (type.getValue() | 0x80);
final byte[] content = getPayload();
final int len = content.length;
if (len <= 0x7D) { // 125
array.put(opcode);
array.put((byte) len);
byte[] data = new byte[2 + len];
data[0] = opcode;
data[1] = (byte) len;
System.arraycopy(content, 0, data, 2, len);
return data;
} else if (len <= 0xFFFF) { // 65535
array.put(opcode);
array.put((byte) 0x7E); // 126
array.putChar((char) len);
byte[] data = new byte[4 + len];
data[0] = opcode;
data[1] = (byte) 0x7E; // 126
data[2] = (byte) (len >> 8 & 0xFF);
data[3] = (byte) (len & 0xFF);
System.arraycopy(content, 0, data, 4, len);
return data;
} else {
array.put(opcode);
array.put((byte) 0x7F); // 127
array.putLong(len);
byte[] data = new byte[10 + len];
data[0] = opcode;
data[1] = (byte) 0x7F; // 127
data[2] = (byte) (len >> 56 & 0xFF);
data[3] = (byte) (len >> 48 & 0xFF);
data[4] = (byte) (len >> 40 & 0xFF);
data[5] = (byte) (len >> 32 & 0xFF);
data[6] = (byte) (len >> 24 & 0xFF);
data[7] = (byte) (len >> 16 & 0xFF);
data[8] = (byte) (len >> 8 & 0xFF);
data[9] = (byte) (len & 0xFF);
System.arraycopy(content, 0, data, 10, len);
return data;
}
array.put(content);
}
public byte[] getPayload() {

View File

@@ -329,8 +329,6 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
Traces.currentTraceid(request.getTraceid());
webSocket._readHandler = new WebSocketReadHandler(
response.getContext(), webSocket, byteArrayPool, restMessageConsumer);
webSocket._writeHandler =
new WebSocketWriteHandler(response.getContext(), webSocket, byteArrayPool);
Runnable createUseridHandler = () -> {
CompletableFuture<Serializable> userFuture = webSocket.createUserid();
@@ -415,8 +413,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
webSocket.delayPackets = null;
// CompletableFuture<Integer> cf = webSocket._writeIOThread.send(webSocket,
// delayPackets.toArray(new WebSocketPacket[delayPackets.size()]));
CompletableFuture<Integer> cf = webSocket._writeHandler.send(
delayPackets.toArray(new WebSocketPacket[delayPackets.size()]));
CompletableFuture<Integer> cf = webSocket._sendToChannel(delayPackets);
cf.whenComplete((Integer v, Throwable t) -> {
Traces.currentTraceid(request.getTraceid());
if (userid == null || t != null) {
@@ -442,8 +439,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
webSocket.delayPackets = null;
// CompletableFuture<Integer> cf = webSocket._writeIOThread.send(webSocket,
// delayPackets.toArray(new WebSocketPacket[delayPackets.size()]));
CompletableFuture<Integer> cf = webSocket._writeHandler.send(
delayPackets.toArray(new WebSocketPacket[delayPackets.size()]));
CompletableFuture<Integer> cf = webSocket._sendToChannel(delayPackets);
cf.whenComplete((Integer v, Throwable t) -> {
Traces.currentTraceid(request.getTraceid());
if (sessionid == null || t != null) {

View File

@@ -1,142 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.net.http;
import static org.redkale.net.http.WebSocket.*;
import java.nio.channels.CompletionHandler;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import org.redkale.util.*;
/** @author zhangjx */
public class WebSocketWriteHandler implements CompletionHandler<Integer, Void> {
protected final HttpContext context;
protected final WebSocket webSocket;
protected final AtomicBoolean writePending = new AtomicBoolean();
protected final ObjectPool<ByteArray> byteArrayPool;
protected final ByteArray writeArray;
protected final List<WebSocketFuture<Integer>> respList = new ArrayList();
protected final ConcurrentLinkedQueue<WebSocketFuture<Integer>> requestQueue = new ConcurrentLinkedQueue();
public WebSocketWriteHandler(HttpContext context, WebSocket webSocket, ObjectPool<ByteArray> byteArrayPool) {
this.context = context;
this.webSocket = webSocket;
this.byteArrayPool = byteArrayPool;
this.writeArray = byteArrayPool.get();
}
public CompletableFuture<Integer> send(WebSocketPacket... packets) {
WebSocketFuture<Integer> future = new WebSocketFuture<>(packets);
if (writePending.compareAndSet(false, true)) {
respList.clear();
respList.add(future);
ByteArray array = this.writeArray;
array.clear();
for (WebSocketPacket p : packets) {
writeEncode(array, p);
}
webSocket._channel.write(array, this);
} else {
requestQueue.offer(future);
}
return future;
}
@Override
public void completed(Integer result, Void attachment) {
webSocket.lastSendTime = System.currentTimeMillis();
for (WebSocketFuture<Integer> future : respList) {
future.complete(0);
}
respList.clear();
ByteArray array = this.writeArray;
array.clear();
WebSocketFuture req;
while ((req = requestQueue.poll()) != null) {
respList.add(req);
for (WebSocketPacket p : req.packets) {
writeEncode(array, p);
}
}
if (array.isEmpty()) {
if (!writePending.compareAndSet(true, false)) {
completed(0, attachment);
}
} else {
webSocket._channel.write(array, this);
}
}
@Override
public void failed(Throwable exc, Void attachment) {
writePending.set(false);
WebSocketFuture req;
try {
while ((req = requestQueue.poll()) != null) {
req.completeExceptionally(exc);
}
for (WebSocketFuture<Integer> future : respList) {
future.completeExceptionally(exc);
}
respList.clear();
} catch (Exception e) {
// do nothing
}
webSocket.kill(RETCODE_SENDEXCEPTION, "websocket send message failed on CompletionHandler");
if (exc != null && context.getLogger().isLoggable(Level.FINER)) {
context.getLogger()
.log(
Level.FINER,
"WebSocket sendMessage on CompletionHandler failed, force to close channel, live "
+ (System.currentTimeMillis() - webSocket.getCreateTime()) / 1000 + " seconds",
exc);
}
}
// 消息编码
protected void writeEncode(final ByteArray array, final WebSocketPacket packet) {
final byte opcode = (byte) (packet.type.getValue() | 0x80);
final byte[] content = packet.getPayload();
final int len = content.length;
if (len <= 0x7D) { // 125
array.put(opcode);
array.put((byte) len);
} else if (len <= 0xFFFF) { // 65535
array.put(opcode);
array.put((byte) 0x7E); // 126
array.putChar((char) len);
} else {
array.put(opcode);
array.put((byte) 0x7F); // 127
array.putLong(len);
}
array.put(content);
}
protected static class WebSocketFuture<T> extends CompletableFuture<T> {
protected WebSocketPacket[] packets;
public WebSocketFuture() {
super();
}
public WebSocketFuture(WebSocketPacket... packets) {
super();
this.packets = packets;
}
}
}