This commit is contained in:
Redkale
2020-01-29 11:35:49 +08:00
parent 6f00efa077
commit a34f85bfc9

View File

@@ -39,7 +39,7 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy
protected final Consumer<ByteBuffer> bufferConsumer; protected final Consumer<ByteBuffer> bufferConsumer;
protected ByteBuffer attrBuffer; protected ByteBuffer readBuffer;
//在线数 //在线数
protected AtomicLong livingCounter; protected AtomicLong livingCounter;
@@ -130,14 +130,14 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy
public abstract <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler); public abstract <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler);
public void setReadBuffer(ByteBuffer buffer) { public void setReadBuffer(ByteBuffer buffer) {
if (this.attrBuffer != null) throw new RuntimeException("repeat AsyncConnection.setReadBuffer"); if (this.readBuffer != null) throw new RuntimeException("repeat AsyncConnection.setReadBuffer");
this.attrBuffer = buffer; this.readBuffer = buffer;
} }
public ByteBuffer pollReadBuffer() { public ByteBuffer pollReadBuffer() {
ByteBuffer rs = this.attrBuffer; ByteBuffer rs = this.readBuffer;
if (rs != null) { if (rs != null) {
this.attrBuffer = null; this.readBuffer = null;
return rs; return rs;
} }
// Thread thread = Thread.currentThread(); // Thread thread = Thread.currentThread();
@@ -154,41 +154,22 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy
// ((IOThread) thread).getBufferPool().accept((ByteBuffer) buffer); // ((IOThread) thread).getBufferPool().accept((ByteBuffer) buffer);
// return; // return;
// } // }
bufferConsumer.accept(buffer);
if (this.attrBuffer == null) {
buffer.clear();
this.attrBuffer = buffer;
} else {
bufferConsumer.accept(buffer);
}
} }
public void offerBuffer(ByteBuffer... buffers) { public void offerBuffer(ByteBuffer... buffers) {
if (buffers == null || buffers.length < 1) return; if (buffers == null) return;
Consumer<ByteBuffer> consumer = this.bufferConsumer; Consumer<ByteBuffer> consumer = this.bufferConsumer;
// Thread thread = Thread.currentThread(); // Thread thread = Thread.currentThread();
// if (thread instanceof IOThread) { // if (thread instanceof IOThread) {
// consumer = ((IOThread) thread).getBufferPool(); // consumer = ((IOThread) thread).getBufferPool();
// } // }
if (this.attrBuffer == null) { for (ByteBuffer buffer : buffers) {
buffers[0].clear(); consumer.accept(buffer);
this.attrBuffer = buffers[0];
for (int i = 1; i < buffers.length; i++) {
consumer.accept(buffers[i]);
}
} else {
for (ByteBuffer buffer : buffers) {
consumer.accept(buffer);
}
} }
} }
public ByteBuffer pollWriteBuffer() { public ByteBuffer pollWriteBuffer() {
ByteBuffer rs = this.attrBuffer;
if (rs != null) {
this.attrBuffer = null;
return rs;
}
// Thread thread = Thread.currentThread(); // Thread thread = Thread.currentThread();
// if (thread instanceof IOThread) { // if (thread instanceof IOThread) {
// return ((IOThread) thread).getBufferPool().get(); // return ((IOThread) thread).getBufferPool().get();
@@ -224,13 +205,13 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy
} catch (Exception io) { } catch (Exception io) {
} }
} }
if (this.attrBuffer != null) { if (this.readBuffer != null) {
Consumer<ByteBuffer> consumer = this.bufferConsumer; Consumer<ByteBuffer> consumer = this.bufferConsumer;
// Thread thread = Thread.currentThread(); // Thread thread = Thread.currentThread();
// if (thread instanceof IOThread) { // if (thread instanceof IOThread) {
// consumer = ((IOThread) thread).getBufferPool(); // consumer = ((IOThread) thread).getBufferPool();
// } // }
consumer.accept(this.attrBuffer); consumer.accept(this.readBuffer);
} }
if (attributes == null) return; if (attributes == null) return;
try { try {