This commit is contained in:
地平线
2015-06-29 11:28:48 +08:00
parent fee9813b90
commit e276096bf3

View File

@@ -64,23 +64,37 @@ public class WebSocketRunner implements Runnable {
if (channel.isOpen()) { if (channel.isOpen()) {
channel.read(readBuffer, null, new CompletionHandler<Integer, Void>() { channel.read(readBuffer, null, new CompletionHandler<Integer, Void>() {
private ByteBuffer recentExBuffer;
//当接收的数据流长度大于ByteBuffer长度时 则需要额外的ByteBuffer 辅助; //当接收的数据流长度大于ByteBuffer长度时 则需要额外的ByteBuffer 辅助;
private final List<ByteBuffer> readBuffers = new ArrayList<>(); private final List<ByteBuffer> readBuffers = new ArrayList<>();
@Override @Override
public void completed(Integer count, Void attachment1) { public void completed(Integer count, Void attachment1) {
if (count < 1) { if (count < 1 && readBuffers.isEmpty()) {
if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on read buffer count, force to close channel"); if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on read buffer count, force to close channel");
closeRunner(); closeRunner();
return; return;
} }
if (readBuffer == null) return; if (readBuffer == null) return;
readBuffer.flip(); if (recentExBuffer == null) {
readBuffer.flip();
} else {
recentExBuffer.flip();
}
if (!readBuffer.hasRemaining() && (recentExBuffer == null || !recentExBuffer.hasRemaining())) {
final ByteBuffer buffer = context.pollBuffer();
recentExBuffer = buffer;
readBuffers.add(buffer);
channel.read(buffer, null, this);
return;
}
try { try {
ByteBuffer[] exBuffers = null; ByteBuffer[] exBuffers = null;
if (!readBuffers.isEmpty()) { if (!readBuffers.isEmpty()) {
exBuffers = readBuffers.toArray(new ByteBuffer[readBuffers.size()]); exBuffers = readBuffers.toArray(new ByteBuffer[readBuffers.size()]);
readBuffers.clear(); readBuffers.clear();
recentExBuffer = null;
} }
WebSocketPacket packet = coder.decode(readBuffer, exBuffers); WebSocketPacket packet = coder.decode(readBuffer, exBuffers);
if (exBuffers != null) { if (exBuffers != null) {
@@ -383,7 +397,7 @@ public class WebSocketRunner implements Runnable {
masker.readMask(); masker.readMask();
} }
if (masker.remaining() < length) { if (masker.remaining() < length) {
if (debug) logger.log(Level.FINE, " read illegal remaining length from websocket, expect " + length + " but " + buffer.remaining()); if (debug) logger.log(Level.FINE, " read illegal remaining length from websocket, expect " + length + " but " + masker.remaining());
return null; return null;
} }
final byte[] data = masker.unmask(length); final byte[] data = masker.unmask(length);