This commit is contained in:
Redkale
2017-05-22 16:46:59 +08:00
parent a29cc94f32
commit 859e56af4d

View File

@@ -95,75 +95,87 @@ public class WebSocketRunner implements Runnable {
return; return;
} }
readBuffer.flip(); readBuffer.flip();
try {
ByteBuffer[] exBuffers = null; ByteBuffer[] exBuffers = null;
if (!readBuffers.isEmpty()) { if (!readBuffers.isEmpty()) {
exBuffers = readBuffers.toArray(new ByteBuffer[readBuffers.size()]); exBuffers = readBuffers.toArray(new ByteBuffer[readBuffers.size()]);
readBuffers.clear(); readBuffers.clear();
recentExBuffer = null; recentExBuffer = null;
for (ByteBuffer b : exBuffers) { for (ByteBuffer b : exBuffers) {
b.flip(); b.flip();
}
} }
try { }
WebSocketPacket packet = new WebSocketPacket().decode(context.getLogger(), readBuffer, exBuffers); try {
WebSocketPacket packet = new WebSocketPacket().decode(context.getLogger(), readBuffer, exBuffers);
if (packet == null) { if (packet == null) {
failed(null, attachment1); failed(null, attachment1);
if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on decode WebSocketPacket, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds"); if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on decode WebSocketPacket, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds");
return; return;
}
webSocket._group.setRecentWebSocket(webSocket);
if (packet.type == FrameType.TEXT) {
Object message = convert.convertFrom(webSocket._messageTextType, packet.receiveMasker, packet.receiveBuffers);
if (readBuffer != null) {
readBuffer.clear();
channel.read(readBuffer, null, this);
} }
webSocket._group.setRecentWebSocket(webSocket);
try { try {
if (packet.type == FrameType.TEXT) { webSocket.onMessage(message, packet.last);
Object message = convert.convertFrom(webSocket._messageTextType, packet.receiveMasker, packet.receiveBuffers);
if (readBuffer != null) {
readBuffer.clear();
channel.read(readBuffer, null, this);
}
webSocket.onMessage(message, packet.last);
} else if (packet.type == FrameType.BINARY) {
byte[] message = convert.convertFrom(byte[].class, packet.receiveMasker, packet.receiveBuffers);
if (readBuffer != null) {
readBuffer.clear();
channel.read(readBuffer, null, this);
}
webSocket.onMessage(message, packet.last);
} else if (packet.type == FrameType.PONG) {
byte[] message = convert.convertFrom(byte[].class, packet.receiveMasker, packet.receiveBuffers);
if (readBuffer != null) {
readBuffer.clear();
channel.read(readBuffer, null, this);
}
webSocket.onPong(message);
} else if (packet.type == FrameType.PING) {
byte[] message = convert.convertFrom(byte[].class, packet.receiveMasker, packet.receiveBuffers);
if (readBuffer != null) {
readBuffer.clear();
channel.read(readBuffer, null, this);
}
webSocket.onPing(message);
} else {
context.getLogger().log(Level.WARNING, "WebSocketRunner onMessage by unknown FrameType : " + packet);
if (readBuffer != null) {
readBuffer.clear();
channel.read(readBuffer, null, this);
}
}
} catch (Exception e) { } catch (Exception e) {
context.getLogger().log(Level.INFO, "WebSocket onMessage error (" + packet + ")", e); context.getLogger().log(Level.SEVERE, "WebSocket onTextMessage error (" + packet + ")", e);
} }
} finally { } else if (packet.type == FrameType.BINARY) {
if (exBuffers != null) { byte[] message = convert.convertFrom(byte[].class, packet.receiveMasker, packet.receiveBuffers);
for (ByteBuffer b : exBuffers) { if (readBuffer != null) {
context.offerBuffer(b); readBuffer.clear();
} channel.read(readBuffer, null, this);
}
try {
webSocket.onMessage(message, packet.last);
} catch (Exception e) {
context.getLogger().log(Level.SEVERE, "WebSocket onBinaryMessage error (" + packet + ")", e);
}
} else if (packet.type == FrameType.PONG) {
byte[] message = convert.convertFrom(byte[].class, packet.receiveMasker, packet.receiveBuffers);
if (readBuffer != null) {
readBuffer.clear();
channel.read(readBuffer, null, this);
}
try {
webSocket.onPong(message);
} catch (Exception e) {
context.getLogger().log(Level.SEVERE, "WebSocket onPong error (" + packet + ")", e);
}
} else if (packet.type == FrameType.PING) {
byte[] message = convert.convertFrom(byte[].class, packet.receiveMasker, packet.receiveBuffers);
if (readBuffer != null) {
readBuffer.clear();
channel.read(readBuffer, null, this);
}
try {
webSocket.onPing(message);
} catch (Exception e) {
context.getLogger().log(Level.SEVERE, "WebSocket onPing error (" + packet + ")", e);
}
} else {
context.getLogger().log(Level.WARNING, "WebSocketRunner onMessage by unknown FrameType : " + packet);
if (readBuffer != null) {
readBuffer.clear();
channel.read(readBuffer, null, this);
} }
} }
} catch (Throwable t) { } catch (Throwable t) {
closeRunner(); closeRunner();
if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on read WebSocketPacket, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", t); if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on read WebSocketPacket, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", t);
} finally {
if (exBuffers != null) {
for (ByteBuffer b : exBuffers) {
context.offerBuffer(b);
}
}
} }
} }