This commit is contained in:
Redkale
2020-01-27 18:39:11 +08:00
parent c50eb79b1d
commit 469dff8478

View File

@@ -39,7 +39,7 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy
protected final Consumer<ByteBuffer> bufferConsumer; protected final Consumer<ByteBuffer> bufferConsumer;
protected ByteBuffer readBuffer; protected ByteBuffer attrBuffer;
//在线数 //在线数
protected AtomicLong livingCounter; protected AtomicLong livingCounter;
@@ -129,15 +129,15 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy
public abstract <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler); public abstract <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler);
public void setReadBuffer(Buffer buffer) { public void setReadBuffer(ByteBuffer buffer) {
if (this.readBuffer != null) throw new RuntimeException("repeat AsyncConnection.setReadBuffer"); if (this.attrBuffer != null) throw new RuntimeException("repeat AsyncConnection.setReadBuffer");
this.readBuffer = (ByteBuffer) buffer; this.attrBuffer = buffer;
} }
public ByteBuffer pollReadBuffer() { public ByteBuffer pollReadBuffer() {
ByteBuffer rs = this.readBuffer; ByteBuffer rs = this.attrBuffer;
if (rs != null) { if (rs != null) {
this.readBuffer = null; this.attrBuffer = null;
return rs; return rs;
} }
// Thread thread = Thread.currentThread(); // Thread thread = Thread.currentThread();
@@ -147,29 +147,48 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy
return bufferSupplier.get(); return bufferSupplier.get();
} }
public void offerBuffer(Buffer buffer) { public void offerBuffer(ByteBuffer buffer) {
if (buffer == null) return; if (buffer == null) return;
// Thread thread = Thread.currentThread(); // Thread thread = Thread.currentThread();
// if (thread instanceof IOThread) { // if (thread instanceof IOThread) {
// ((IOThread) thread).getBufferPool().accept((ByteBuffer) buffer); // ((IOThread) thread).getBufferPool().accept((ByteBuffer) buffer);
// return; // return;
// } // }
bufferConsumer.accept((ByteBuffer) buffer);
if (this.attrBuffer == null) {
buffer.clear();
this.attrBuffer = buffer;
} else {
bufferConsumer.accept(buffer);
}
} }
public void offerBuffer(Buffer... buffers) { public void offerBuffer(ByteBuffer... buffers) {
if (buffers == null) return; if (buffers == null || buffers.length < 1) return;
Consumer<ByteBuffer> consumer = this.bufferConsumer; Consumer<ByteBuffer> consumer = this.bufferConsumer;
// Thread thread = Thread.currentThread(); // Thread thread = Thread.currentThread();
// if (thread instanceof IOThread) { // if (thread instanceof IOThread) {
// consumer = ((IOThread) thread).getBufferPool(); // consumer = ((IOThread) thread).getBufferPool();
// } // }
for (Buffer buffer : buffers) { if (this.attrBuffer == null) {
consumer.accept((ByteBuffer) buffer); buffers[0].clear();
this.attrBuffer = buffers[0];
for (int i = 1; i < buffers.length; i++) {
consumer.accept(buffers[i]);
}
} else {
for (ByteBuffer buffer : buffers) {
consumer.accept(buffer);
}
} }
} }
public ByteBuffer pollWriteBuffer() { public ByteBuffer pollWriteBuffer() {
ByteBuffer rs = this.attrBuffer;
if (rs != null) {
this.attrBuffer = null;
return rs;
}
// Thread thread = Thread.currentThread(); // Thread thread = Thread.currentThread();
// if (thread instanceof IOThread) { // if (thread instanceof IOThread) {
// return ((IOThread) thread).getBufferPool().get(); // return ((IOThread) thread).getBufferPool().get();
@@ -205,13 +224,13 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy
} catch (Exception io) { } catch (Exception io) {
} }
} }
if (this.readBuffer != null) { if (this.attrBuffer != null) {
Consumer<ByteBuffer> consumer = this.bufferConsumer; Consumer<ByteBuffer> consumer = this.bufferConsumer;
// Thread thread = Thread.currentThread(); // Thread thread = Thread.currentThread();
// if (thread instanceof IOThread) { // if (thread instanceof IOThread) {
// consumer = ((IOThread) thread).getBufferPool(); // consumer = ((IOThread) thread).getBufferPool();
// } // }
consumer.accept(this.readBuffer); consumer.accept(this.attrBuffer);
} }
if (attributes == null) return; if (attributes == null) return;
try { try {