This commit is contained in:
Redkale
2018-05-31 10:23:16 +08:00
parent 3d11a95a29
commit 699d55d70e

View File

@@ -41,7 +41,7 @@ class WebSocketRunner implements Runnable {
private final AtomicBoolean writing = new AtomicBoolean(); private final AtomicBoolean writing = new AtomicBoolean();
private final BlockingQueue<QueueEntry> queue = new ArrayBlockingQueue(256); private final BlockingQueue<QueueEntry> writeQueue = new ArrayBlockingQueue(512);
private final BiConsumer<WebSocket, Object> restMessageConsumer; //主要供RestWebSocket使用 private final BiConsumer<WebSocket, Object> restMessageConsumer; //主要供RestWebSocket使用
@@ -83,13 +83,15 @@ class WebSocketRunner implements Runnable {
closeRunner(0, "read buffer count is " + count); closeRunner(0, "read buffer count is " + count);
return; return;
} }
if (readBuffer == null) return; try {
ByteBuffer readBuf = readBuffer;
if (readBuf == null) return; //关闭后readBuffer为null
lastReadTime = System.currentTimeMillis(); lastReadTime = System.currentTimeMillis();
readBuffer.flip(); readBuf.flip();
WebSocketPacket onePacket = null; WebSocketPacket onePacket = null;
if (unfinishPacket != null) { if (unfinishPacket != null) {
if (unfinishPacket.receiveBody(webSocket, readBuffer)) { //已经接收完毕 if (unfinishPacket.receiveBody(webSocket, readBuf)) { //已经接收完毕
onePacket = unfinishPacket; onePacket = unfinishPacket;
unfinishPacket = null; unfinishPacket = null;
for (ByteBuffer b : exBuffers) { for (ByteBuffer b : exBuffers) {
@@ -97,8 +99,9 @@ class WebSocketRunner implements Runnable {
} }
exBuffers.clear(); exBuffers.clear();
} else { //需要继续接收 } else { //需要继续接收
readBuffer = context.pollBuffer(); readBuf = context.pollBuffer();
channel.read(readBuffer, null, this); readBuffer = readBuf;
channel.read(readBuf, null, this);
return; return;
} }
} }
@@ -107,30 +110,31 @@ class WebSocketRunner implements Runnable {
if (onePacket != null) packets.add(onePacket); if (onePacket != null) packets.add(onePacket);
try { try {
while (true) { while (true) {
WebSocketPacket packet = new WebSocketPacket().decode(context.getLogger(), webSocket, wsmaxbody, halfBytes, readBuffer); WebSocketPacket packet = new WebSocketPacket().decode(context.getLogger(), webSocket, wsmaxbody, halfBytes, readBuf);
if (packet == WebSocketPacket.NONE) break; //解析完毕但是buffer有多余字节 if (packet == WebSocketPacket.NONE) break; //解析完毕但是buffer有多余字节
if (packet != null && !packet.isReceiveFinished()) { if (packet != null && !packet.isReceiveFinished()) {
unfinishPacket = packet; unfinishPacket = packet;
if (readBuffer.hasRemaining()) { if (readBuf.hasRemaining()) {
exBuffers.add(readBuffer); exBuffers.add(readBuf);
readBuffer = context.pollBuffer(); readBuf = context.pollBuffer();
readBuffer = readBuf;
} }
break; break;
} }
packets.add(packet); packets.add(packet);
if (packet == null || !readBuffer.hasRemaining()) break; if (packet == null || !readBuf.hasRemaining()) break;
} }
} catch (Exception e) { } catch (Exception e) {
context.getLogger().log(Level.SEVERE, "WebSocket parse message error", e); context.getLogger().log(Level.SEVERE, "WebSocket parse message error", e);
webSocket.onOccurException(e, null); webSocket.onOccurException(e, null);
} }
//继续监听消息 //继续监听消息
readBuffer.clear(); readBuf.clear();
if (halfBytes.getValue() != null) { if (halfBytes.getValue() != null) {
readBuffer.put(halfBytes.getValue()); readBuf.put(halfBytes.getValue());
halfBytes.setValue(null); halfBytes.setValue(null);
} }
channel.read(readBuffer, null, this); channel.read(readBuf, null, this);
//消息处理 //消息处理
for (final WebSocketPacket packet : packets) { for (final WebSocketPacket packet : packets) {
@@ -191,6 +195,10 @@ class WebSocketRunner implements Runnable {
return; return;
} }
} }
} catch (Exception e) {
context.getLogger().log(Level.WARNING, "WebSocketRunner(userid=" + webSocket.getUserid() + ") onMessage by received error", e);
closeRunner(0, "websocket-received error");
}
} }
@Override @Override
@@ -222,7 +230,7 @@ class WebSocketRunner implements Runnable {
try { try {
synchronized (writing) { synchronized (writing) {
if (writing.getAndSet(true)) { if (writing.getAndSet(true)) {
queue.add(new QueueEntry(futureResult, packet)); writeQueue.add(new QueueEntry(futureResult, packet));
return futureResult; return futureResult;
} }
} }
@@ -271,7 +279,7 @@ class WebSocketRunner implements Runnable {
} }
QueueEntry entry = null; QueueEntry entry = null;
synchronized (writing) { synchronized (writing) {
entry = queue.poll(); entry = writeQueue.poll();
if (entry == null) writing.set(false); if (entry == null) writing.set(false);
} }
if (entry != null) { if (entry != null) {
@@ -282,8 +290,9 @@ class WebSocketRunner implements Runnable {
channel.write(buffers, buffers, this); channel.write(buffers, buffers, this);
} }
} catch (Exception e) { } catch (Exception e) {
context.getLogger().log(Level.WARNING, "WebSocket sendMessage abort on rewrite, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", e); futureResult.complete(RETCODE_SENDEXCEPTION);
closeRunner(RETCODE_SENDEXCEPTION, "websocket send message failed on rewrite"); closeRunner(RETCODE_SENDEXCEPTION, "websocket send message failed on rewrite");
context.getLogger().log(Level.WARNING, "WebSocket sendMessage abort on rewrite, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", e);
} }
} }
@@ -317,14 +326,16 @@ class WebSocketRunner implements Runnable {
synchronized (this) { synchronized (this) {
if (closed) return; if (closed) return;
closed = true; closed = true;
try { channel.dispose();
channel.close();
} catch (Throwable t) {
}
context.offerBuffer(readBuffer); context.offerBuffer(readBuffer);
readBuffer = null; readBuffer = null;
engine.remove(webSocket); engine.remove(webSocket);
webSocket.onClose(code, reason); webSocket.onClose(code, reason);
QueueEntry entry = writeQueue.poll();
while (entry != null) {
entry.future.complete(RETCODE_WSOCKET_CLOSED);
entry = writeQueue.poll();
}
} }
} }