This commit is contained in:
Redkale
2018-01-03 16:11:11 +08:00
parent 808a3c2e9c
commit 7b0a576b00

View File

@@ -59,7 +59,7 @@ class WebSocketRunner implements Runnable {
@Override
public void run() {
final boolean debug = true;
final boolean debug = context.getLogger().isLoggable(Level.FINEST);
try {
webSocket.onConnected();
channel.setReadTimeoutSecond(300); //读取超时5分钟
@@ -78,8 +78,8 @@ class WebSocketRunner implements Runnable {
@Override
public void completed(Integer count, Void attachment1) {
if (count < 1) {
closeRunner(0);
if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on read buffer count, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds");
closeRunner(0);
return;
}
if (readBuffer == null) return;
@@ -133,8 +133,8 @@ class WebSocketRunner implements Runnable {
//消息处理
for (final WebSocketPacket packet : packets) {
if (packet == null) {
failed(null, attachment1);
if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on decode WebSocketPacket, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds");
failed(null, attachment1);
return;
}
if (packet.type == FrameType.TEXT) {
@@ -179,7 +179,7 @@ class WebSocketRunner implements Runnable {
}
} else if (packet.type == FrameType.CLOSE) {
Logger logger = context.getLogger();
if (logger.isLoggable(Level.FINEST)) logger.log(Level.FINEST, "WebSocketRunner onMessage by CLOSE FrameType : " + packet);
if (debug) logger.log(Level.FINEST, "WebSocketRunner onMessage by CLOSE FrameType : " + packet);
closeRunner(0);
return;
} else {
@@ -188,163 +188,23 @@ class WebSocketRunner implements Runnable {
return;
}
}
// if (true) return; //以下代码废弃
// try {
// WebSocketPacket packet;
// try {
// packet = new WebSocketPacket().decode(context.getLogger(), readBuffer, exBuffers);
// } catch (Exception e) { //接收的消息体解析失败
// webSocket.onOccurException(e, Utility.append(new ByteBuffer[]{readBuffer}, exBuffers == null ? new ByteBuffer[0] : exBuffers));
// if (readBuffer != null) {
// readBuffer.clear();
// channel.read(readBuffer, null, this);
// }
// return;
// }
// if (packet == null) {
// failed(null, attachment1);
// if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on decode WebSocketPacket, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds");
// return;
// }
//
// if (packet.type == FrameType.TEXT) {
// Convert textConvert = webSocket.getTextConvert();
// if (textConvert == null) {
// byte[] message = packet.getReceiveBytes();
// if (readBuffer != null) {
// readBuffer.clear();
// channel.read(readBuffer, null, this);
// }
// try {
// webSocket.onMessage(new String(message, "UTF-8"), packet.last);
// } catch (Exception e) {
// context.getLogger().log(Level.SEVERE, "WebSocket onBinaryMessage error (" + packet + ")", e);
// }
// } else {
// Object message;
// try {
// message = textConvert.convertFrom(webSocket._messageTextType, packet.receiveMasker, packet.receiveBuffers);
// } catch (Exception e) { //接收的消息体解析失败
// webSocket.onOccurException(e, packet.receiveBuffers);
// if (readBuffer != null) {
// readBuffer.clear();
// channel.read(readBuffer, null, this);
// }
// return;
// }
// if (readBuffer != null) {
// readBuffer.clear();
// channel.read(readBuffer, null, this);
// }
// try {
// if (restMessageConsumer != null) { //主要供RestWebSocket使用
// restMessageConsumer.accept(webSocket, message);
// } else {
// webSocket.onMessage(message, packet.last);
// }
// } catch (Exception e) {
// context.getLogger().log(Level.SEVERE, "WebSocket onTextMessage error (" + packet + ")", e);
// }
// }
// } else if (packet.type == FrameType.BINARY) {
// Convert binaryConvert = webSocket.getBinaryConvert();
// if (binaryConvert == null) {
// byte[] message = packet.getReceiveBytes();
// if (readBuffer != null) {
// readBuffer.clear();
// channel.read(readBuffer, null, this);
// }
// try {
// webSocket.onMessage(message, packet.last);
// } catch (Exception e) {
// context.getLogger().log(Level.SEVERE, "WebSocket onBinaryMessage error (" + packet + ")", e);
// }
// } else {
// Object message;
// try {
// message = binaryConvert.convertFrom(webSocket._messageTextType, packet.receiveMasker, packet.receiveBuffers);
// } catch (Exception e) { //接收的消息体解析失败
// webSocket.onOccurException(e, packet.receiveBuffers);
// if (readBuffer != null) {
// readBuffer.clear();
// channel.read(readBuffer, null, this);
// }
// return;
// }
// if (readBuffer != null) {
// readBuffer.clear();
// channel.read(readBuffer, null, this);
// }
// try {
// if (restMessageConsumer != null) { //主要供RestWebSocket使用
// restMessageConsumer.accept(webSocket, message);
// } else {
// webSocket.onMessage(message, packet.last);
// }
// } catch (Exception e) {
// context.getLogger().log(Level.SEVERE, "WebSocket onTextMessage error (" + packet + ")", e);
// }
// }
// } else if (packet.type == FrameType.PONG) {
// byte[] message = packet.getReceiveBytes();
// if (readBuffer != null) {
// readBuffer.clear();
// channel.read(readBuffer, null, this);
// }
// try {
// webSocket.onPong(message);
// } catch (Exception e) {
// context.getLogger().log(Level.SEVERE, "WebSocket onPong error (" + packet + ")", e);
// }
// } else if (packet.type == FrameType.PING) {
// byte[] message = packet.getReceiveBytes();
// if (readBuffer != null) {
// readBuffer.clear();
// channel.read(readBuffer, null, this);
// }
// try {
// webSocket.onPing(message);
// } catch (Exception e) {
// context.getLogger().log(Level.SEVERE, "WebSocket onPing error (" + packet + ")", e);
// }
// } else if (packet.type == FrameType.CLOSE) {
// Logger logger = context.getLogger();
// if (logger.isLoggable(Level.FINEST)) logger.log(Level.FINEST, "WebSocketRunner onMessage by CLOSE FrameType : " + packet);
// closeRunner(0);
// } else {
// context.getLogger().log(Level.WARNING, "WebSocketRunner onMessage by unknown FrameType : " + packet);
// if (readBuffer != null) {
// readBuffer.clear();
// channel.read(readBuffer, null, this);
// }
// }
// } catch (Throwable t) {
// closeRunner(0);
// if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on read WebSocketPacket, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", t);
// } finally {
// if (exBuffers != null) {
// for (ByteBuffer b : exBuffers) {
// context.offerBuffer(b);
// }
// }
// }
}
@Override
public void failed(Throwable exc, Void attachment2) {
closeRunner(0);
if (exc != null) {
context.getLogger().log(Level.FINEST, "WebSocketRunner read WebSocketPacket failed, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", exc);
if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner read WebSocketPacket failed, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", exc);
}
closeRunner(0);
}
});
} else {
if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort by AsyncConnection closed");
closeRunner(0);
context.getLogger().log(Level.FINEST, "WebSocketRunner abort by AsyncConnection closed");
}
} catch (Exception e) {
if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on read bytes from channel, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", e);
closeRunner(0);
context.getLogger().log(Level.FINEST, "WebSocketRunner abort on read bytes from channel, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", e);
}
}
@@ -409,8 +269,8 @@ class WebSocketRunner implements Runnable {
channel.write(buffers, buffers, this);
}
} catch (Exception e) {
closeRunner(0);
context.getLogger().log(Level.WARNING, "WebSocket sendMessage abort on rewrite, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", e);
closeRunner(0);
}
writing.set(false);
}
@@ -418,16 +278,17 @@ class WebSocketRunner implements Runnable {
@Override
public void failed(Throwable exc, ByteBuffer[] attachments) {
writing.set(false);
closeRunner(0);
if (exc != null) {
context.getLogger().log(Level.FINE, "WebSocket sendMessage on CompletionHandler failed, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", exc);
}
closeRunner(0);
}
});
} catch (Exception t) {
writing.set(false);
closeRunner(0);
context.getLogger().log(Level.FINE, "WebSocket sendMessage abort, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", t);
closeRunner(0);
futureResult.complete(RETCODE_SENDEXCEPTION);
}
return futureResult;