WebSocketRunner加入写队列

This commit is contained in:
Redkale
2020-01-17 14:39:19 +08:00
parent 3b142b7504
commit 5763718816

View File

@@ -34,6 +34,10 @@ class WebSocketRunner implements Runnable {
protected final boolean mergemsg;
protected final Semaphore writeSemaphore = new Semaphore(1);
protected final LinkedBlockingQueue<WriteEntry> writeQueue = new LinkedBlockingQueue();
volatile boolean closed = false;
FrameType currSeriesMergeFrameType;
@@ -228,9 +232,7 @@ class WebSocketRunner implements Runnable {
try {
ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encode(webSocket._channel.getBufferSupplier(), webSocket._channel.getBufferConsumer(), webSocket._engine.cryptor);
//if (debug) context.getLogger().log(Level.FINEST, "wsrunner.sending websocket message: " + packet);
this.lastSendTime = System.currentTimeMillis();
webSocket._channel.write(buffers, buffers, new CompletionHandler<Integer, ByteBuffer[]>() {
CompletionHandler<Integer, ByteBuffer[]> handler = new CompletionHandler<Integer, ByteBuffer[]>() {
private CompletableFuture<Integer> future = futureResult;
@@ -285,7 +287,14 @@ class WebSocketRunner implements Runnable {
}
}
});
};
this.lastSendTime = System.currentTimeMillis();
if (writeSemaphore.tryAcquire()) {
webSocket._channel.write(buffers, buffers, handler);
} else {
writeQueue.add(new WriteEntry(buffers, handler));
}
} catch (Exception t) {
futureResult.complete(RETCODE_SENDEXCEPTION);
closeRunner(RETCODE_SENDEXCEPTION, "websocket send message failed on channel.write");
@@ -294,7 +303,14 @@ class WebSocketRunner implements Runnable {
}
}
return futureResult;
return futureResult.whenComplete((r, t) -> {
WriteEntry entry = writeQueue.poll();
if (entry != null) {
webSocket._channel.write(entry.writeBuffers, entry.writeBuffers, entry.writeHandler);
} else {
writeSemaphore.release();
}
});
}
public boolean isClosed() {
@@ -314,4 +330,16 @@ class WebSocketRunner implements Runnable {
}
}
private static class WriteEntry {
ByteBuffer[] writeBuffers;
CompletionHandler writeHandler;
public WriteEntry(ByteBuffer[] writeBuffers, CompletionHandler writeHandler) {
this.writeBuffers = writeBuffers;
this.writeHandler = writeHandler;
}
}
}