This commit is contained in:
Redkale
2018-01-16 14:24:00 +08:00
parent d5365af373
commit f713669d1f

View File

@@ -40,7 +40,7 @@ class WebSocketRunner implements Runnable {
volatile boolean closed = false; volatile boolean closed = false;
private AtomicBoolean writing = new AtomicBoolean(); private final AtomicBoolean writing = new AtomicBoolean();
private final BlockingQueue<QueueEntry> queue = new ArrayBlockingQueue(1024); private final BlockingQueue<QueueEntry> queue = new ArrayBlockingQueue(1024);
@@ -213,20 +213,17 @@ class WebSocketRunner implements Runnable {
public CompletableFuture<Integer> sendMessage(WebSocketPacket packet) { public CompletableFuture<Integer> sendMessage(WebSocketPacket packet) {
if (packet == null) return CompletableFuture.completedFuture(RETCODE_SEND_ILLPACKET); if (packet == null) return CompletableFuture.completedFuture(RETCODE_SEND_ILLPACKET);
if (closed) return CompletableFuture.completedFuture(RETCODE_WSOCKET_CLOSED); if (closed) return CompletableFuture.completedFuture(RETCODE_WSOCKET_CLOSED);
boolean debug = true; boolean debug = context.getLogger().isLoggable(Level.FINEST);
//System.out.println("推送消息"); //System.out.println("推送消息");
//if (debug) context.getLogger().log(Level.FINEST, "send web socket message: " + packet);
final CompletableFuture<Integer> futureResult = new CompletableFuture<>(); final CompletableFuture<Integer> futureResult = new CompletableFuture<>();
synchronized (writing) {
if (writing.getAndSet(true)) { if (writing.getAndSet(true)) {
QueueEntry qe = new QueueEntry(futureResult, packet); queue.add(new QueueEntry(futureResult, packet));
queue.add(qe);
if (!writing.get()) { //防止刚好CompletionHandler进程在poll之后获取null正准备writing.set(false)时进入queue.add(qe)。导致本消息不能发送
queue.remove(qe);
return sendMessage(packet);
}
return futureResult; return futureResult;
} }
}
ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encode(this.context.getBufferSupplier()); ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encode(this.context.getBufferSupplier());
if (debug) context.getLogger().log(Level.FINEST, "sending websocket message: " + packet);
try { try {
this.lastSendTime = System.currentTimeMillis(); this.lastSendTime = System.currentTimeMillis();
channel.write(buffers, buffers, new CompletionHandler<Integer, ByteBuffer[]>() { channel.write(buffers, buffers, new CompletionHandler<Integer, ByteBuffer[]>() {
@@ -268,25 +265,18 @@ class WebSocketRunner implements Runnable {
} }
} }
} }
QueueEntry entry = queue.poll(); QueueEntry entry = null;
if (entry != null) { synchronized (writing) {
future = entry.future;
ByteBuffer[] buffers = entry.packet.sendBuffers != null ? entry.packet.duplicateSendBuffers() : entry.packet.encode(context.getBufferSupplier());
lastSendTime = System.currentTimeMillis();
channel.write(buffers, buffers, this);
} else {
writing.set(false);
if (writing.get()) { //刚好进入writing.getAndSet(true)后
entry = queue.poll(); entry = queue.poll();
if (entry == null) writing.set(false);
}
if (entry != null) { if (entry != null) {
writing.set(true);
future = entry.future; future = entry.future;
ByteBuffer[] buffers = entry.packet.sendBuffers != null ? entry.packet.duplicateSendBuffers() : entry.packet.encode(context.getBufferSupplier()); ByteBuffer[] buffers = entry.packet.sendBuffers != null ? entry.packet.duplicateSendBuffers() : entry.packet.encode(context.getBufferSupplier());
lastSendTime = System.currentTimeMillis(); lastSendTime = System.currentTimeMillis();
if (debug) context.getLogger().log(Level.FINEST, "sending websocket message: " + entry.packet);
channel.write(buffers, buffers, this); channel.write(buffers, buffers, this);
} }
}
}
} catch (Exception e) { } catch (Exception e) {
context.getLogger().log(Level.WARNING, "WebSocket sendMessage abort on rewrite, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", e); context.getLogger().log(Level.WARNING, "WebSocket sendMessage abort on rewrite, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", e);
closeRunner(RETCODE_SENDEXCEPTION, "websocket send message failed on rewrite"); closeRunner(RETCODE_SENDEXCEPTION, "websocket send message failed on rewrite");