This commit is contained in:
Redkale
2021-01-13 09:22:55 +08:00
parent f3668a77ef
commit e45b945024

View File

@@ -290,12 +290,13 @@ class WebSocketRunner implements Runnable {
} }
}; };
this.lastSendTime = System.currentTimeMillis(); this.lastSendTime = System.currentTimeMillis();
synchronized (this) {
if (writeSemaphore.tryAcquire()) { if (writeSemaphore.tryAcquire()) {
webSocket._channel.write(buffers, buffers, handler); webSocket._channel.write(buffers, buffers, handler);
} else { } else {
writeQueue.add(new WriteEntry(buffers, handler)); writeQueue.add(new WriteEntry(buffers, handler));
} }
}
} catch (Exception t) { } catch (Exception t) {
futureResult.complete(RETCODE_SENDEXCEPTION); futureResult.complete(RETCODE_SENDEXCEPTION);
closeRunner(RETCODE_SENDEXCEPTION, "websocket send message failed on channel.write"); closeRunner(RETCODE_SENDEXCEPTION, "websocket send message failed on channel.write");
@@ -305,13 +306,19 @@ class WebSocketRunner implements Runnable {
} }
int ts = webSocket._channel.getWriteTimeoutSeconds(); int ts = webSocket._channel.getWriteTimeoutSeconds();
return futureResult.orTimeout(ts > 0 ? ts : 6, TimeUnit.SECONDS).whenComplete((r, t) -> { return futureResult.orTimeout(ts > 0 ? ts : 6, TimeUnit.SECONDS).whenComplete((r, t) -> {
nextWrite();
});
}
private void nextWrite() {
synchronized (this) {
WriteEntry entry = writeQueue.poll(); WriteEntry entry = writeQueue.poll();
if (entry != null) { if (entry != null) {
webSocket._channel.write(entry.writeBuffers, entry.writeBuffers, entry.writeHandler); webSocket._channel.write(entry.writeBuffers, entry.writeBuffers, entry.writeHandler);
} else { } else {
writeSemaphore.release(); writeSemaphore.release();
} }
}); }
} }
public boolean isClosed() { public boolean isClosed() {