This commit is contained in:
Redkale
2017-05-20 13:40:58 +08:00
parent d5e44787c0
commit 46ccd83acc

View File

@@ -39,16 +39,16 @@ public class WebSocketRunner implements Runnable {
private ByteBuffer writeBuffer;
protected boolean closed = false;
protected volatile boolean closed = false;
private AtomicBoolean writing = new AtomicBoolean();
private final Coder coder = new Coder();
private final BlockingQueue<byte[]> queue = new ArrayBlockingQueue(1024);
private final BlockingQueue<QueueEntry> queue = new ArrayBlockingQueue(1024);
private final boolean wsbinary;
protected long lastSendTime;
public WebSocketRunner(Context context, WebSocket webSocket, AsyncConnection channel, final boolean wsbinary) {
@@ -168,23 +168,27 @@ public class WebSocketRunner implements Runnable {
final byte[] bytes = coder.encode(packet);
if (debug) context.getLogger().log(Level.FINEST, "send web socket message's length = " + bytes.length);
this.lastSendTime = System.currentTimeMillis();
final CompletableFuture<Integer> futureResult = new CompletableFuture<>();
if (writing.getAndSet(true)) {
queue.add(bytes);
return CompletableFuture.completedFuture(0);
queue.add(new QueueEntry(futureResult, bytes));
return futureResult;
}
if (writeBuffer == null) return CompletableFuture.completedFuture(RETCODE_ILLEGALBUFFER);
ByteBuffer localWriteBuffer = writeBuffer;
if (localWriteBuffer == null) return CompletableFuture.completedFuture(RETCODE_ILLEGALBUFFER);
ByteBuffer sendBuffer;
if (bytes.length <= writeBuffer.capacity()) {
writeBuffer.clear();
writeBuffer.put(bytes);
writeBuffer.flip();
sendBuffer = writeBuffer;
if (bytes.length <= localWriteBuffer.capacity()) {
localWriteBuffer.clear();
localWriteBuffer.put(bytes);
localWriteBuffer.flip();
sendBuffer = localWriteBuffer;
} else {
sendBuffer = ByteBuffer.wrap(bytes);
}
try {
channel.write(sendBuffer, sendBuffer, new CompletionHandler<Integer, ByteBuffer>() {
private CompletableFuture<Integer> future = futureResult;
@Override
public void completed(Integer result, ByteBuffer attachment) {
if (attachment == null || closed) return;
@@ -194,21 +198,26 @@ public class WebSocketRunner implements Runnable {
channel.write(attachment, attachment, this);
return;
}
byte[] bs = queue.poll();
if (bs != null && writeBuffer != null) {
ByteBuffer sendBuffer;
if (bs.length <= writeBuffer.capacity()) {
writeBuffer.clear();
writeBuffer.put(bs);
writeBuffer.flip();
sendBuffer = writeBuffer;
} else {
sendBuffer = ByteBuffer.wrap(bs);
}
channel.write(sendBuffer, sendBuffer, this);
return;
if (future != null) {
future.complete(0);
future = null;
}
} catch (NullPointerException e) {
QueueEntry entry = queue.poll();
ByteBuffer localWriteBuffer = writeBuffer;
if (entry == null || localWriteBuffer == null) return; //没有数据了
future = entry.future;
byte[] bs = entry.bytes;
ByteBuffer sendBuffer;
if (bs.length <= localWriteBuffer.capacity()) {
localWriteBuffer.clear();
localWriteBuffer.put(bs);
localWriteBuffer.flip();
sendBuffer = localWriteBuffer;
} else {
sendBuffer = ByteBuffer.wrap(bs);
}
channel.write(sendBuffer, sendBuffer, this);
} catch (Exception e) {
closeRunner();
context.getLogger().log(Level.WARNING, "WebSocket sendMessage abort on rewrite, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", e);
@@ -225,13 +234,13 @@ public class WebSocketRunner implements Runnable {
}
}
});
return CompletableFuture.completedFuture(0);
} catch (Exception t) {
writing.set(false);
closeRunner();
context.getLogger().log(Level.FINE, "WebSocket sendMessage abort, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", t);
return CompletableFuture.completedFuture(RETCODE_SENDEXCEPTION);
futureResult.complete(RETCODE_SENDEXCEPTION);
}
return futureResult;
}
public void closeRunner() {
@@ -252,6 +261,19 @@ public class WebSocketRunner implements Runnable {
}
}
private static final class QueueEntry {
public final CompletableFuture<Integer> future;
public final byte[] bytes;
public QueueEntry(CompletableFuture<Integer> future, byte[] bytes) {
this.future = future;
this.bytes = bytes;
}
}
private static final class Masker {
public static final int MASK_SIZE = 4;