diff --git a/src/org/redkale/net/http/WebSocketRunner.java b/src/org/redkale/net/http/WebSocketRunner.java index 6a2d1ccb9..bbf3498e0 100644 --- a/src/org/redkale/net/http/WebSocketRunner.java +++ b/src/org/redkale/net/http/WebSocketRunner.java @@ -41,7 +41,7 @@ class WebSocketRunner implements Runnable { private final AtomicBoolean writing = new AtomicBoolean(); - private final BlockingQueue queue = new ArrayBlockingQueue(256); + private final BlockingQueue writeQueue = new ArrayBlockingQueue(512); private final BiConsumer restMessageConsumer; //主要供RestWebSocket使用 @@ -83,113 +83,121 @@ class WebSocketRunner implements Runnable { closeRunner(0, "read buffer count is " + count); return; } - if (readBuffer == null) return; - lastReadTime = System.currentTimeMillis(); - readBuffer.flip(); - - WebSocketPacket onePacket = null; - if (unfinishPacket != null) { - if (unfinishPacket.receiveBody(webSocket, readBuffer)) { //已经接收完毕 - onePacket = unfinishPacket; - unfinishPacket = null; - for (ByteBuffer b : exBuffers) { - context.offerBuffer(b); - } - exBuffers.clear(); - } else { //需要继续接收 - readBuffer = context.pollBuffer(); - channel.read(readBuffer, null, this); - return; - } - } - - final List packets = new ArrayList<>(); - if (onePacket != null) packets.add(onePacket); try { - while (true) { - WebSocketPacket packet = new WebSocketPacket().decode(context.getLogger(), webSocket, wsmaxbody, halfBytes, readBuffer); - if (packet == WebSocketPacket.NONE) break; //解析完毕但是buffer有多余字节 - if (packet != null && !packet.isReceiveFinished()) { - unfinishPacket = packet; - if (readBuffer.hasRemaining()) { - exBuffers.add(readBuffer); - readBuffer = context.pollBuffer(); + ByteBuffer readBuf = readBuffer; + if (readBuf == null) return; //关闭后readBuffer为null + lastReadTime = System.currentTimeMillis(); + readBuf.flip(); + + WebSocketPacket onePacket = null; + if (unfinishPacket != null) { + if (unfinishPacket.receiveBody(webSocket, readBuf)) { //已经接收完毕 + onePacket = unfinishPacket; + unfinishPacket = null; + for (ByteBuffer b : exBuffers) { + context.offerBuffer(b); } - break; + exBuffers.clear(); + } else { //需要继续接收 + readBuf = context.pollBuffer(); + readBuffer = readBuf; + channel.read(readBuf, null, this); + return; + } + } + + final List packets = new ArrayList<>(); + if (onePacket != null) packets.add(onePacket); + try { + while (true) { + WebSocketPacket packet = new WebSocketPacket().decode(context.getLogger(), webSocket, wsmaxbody, halfBytes, readBuf); + if (packet == WebSocketPacket.NONE) break; //解析完毕但是buffer有多余字节 + if (packet != null && !packet.isReceiveFinished()) { + unfinishPacket = packet; + if (readBuf.hasRemaining()) { + exBuffers.add(readBuf); + readBuf = context.pollBuffer(); + readBuffer = readBuf; + } + break; + } + packets.add(packet); + if (packet == null || !readBuf.hasRemaining()) break; + } + } catch (Exception e) { + context.getLogger().log(Level.SEVERE, "WebSocket parse message error", e); + webSocket.onOccurException(e, null); + } + //继续监听消息 + readBuf.clear(); + if (halfBytes.getValue() != null) { + readBuf.put(halfBytes.getValue()); + halfBytes.setValue(null); + } + channel.read(readBuf, null, this); + + //消息处理 + for (final WebSocketPacket packet : packets) { + if (packet == null) { + if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on decode WebSocketPacket, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds"); + failed(null, attachment1); + return; + } + + if (packet.type == FrameType.TEXT) { + try { + if (packet.receiveType == WebSocketPacket.MessageType.STRING) { + webSocket.onMessage((String) packet.receiveMessage, packet.last); + } else { + if (restMessageConsumer != null) { //主要供RestWebSocket使用 + restMessageConsumer.accept(webSocket, packet.receiveMessage); + } else { + webSocket.onMessage(packet.receiveMessage, packet.last); + } + } + } catch (Throwable e) { + context.getLogger().log(Level.SEVERE, "WebSocket onTextMessage error (" + packet + ")", e); + } + } else if (packet.type == FrameType.BINARY) { + try { + if (packet.receiveType == WebSocketPacket.MessageType.BYTES) { + webSocket.onMessage((byte[]) packet.receiveMessage, packet.last); + } else { + if (restMessageConsumer != null) { //主要供RestWebSocket使用 + restMessageConsumer.accept(webSocket, packet.receiveMessage); + } else { + webSocket.onMessage(packet.receiveMessage, packet.last); + } + } + } catch (Throwable e) { + context.getLogger().log(Level.SEVERE, "WebSocket onBinaryMessage error (" + packet + ")", e); + } + } else if (packet.type == FrameType.PING) { + try { + webSocket.onPing((byte[]) packet.receiveMessage); + } catch (Exception e) { + context.getLogger().log(Level.SEVERE, "WebSocket onPing error (" + packet + ")", e); + } + } else if (packet.type == FrameType.PONG) { + try { + if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner onMessage by PONG FrameType : " + packet); + webSocket.onPong((byte[]) packet.receiveMessage); + } catch (Exception e) { + context.getLogger().log(Level.SEVERE, "WebSocket onPong error (" + packet + ")", e); + } + } else if (packet.type == FrameType.CLOSE) { + if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner onMessage by CLOSE FrameType : " + packet); + closeRunner(0, "received CLOSE frame-type message"); + return; + } else { + context.getLogger().log(Level.WARNING, "WebSocketRunner onMessage by unknown FrameType : " + packet); + closeRunner(0, "received unknown frame-type message"); + return; } - packets.add(packet); - if (packet == null || !readBuffer.hasRemaining()) break; } } catch (Exception e) { - context.getLogger().log(Level.SEVERE, "WebSocket parse message error", e); - webSocket.onOccurException(e, null); - } - //继续监听消息 - readBuffer.clear(); - if (halfBytes.getValue() != null) { - readBuffer.put(halfBytes.getValue()); - halfBytes.setValue(null); - } - channel.read(readBuffer, null, this); - - //消息处理 - for (final WebSocketPacket packet : packets) { - if (packet == null) { - if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on decode WebSocketPacket, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds"); - failed(null, attachment1); - return; - } - - if (packet.type == FrameType.TEXT) { - try { - if (packet.receiveType == WebSocketPacket.MessageType.STRING) { - webSocket.onMessage((String) packet.receiveMessage, packet.last); - } else { - if (restMessageConsumer != null) { //主要供RestWebSocket使用 - restMessageConsumer.accept(webSocket, packet.receiveMessage); - } else { - webSocket.onMessage(packet.receiveMessage, packet.last); - } - } - } catch (Throwable e) { - context.getLogger().log(Level.SEVERE, "WebSocket onTextMessage error (" + packet + ")", e); - } - } else if (packet.type == FrameType.BINARY) { - try { - if (packet.receiveType == WebSocketPacket.MessageType.BYTES) { - webSocket.onMessage((byte[]) packet.receiveMessage, packet.last); - } else { - if (restMessageConsumer != null) { //主要供RestWebSocket使用 - restMessageConsumer.accept(webSocket, packet.receiveMessage); - } else { - webSocket.onMessage(packet.receiveMessage, packet.last); - } - } - } catch (Throwable e) { - context.getLogger().log(Level.SEVERE, "WebSocket onBinaryMessage error (" + packet + ")", e); - } - } else if (packet.type == FrameType.PING) { - try { - webSocket.onPing((byte[]) packet.receiveMessage); - } catch (Exception e) { - context.getLogger().log(Level.SEVERE, "WebSocket onPing error (" + packet + ")", e); - } - } else if (packet.type == FrameType.PONG) { - try { - if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner onMessage by PONG FrameType : " + packet); - webSocket.onPong((byte[]) packet.receiveMessage); - } catch (Exception e) { - context.getLogger().log(Level.SEVERE, "WebSocket onPong error (" + packet + ")", e); - } - } else if (packet.type == FrameType.CLOSE) { - if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner onMessage by CLOSE FrameType : " + packet); - closeRunner(0, "received CLOSE frame-type message"); - return; - } else { - context.getLogger().log(Level.WARNING, "WebSocketRunner onMessage by unknown FrameType : " + packet); - closeRunner(0, "received unknown frame-type message"); - return; - } + context.getLogger().log(Level.WARNING, "WebSocketRunner(userid=" + webSocket.getUserid() + ") onMessage by received error", e); + closeRunner(0, "websocket-received error"); } } @@ -222,7 +230,7 @@ class WebSocketRunner implements Runnable { try { synchronized (writing) { if (writing.getAndSet(true)) { - queue.add(new QueueEntry(futureResult, packet)); + writeQueue.add(new QueueEntry(futureResult, packet)); return futureResult; } } @@ -271,7 +279,7 @@ class WebSocketRunner implements Runnable { } QueueEntry entry = null; synchronized (writing) { - entry = queue.poll(); + entry = writeQueue.poll(); if (entry == null) writing.set(false); } if (entry != null) { @@ -282,8 +290,9 @@ class WebSocketRunner implements Runnable { channel.write(buffers, buffers, this); } } catch (Exception e) { - context.getLogger().log(Level.WARNING, "WebSocket sendMessage abort on rewrite, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", e); + futureResult.complete(RETCODE_SENDEXCEPTION); closeRunner(RETCODE_SENDEXCEPTION, "websocket send message failed on rewrite"); + context.getLogger().log(Level.WARNING, "WebSocket sendMessage abort on rewrite, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", e); } } @@ -317,14 +326,16 @@ class WebSocketRunner implements Runnable { synchronized (this) { if (closed) return; closed = true; - try { - channel.close(); - } catch (Throwable t) { - } + channel.dispose(); context.offerBuffer(readBuffer); readBuffer = null; engine.remove(webSocket); webSocket.onClose(code, reason); + QueueEntry entry = writeQueue.poll(); + while (entry != null) { + entry.future.complete(RETCODE_WSOCKET_CLOSED); + entry = writeQueue.poll(); + } } }