This commit is contained in:
Redkale
2020-04-16 21:08:00 +08:00
parent 63ef83ec62
commit f073c9a0bc
3 changed files with 15 additions and 21 deletions

View File

@@ -244,7 +244,7 @@ public class WebSocketEngine {
if (bufferSupplier == null) {
bufferSupplier = websocket.getBufferSupplier();
bufferConsumer = websocket.getBufferConsumer();
packet.setSendBuffers(packet.encodePacket(bufferSupplier, bufferConsumer, cryptor));
packet.encodePacket(bufferSupplier, bufferConsumer, cryptor);
}
future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b);
}
@@ -255,7 +255,7 @@ public class WebSocketEngine {
if (bufferSupplier == null) {
bufferSupplier = websocket.getBufferSupplier();
bufferConsumer = websocket.getBufferConsumer();
packet.setSendBuffers(packet.encodePacket(bufferSupplier, bufferConsumer, cryptor));
packet.encodePacket(bufferSupplier, bufferConsumer, cryptor);
}
future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b);
}
@@ -312,7 +312,7 @@ public class WebSocketEngine {
final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message
: ((message == null || message instanceof CharSequence || message instanceof byte[])
? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.sendConvert, false, message, last));
//packet.setSendBuffers(packet.encode(context.getBufferSupplier(), context.getBufferConsumer(), cryptor));
//packet.encode(context.getBufferSupplier(), context.getBufferConsumer(), cryptor);
CompletableFuture<Integer> future = null;
if (single) {
for (Serializable userid : userids) {
@@ -321,7 +321,7 @@ public class WebSocketEngine {
if (bufferSupplier == null) {
bufferSupplier = websocket.getBufferSupplier();
bufferConsumer = websocket.getBufferConsumer();
packet.setSendBuffers(packet.encodePacket(bufferSupplier, bufferConsumer, cryptor));
packet.encodePacket(bufferSupplier, bufferConsumer, cryptor);
}
future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b);
}
@@ -333,7 +333,7 @@ public class WebSocketEngine {
if (bufferSupplier == null) {
bufferSupplier = websocket.getBufferSupplier();
bufferConsumer = websocket.getBufferConsumer();
packet.setSendBuffers(packet.encodePacket(bufferSupplier, bufferConsumer, cryptor));
packet.encodePacket(bufferSupplier, bufferConsumer, cryptor);
}
future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b);
}

View File

@@ -153,16 +153,6 @@ public final class WebSocketPacket {
if (mapconvable && !(json instanceof Object[])) throw new IllegalArgumentException();
}
WebSocketPacket(ByteBuffer[] sendBuffers, FrameType type, boolean fin) {
this.type = type;
this.last = fin;
this.setSendBuffers(sendBuffers);
}
void setSendBuffers(ByteBuffer[] sendBuffers) {
this.sendBuffers = sendBuffers;
}
ByteBuffer[] duplicateSendBuffers() {
ByteBuffer[] rs = new ByteBuffer[this.sendBuffers.length];
for (int i = 0; i < this.sendBuffers.length; i++) {
@@ -228,7 +218,7 @@ public final class WebSocketPacket {
*
* @return ByteBuffer[]
*/
ByteBuffer[] encodePacket(final Supplier<ByteBuffer> supplier, final Consumer<ByteBuffer> consumer, final Cryptor cryptor) {
void encodePacket(final Supplier<ByteBuffer> supplier, final Consumer<ByteBuffer> consumer, final Cryptor cryptor) {
final byte opcode = (byte) (this.type.getValue() | 0x80);
if (this.sendConvert != null) {
Supplier<ByteBuffer> newsupplier = new Supplier<ByteBuffer>() {
@@ -268,7 +258,8 @@ public final class WebSocketPacket {
firstbuf.put(1, (byte) 0x7F); //127
firstbuf.putInt(2, contentLength);
}
return buffers;
this.sendBuffers = buffers;
return;
}
ByteBuffer buffer = supplier.get(); //确保ByteBuffer的capacity不能小于128
@@ -299,7 +290,8 @@ public final class WebSocketPacket {
buffer.put((byte) len);
buffer.put(content);
buffer.flip();
return new ByteBuffer[]{buffer};
this.sendBuffers = new ByteBuffer[]{buffer};
return;
}
if (len <= 0xFFFF) { // 65535
buffer.put(opcode);
@@ -315,7 +307,8 @@ public final class WebSocketPacket {
if (pend <= 0) {
buffer.put(content);
buffer.flip();
return new ByteBuffer[]{buffer};
this.sendBuffers = new ByteBuffer[]{buffer};
return;
}
buffer.put(content, 0, buffer.remaining());
buffer.flip();
@@ -330,7 +323,7 @@ public final class WebSocketPacket {
start += capacity;
pend -= capacity;
}
return buffers;
this.sendBuffers = buffers;
}
// public static void main(String[] args) throws Throwable {

View File

@@ -230,7 +230,8 @@ class WebSocketRunner implements Runnable {
//System.out.println("推送消息");
final CompletableFuture<Integer> futureResult = new CompletableFuture<>();
try {
ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encodePacket(webSocket._channel.getBufferSupplier(), webSocket._channel.getBufferConsumer(), webSocket._engine.cryptor);
if (packet.sendBuffers == null) packet.encodePacket(webSocket._channel.getBufferSupplier(), webSocket._channel.getBufferConsumer(), webSocket._engine.cryptor);
ByteBuffer[] buffers = packet.duplicateSendBuffers();
//if (debug) context.getLogger().log(Level.FINEST, "wsrunner.sending websocket message: " + packet);
CompletionHandler<Integer, ByteBuffer[]> handler = new CompletionHandler<Integer, ByteBuffer[]>() {