This commit is contained in:
@@ -43,7 +43,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
|
||||
write(srcs, 0, srcs.length, attachment, handler);
|
||||
}
|
||||
|
||||
protected abstract <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler);
|
||||
public abstract <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler);
|
||||
|
||||
public void dispose() {//同close, 只是去掉throws IOException
|
||||
try {
|
||||
@@ -187,7 +187,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) {
|
||||
public <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) {
|
||||
try {
|
||||
int rs = 0;
|
||||
for (int i = offset; i < offset + length; i++) {
|
||||
@@ -338,7 +338,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) {
|
||||
public <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) {
|
||||
try {
|
||||
int rs = 0;
|
||||
for (int i = offset; i < offset + length; i++) {
|
||||
|
||||
@@ -104,7 +104,8 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
|
||||
this.readTimeoutSecond = config.getIntValue("readTimeoutSecond", 0);
|
||||
this.writeTimeoutSecond = config.getIntValue("writeTimeoutSecond", 0);
|
||||
this.maxbody = config.getIntValue("maxbody", 64 * 1024);
|
||||
this.bufferCapacity = config.getIntValue("bufferCapacity", 8 * 1024);
|
||||
int bufCapacity = config.getIntValue("bufferCapacity", 8 * 1024);
|
||||
this.bufferCapacity = bufCapacity < 256 ? 256 : bufCapacity;
|
||||
this.threads = config.getIntValue("threads", Runtime.getRuntime().availableProcessors() * 16);
|
||||
this.bufferPoolSize = config.getIntValue("bufferPoolSize", Runtime.getRuntime().availableProcessors() * 512);
|
||||
this.responsePoolSize = config.getIntValue("responsePoolSize", Runtime.getRuntime().availableProcessors() * 256);
|
||||
|
||||
@@ -7,10 +7,15 @@ package org.redkale.net.http;
|
||||
|
||||
import org.redkale.util.Utility;
|
||||
import java.io.*;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.logging.*;
|
||||
|
||||
/**
|
||||
*
|
||||
* <p> 详情见: https://redkale.org
|
||||
* <p>
|
||||
* 详情见: https://redkale.org
|
||||
*
|
||||
* @author zhangjx
|
||||
*/
|
||||
public final class WebSocketPacket {
|
||||
@@ -120,4 +125,159 @@ public final class WebSocketPacket {
|
||||
public String toString() {
|
||||
return this.getClass().getSimpleName() + "[type=" + type + ", last=" + last + (payload != null ? (", payload=" + payload) : "") + (bytes != null ? (", bytes=[" + bytes.length + ']') : "") + "]";
|
||||
}
|
||||
|
||||
/**
|
||||
* 消息编码
|
||||
*
|
||||
* @param supplier Supplier
|
||||
*
|
||||
* @return ByteBuffer[]
|
||||
*/
|
||||
ByteBuffer[] encode(final Supplier<ByteBuffer> supplier) {
|
||||
ByteBuffer buffer = supplier.get(); //确保ByteBuffer的capacity不能小于128
|
||||
|
||||
final byte opcode = (byte) (this.type.getValue() | 0x80);
|
||||
final byte[] content = getContent();
|
||||
final int len = content.length;
|
||||
if (len <= 0x7D) { //125
|
||||
buffer.put(opcode);
|
||||
buffer.put((byte) len);
|
||||
buffer.put(content);
|
||||
buffer.flip();
|
||||
return new ByteBuffer[]{buffer};
|
||||
}
|
||||
if (len <= 0xFFFF) { // 65535
|
||||
buffer.put(opcode);
|
||||
buffer.put((byte) 0x7E); //126
|
||||
buffer.putChar((char) len);
|
||||
} else {
|
||||
buffer.put(opcode);
|
||||
buffer.put((byte) 0x7F); //127
|
||||
buffer.putInt(len);
|
||||
}
|
||||
int start = buffer.remaining();
|
||||
int pend = len - buffer.remaining();
|
||||
if (pend <= 0) {
|
||||
buffer.put(content);
|
||||
buffer.flip();
|
||||
return new ByteBuffer[]{buffer};
|
||||
}
|
||||
buffer.put(content, 0, buffer.remaining());
|
||||
buffer.flip();
|
||||
final int capacity = buffer.capacity();
|
||||
final ByteBuffer[] buffers = new ByteBuffer[pend / capacity + 1];
|
||||
buffers[0] = buffer;
|
||||
for (int i = 1; i < buffers.length; i++) {
|
||||
ByteBuffer buf = supplier.get();
|
||||
buffer.put(content, start, Math.min(pend, capacity));
|
||||
buffer.flip();
|
||||
buffers[i] = buf;
|
||||
start += capacity;
|
||||
pend -= capacity;
|
||||
}
|
||||
return buffers;
|
||||
}
|
||||
|
||||
/**
|
||||
* 消息解码 <br>
|
||||
*
|
||||
* 0 1 2 3
|
||||
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
|
||||
* +-+-+-+-+-------+-+-------------+-------------------------------+
|
||||
* |F|R|R|R| opcode|M| Payload len | Extended payload length |
|
||||
* |I|S|S|S| (4) |A| (7) | (16/64) |
|
||||
* |N|V|V|V| |S| | (if payload len==126/127) |
|
||||
* | |1|2|3| |K| | |
|
||||
* +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
|
||||
* | Extended payload length continued, if payload len == 127 |
|
||||
* + - - - - - - - - - - - - - - - +-------------------------------+
|
||||
* | |Masking-key, if MASK set to 1 |
|
||||
* +-------------------------------+-------------------------------+
|
||||
* | Masking-key (continued) | Payload Data |
|
||||
* +-------------------------------- - - - - - - - - - - - - - - - +
|
||||
* : Payload Data continued :
|
||||
* + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
|
||||
* | Payload Data continued |
|
||||
* +-----------------------------------------------------------------------+
|
||||
*
|
||||
* @param buffer
|
||||
* @param exbuffers
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
WebSocketPacket decode(final Logger logger, final ByteBuffer buffer, ByteBuffer... exbuffers) {
|
||||
final boolean debug = true;
|
||||
if (debug) {
|
||||
int remain = buffer.remaining();
|
||||
if (exbuffers != null) {
|
||||
for (ByteBuffer b : exbuffers) {
|
||||
remain += b == null ? 0 : b.remaining();
|
||||
}
|
||||
}
|
||||
logger.log(Level.FINEST, "read web socket message's length = " + remain);
|
||||
}
|
||||
if (buffer.remaining() < 2) return null;
|
||||
byte opcode = buffer.get();
|
||||
this.last = (opcode & 0b1000_0000) != 0;
|
||||
this.type = FrameType.valueOf(opcode & 0xF);
|
||||
if (type == FrameType.CLOSE) {
|
||||
if (debug) logger.log(Level.FINEST, " receive close command from websocket client");
|
||||
return null;
|
||||
}
|
||||
final boolean checkrsv = false;//暂时不校验
|
||||
if (checkrsv && (opcode & 0b0111_0000) != 0) {
|
||||
if (debug) logger.log(Level.FINE, "rsv1 rsv2 rsv3 must be 0, but not (" + opcode + ")");
|
||||
return null; //rsv1 rsv2 rsv3 must be 0
|
||||
}
|
||||
//0x00 表示一个后续帧
|
||||
//0x01 表示一个文本帧
|
||||
//0x02 表示一个二进制帧
|
||||
//0x03-07 为以后的非控制帧保留
|
||||
//0x8 表示一个连接关闭
|
||||
//0x9 表示一个ping
|
||||
//0xA 表示一个pong
|
||||
//0x0B-0F 为以后的控制帧保留
|
||||
final boolean control = (opcode & 0b0000_1000) != 0; //是否控制帧
|
||||
byte lengthCode = buffer.get();
|
||||
final boolean masked = (lengthCode & 0x80) == 0x80;
|
||||
if (masked) lengthCode ^= 0x80; //mask
|
||||
int length;
|
||||
if (lengthCode <= 0x7D) { //125
|
||||
length = lengthCode;
|
||||
} else {
|
||||
if (control) {
|
||||
if (debug) logger.log(Level.FINE, " receive control command from websocket client");
|
||||
return null;
|
||||
}
|
||||
if (lengthCode == 0x7E) {//0x7E=126
|
||||
length = (int) buffer.getChar();
|
||||
} else {
|
||||
length = buffer.getInt();
|
||||
}
|
||||
}
|
||||
byte[] mask = null;
|
||||
if (masked) {
|
||||
mask = new byte[4];
|
||||
buffer.get(mask);
|
||||
}
|
||||
final byte[] data = new byte[length];
|
||||
if (buffer.remaining() >= length) {
|
||||
buffer.get(data);
|
||||
} else { //必须有 exbuffers
|
||||
int offset = buffer.remaining();
|
||||
buffer.get(data, 0, offset);
|
||||
for (ByteBuffer b : exbuffers) {
|
||||
offset += b.remaining();
|
||||
b.get(data, offset, b.remaining());
|
||||
}
|
||||
}
|
||||
if (mask != null) {
|
||||
for (int i = 0; i < data.length; i++) {
|
||||
data[i] ^= mask[i % 4];
|
||||
}
|
||||
}
|
||||
this.bytes = data;
|
||||
return this;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -37,14 +37,12 @@ public class WebSocketRunner implements Runnable {
|
||||
|
||||
private ByteBuffer readBuffer;
|
||||
|
||||
private ByteBuffer writeBuffer;
|
||||
private ByteBuffer[] writeBuffers;
|
||||
|
||||
protected volatile boolean closed = false;
|
||||
|
||||
private AtomicBoolean writing = new AtomicBoolean();
|
||||
|
||||
private final Coder coder = new Coder();
|
||||
|
||||
private final BlockingQueue<QueueEntry> queue = new ArrayBlockingQueue(1024);
|
||||
|
||||
private final boolean wsbinary;
|
||||
@@ -57,15 +55,12 @@ public class WebSocketRunner implements Runnable {
|
||||
this.webSocket = webSocket;
|
||||
this.channel = channel;
|
||||
this.wsbinary = wsbinary;
|
||||
this.coder.logger = context.getLogger();
|
||||
this.coder.debugable = false;//context.getLogger().isLoggable(Level.FINEST);
|
||||
this.readBuffer = context.pollBuffer();
|
||||
this.writeBuffer = context.pollBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
final boolean debug = this.coder.debugable;
|
||||
final boolean debug = true;
|
||||
try {
|
||||
webSocket.onConnected();
|
||||
channel.setReadTimeoutSecond(300); //读取超时5分钟
|
||||
@@ -107,10 +102,14 @@ public class WebSocketRunner implements Runnable {
|
||||
b.flip();
|
||||
}
|
||||
}
|
||||
WebSocketPacket packet = coder.decode(readBuffer, exBuffers);
|
||||
if (exBuffers != null) {
|
||||
for (ByteBuffer b : exBuffers) {
|
||||
context.offerBuffer(b);
|
||||
WebSocketPacket packet = null;
|
||||
try {
|
||||
packet = new WebSocketPacket().decode(context.getLogger(), readBuffer, exBuffers);
|
||||
} finally {
|
||||
if (exBuffers != null) {
|
||||
for (ByteBuffer b : exBuffers) {
|
||||
context.offerBuffer(b);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (packet == null) {
|
||||
@@ -163,73 +162,66 @@ public class WebSocketRunner implements Runnable {
|
||||
public CompletableFuture<Integer> sendMessage(WebSocketPacket packet) {
|
||||
if (packet == null) return CompletableFuture.completedFuture(RETCODE_SEND_ILLPACKET);
|
||||
if (closed) return CompletableFuture.completedFuture(RETCODE_WSOCKET_CLOSED);
|
||||
final boolean debug = this.coder.debugable;
|
||||
boolean debug = true;
|
||||
//System.out.println("推送消息");
|
||||
final byte[] bytes = coder.encode(packet);
|
||||
if (debug) context.getLogger().log(Level.FINEST, "send web socket message's length = " + bytes.length);
|
||||
if (debug) context.getLogger().log(Level.FINEST, "send web socket message: " + packet);
|
||||
this.lastSendTime = System.currentTimeMillis();
|
||||
final CompletableFuture<Integer> futureResult = new CompletableFuture<>();
|
||||
if (writing.getAndSet(true)) {
|
||||
queue.add(new QueueEntry(futureResult, bytes));
|
||||
queue.add(new QueueEntry(futureResult, packet));
|
||||
return futureResult;
|
||||
}
|
||||
ByteBuffer localWriteBuffer = writeBuffer;
|
||||
if (localWriteBuffer == null) return CompletableFuture.completedFuture(RETCODE_ILLEGALBUFFER);
|
||||
ByteBuffer sendBuffer;
|
||||
if (bytes.length <= localWriteBuffer.capacity()) {
|
||||
localWriteBuffer.clear();
|
||||
localWriteBuffer.put(bytes);
|
||||
localWriteBuffer.flip();
|
||||
sendBuffer = localWriteBuffer;
|
||||
} else {
|
||||
sendBuffer = ByteBuffer.wrap(bytes);
|
||||
}
|
||||
ByteBuffer[] buffers = packet.encode(this.context.getBufferSupplier());
|
||||
this.writeBuffers = buffers;
|
||||
try {
|
||||
channel.write(sendBuffer, sendBuffer, new CompletionHandler<Integer, ByteBuffer>() {
|
||||
channel.write(buffers, buffers, new CompletionHandler<Integer, ByteBuffer[]>() {
|
||||
|
||||
private CompletableFuture<Integer> future = futureResult;
|
||||
|
||||
@Override
|
||||
public void completed(Integer result, ByteBuffer attachment) {
|
||||
if (attachment == null || closed) {
|
||||
public void completed(Integer result, ByteBuffer[] attachments) {
|
||||
if (attachments == null || closed) {
|
||||
if (future != null) {
|
||||
future.complete(RETCODE_WSOCKET_CLOSED);
|
||||
future = null;
|
||||
if (writeBuffers != null) {
|
||||
for (ByteBuffer buf : writeBuffers) {
|
||||
context.offerBuffer(buf);
|
||||
}
|
||||
writeBuffers = null;
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
try {
|
||||
if (attachment.hasRemaining()) {
|
||||
if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner write completed reemaining: " + attachment.remaining());
|
||||
channel.write(attachment, attachment, this);
|
||||
int index = -1;
|
||||
for (int i = 0; i < attachments.length; i++) {
|
||||
if (attachments[i].hasRemaining()) {
|
||||
index = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (index >= 0) {
|
||||
channel.write(attachments, index, attachments.length - index, attachments, this);
|
||||
return;
|
||||
}
|
||||
if (future != null) {
|
||||
future.complete(0);
|
||||
future = null;
|
||||
if (writeBuffers != null) {
|
||||
for (ByteBuffer buf : writeBuffers) {
|
||||
context.offerBuffer(buf);
|
||||
}
|
||||
writeBuffers = null;
|
||||
}
|
||||
}
|
||||
QueueEntry entry = queue.poll();
|
||||
ByteBuffer localWriteBuffer = writeBuffer;
|
||||
if (entry == null) return; //没有数据了
|
||||
future = entry.future;
|
||||
if (localWriteBuffer == null) {
|
||||
if (future != null) {
|
||||
future.complete(RETCODE_WSOCKET_CLOSED);
|
||||
future = null;
|
||||
}
|
||||
return;
|
||||
if (entry != null) {
|
||||
future = entry.future;
|
||||
ByteBuffer[] buffers = packet.encode(context.getBufferSupplier());
|
||||
writeBuffers = buffers;
|
||||
channel.write(buffers, buffers, this);
|
||||
}
|
||||
byte[] bs = entry.bytes;
|
||||
ByteBuffer sendBuffer;
|
||||
if (bs.length <= localWriteBuffer.capacity()) {
|
||||
localWriteBuffer.clear();
|
||||
localWriteBuffer.put(bs);
|
||||
localWriteBuffer.flip();
|
||||
sendBuffer = localWriteBuffer;
|
||||
} else {
|
||||
sendBuffer = ByteBuffer.wrap(bs);
|
||||
}
|
||||
channel.write(sendBuffer, sendBuffer, this);
|
||||
} catch (Exception e) {
|
||||
closeRunner();
|
||||
context.getLogger().log(Level.WARNING, "WebSocket sendMessage abort on rewrite, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", e);
|
||||
@@ -238,7 +230,7 @@ public class WebSocketRunner implements Runnable {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable exc, ByteBuffer attachment) {
|
||||
public void failed(Throwable exc, ByteBuffer[] attachments) {
|
||||
writing.set(false);
|
||||
closeRunner();
|
||||
if (exc != null) {
|
||||
@@ -265,9 +257,13 @@ public class WebSocketRunner implements Runnable {
|
||||
} catch (Throwable t) {
|
||||
}
|
||||
context.offerBuffer(readBuffer);
|
||||
context.offerBuffer(writeBuffer);
|
||||
readBuffer = null;
|
||||
writeBuffer = null;
|
||||
if (writeBuffers != null) {
|
||||
for (ByteBuffer buf : writeBuffers) {
|
||||
context.offerBuffer(buf);
|
||||
}
|
||||
writeBuffers = null;
|
||||
}
|
||||
engine.remove(webSocket);
|
||||
webSocket.onClose(0, null);
|
||||
}
|
||||
@@ -277,11 +273,11 @@ public class WebSocketRunner implements Runnable {
|
||||
|
||||
public final CompletableFuture<Integer> future;
|
||||
|
||||
public final byte[] bytes;
|
||||
public final WebSocketPacket packet;
|
||||
|
||||
public QueueEntry(CompletableFuture<Integer> future, byte[] bytes) {
|
||||
public QueueEntry(CompletableFuture<Integer> future, WebSocketPacket packet) {
|
||||
this.future = future;
|
||||
this.bytes = bytes;
|
||||
this.packet = packet;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -389,7 +385,7 @@ public class WebSocketRunner implements Runnable {
|
||||
}
|
||||
}
|
||||
|
||||
private static final class Coder {
|
||||
private static final class WebSocketCoder {
|
||||
|
||||
protected byte inFragmentedType;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user