diff --git a/src/org/redkale/net/http/WebSocket.java b/src/org/redkale/net/http/WebSocket.java index 0e5e92d11..c7c02ca26 100644 --- a/src/org/redkale/net/http/WebSocket.java +++ b/src/org/redkale/net/http/WebSocket.java @@ -189,10 +189,23 @@ public abstract class WebSocket { if (message == null || message instanceof CharSequence || message instanceof byte[]) { return send(new WebSocketPacket((Serializable) message, last)); } else { - return send(new WebSocketPacket(_jsonConvert.convertTo(message), last)); + return send(new WebSocketPacket(_jsonConvert, message, last)); } } + /** + * 给自身发送消息, 消息类型是JSON对象 + * + * @param convert JsonConvert + * @param message 不可为空, 只能是String或byte[]或可JSON化对象 + * @param last 是否最后一条 + * + * @return 0表示成功, 非0表示错误码 + */ + public final CompletableFuture send(JsonConvert convert, Object message, boolean last) { + return send(new WebSocketPacket(convert == null ? _jsonConvert : convert, message, last)); + } + //---------------------------------------------------------------- /** * 给指定groupid的WebSocketGroup下所有WebSocket节点发送文本消息 diff --git a/src/org/redkale/net/http/WebSocketPacket.java b/src/org/redkale/net/http/WebSocketPacket.java index ef51371b6..c602973aa 100644 --- a/src/org/redkale/net/http/WebSocketPacket.java +++ b/src/org/redkale/net/http/WebSocketPacket.java @@ -10,6 +10,7 @@ import java.io.*; import java.nio.ByteBuffer; import java.util.function.Supplier; import java.util.logging.*; +import org.redkale.convert.json.JsonConvert; /** * @@ -56,6 +57,10 @@ public final class WebSocketPacket { protected boolean last = true; + protected Object json; + + JsonConvert convert; + public WebSocketPacket() { } @@ -81,6 +86,13 @@ public final class WebSocketPacket { this.last = fin; } + public WebSocketPacket(JsonConvert convert, Object json, boolean fin) { + this.type = FrameType.TEXT; + this.convert = convert; + this.json = json; + this.last = fin; + } + public WebSocketPacket(byte[] data) { this(FrameType.BINARY, data, true); } @@ -134,9 +146,48 @@ public final class WebSocketPacket { * @return ByteBuffer[] */ ByteBuffer[] encode(final Supplier supplier) { - ByteBuffer buffer = supplier.get(); //确保ByteBuffer的capacity不能小于128 - final byte opcode = (byte) (this.type.getValue() | 0x80); + if (this.convert != null) { + Supplier newsupplier = new Supplier() { + + private ByteBuffer buf = supplier.get(); + + @Override + public ByteBuffer get() { + if (buf != null) { + ByteBuffer rs = buf; + rs.position(6); + this.buf = null; + return rs; + } + return supplier.get(); + } + }; + ByteBuffer[] buffers = this.convert.convertTo(newsupplier, json); + int len = 0; + for (ByteBuffer buf : buffers) { + len += buf.remaining(); + } + int contentLength = len - 6; + ByteBuffer firstbuf = buffers[0]; + if (contentLength <= 0x7D) { //125 + firstbuf.put(4, opcode); + firstbuf.put(5, (byte) contentLength); + firstbuf.position(4); + } else if (contentLength <= 0xFFFF) { + firstbuf.put(2, opcode); + firstbuf.put(3, (byte) 0x7E); //126 + firstbuf.putChar(4, (char) contentLength); + firstbuf.position(2); + } else { + firstbuf.put(0, opcode); + firstbuf.put(1, (byte) 0x7F); //127 + firstbuf.putInt(2, contentLength); + } + return buffers; + } + + ByteBuffer buffer = supplier.get(); //确保ByteBuffer的capacity不能小于128 final byte[] content = getContent(); final int len = content.length; if (len <= 0x7D) { //125