This commit is contained in:
Redkale
2017-05-21 19:26:27 +08:00
parent cdec316312
commit 62b0be802e
2 changed files with 67 additions and 3 deletions

View File

@@ -189,10 +189,23 @@ public abstract class WebSocket {
if (message == null || message instanceof CharSequence || message instanceof byte[]) {
return send(new WebSocketPacket((Serializable) message, last));
} else {
return send(new WebSocketPacket(_jsonConvert.convertTo(message), last));
return send(new WebSocketPacket(_jsonConvert, message, last));
}
}
/**
* 给自身发送消息, 消息类型是JSON对象
*
* @param convert JsonConvert
* @param message 不可为空, 只能是String或byte[]或可JSON化对象
* @param last 是否最后一条
*
* @return 0表示成功 非0表示错误码
*/
public final CompletableFuture<Integer> send(JsonConvert convert, Object message, boolean last) {
return send(new WebSocketPacket(convert == null ? _jsonConvert : convert, message, last));
}
//----------------------------------------------------------------
/**
* 给指定groupid的WebSocketGroup下所有WebSocket节点发送文本消息

View File

@@ -10,6 +10,7 @@ import java.io.*;
import java.nio.ByteBuffer;
import java.util.function.Supplier;
import java.util.logging.*;
import org.redkale.convert.json.JsonConvert;
/**
*
@@ -56,6 +57,10 @@ public final class WebSocketPacket {
protected boolean last = true;
protected Object json;
JsonConvert convert;
public WebSocketPacket() {
}
@@ -81,6 +86,13 @@ public final class WebSocketPacket {
this.last = fin;
}
public WebSocketPacket(JsonConvert convert, Object json, boolean fin) {
this.type = FrameType.TEXT;
this.convert = convert;
this.json = json;
this.last = fin;
}
public WebSocketPacket(byte[] data) {
this(FrameType.BINARY, data, true);
}
@@ -134,9 +146,48 @@ public final class WebSocketPacket {
* @return ByteBuffer[]
*/
ByteBuffer[] encode(final Supplier<ByteBuffer> supplier) {
ByteBuffer buffer = supplier.get(); //确保ByteBuffer的capacity不能小于128
final byte opcode = (byte) (this.type.getValue() | 0x80);
if (this.convert != null) {
Supplier<ByteBuffer> newsupplier = new Supplier<ByteBuffer>() {
private ByteBuffer buf = supplier.get();
@Override
public ByteBuffer get() {
if (buf != null) {
ByteBuffer rs = buf;
rs.position(6);
this.buf = null;
return rs;
}
return supplier.get();
}
};
ByteBuffer[] buffers = this.convert.convertTo(newsupplier, json);
int len = 0;
for (ByteBuffer buf : buffers) {
len += buf.remaining();
}
int contentLength = len - 6;
ByteBuffer firstbuf = buffers[0];
if (contentLength <= 0x7D) { //125
firstbuf.put(4, opcode);
firstbuf.put(5, (byte) contentLength);
firstbuf.position(4);
} else if (contentLength <= 0xFFFF) {
firstbuf.put(2, opcode);
firstbuf.put(3, (byte) 0x7E); //126
firstbuf.putChar(4, (char) contentLength);
firstbuf.position(2);
} else {
firstbuf.put(0, opcode);
firstbuf.put(1, (byte) 0x7F); //127
firstbuf.putInt(2, contentLength);
}
return buffers;
}
ByteBuffer buffer = supplier.get(); //确保ByteBuffer的capacity不能小于128
final byte[] content = getContent();
final int len = content.length;
if (len <= 0x7D) { //125