diff --git a/src/org/redkale/net/AsyncConnection.java b/src/org/redkale/net/AsyncConnection.java index efa35352e..ed050ece0 100644 --- a/src/org/redkale/net/AsyncConnection.java +++ b/src/org/redkale/net/AsyncConnection.java @@ -39,7 +39,7 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy protected final Consumer bufferConsumer; - protected ByteBuffer readBuffer; + protected ByteBuffer attrBuffer; //在线数 protected AtomicLong livingCounter; @@ -129,15 +129,15 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy public abstract void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler); - public void setReadBuffer(Buffer buffer) { - if (this.readBuffer != null) throw new RuntimeException("repeat AsyncConnection.setReadBuffer"); - this.readBuffer = (ByteBuffer) buffer; + public void setReadBuffer(ByteBuffer buffer) { + if (this.attrBuffer != null) throw new RuntimeException("repeat AsyncConnection.setReadBuffer"); + this.attrBuffer = buffer; } public ByteBuffer pollReadBuffer() { - ByteBuffer rs = this.readBuffer; + ByteBuffer rs = this.attrBuffer; if (rs != null) { - this.readBuffer = null; + this.attrBuffer = null; return rs; } // Thread thread = Thread.currentThread(); @@ -147,29 +147,48 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy return bufferSupplier.get(); } - public void offerBuffer(Buffer buffer) { + public void offerBuffer(ByteBuffer buffer) { if (buffer == null) return; // Thread thread = Thread.currentThread(); // if (thread instanceof IOThread) { // ((IOThread) thread).getBufferPool().accept((ByteBuffer) buffer); // return; // } - bufferConsumer.accept((ByteBuffer) buffer); + + if (this.attrBuffer == null) { + buffer.clear(); + this.attrBuffer = buffer; + } else { + bufferConsumer.accept(buffer); + } } - public void offerBuffer(Buffer... buffers) { - if (buffers == null) return; + public void offerBuffer(ByteBuffer... buffers) { + if (buffers == null || buffers.length < 1) return; Consumer consumer = this.bufferConsumer; // Thread thread = Thread.currentThread(); // if (thread instanceof IOThread) { // consumer = ((IOThread) thread).getBufferPool(); // } - for (Buffer buffer : buffers) { - consumer.accept((ByteBuffer) buffer); + if (this.attrBuffer == null) { + buffers[0].clear(); + this.attrBuffer = buffers[0]; + for (int i = 1; i < buffers.length; i++) { + consumer.accept(buffers[i]); + } + } else { + for (ByteBuffer buffer : buffers) { + consumer.accept(buffer); + } } } public ByteBuffer pollWriteBuffer() { + ByteBuffer rs = this.attrBuffer; + if (rs != null) { + this.attrBuffer = null; + return rs; + } // Thread thread = Thread.currentThread(); // if (thread instanceof IOThread) { // return ((IOThread) thread).getBufferPool().get(); @@ -205,13 +224,13 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy } catch (Exception io) { } } - if (this.readBuffer != null) { + if (this.attrBuffer != null) { Consumer consumer = this.bufferConsumer; // Thread thread = Thread.currentThread(); // if (thread instanceof IOThread) { // consumer = ((IOThread) thread).getBufferPool(); // } - consumer.accept(this.readBuffer); + consumer.accept(this.attrBuffer); } if (attributes == null) return; try {