fastWrite优化

This commit is contained in:
redkale
2024-10-07 22:22:25 +08:00
parent 77a8b8fa7f
commit 3d8c2f1fc5
3 changed files with 11 additions and 8 deletions

View File

@@ -273,12 +273,12 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
throw new RedkaleException("fast-writer handler is null"); throw new RedkaleException("fast-writer handler is null");
} }
for (Consumer<ByteArray> c : consumers) { for (Consumer<ByteArray> c : consumers) {
this.fastWriteQueue.add(c); this.fastWriteQueue.offer(c);
} }
this.ioWriteThread.fastWrite(this); this.ioWriteThread.fastWrite(this);
} }
protected abstract void fastPrepare(Object selector); protected abstract void fastPrepareInIOThread(Object selector);
// --------------------- fast-write-end --------------------- // --------------------- fast-write-end ---------------------
protected void startRead(CompletionHandler<Integer, ByteBuffer> handler) { protected void startRead(CompletionHandler<Integer, ByteBuffer> handler) {

View File

@@ -142,7 +142,7 @@ public class AsyncIOThread extends WorkThread {
} }
public final void fastWrite(AsyncConnection conn) { public final void fastWrite(AsyncConnection conn) {
fastQueue.add(Objects.requireNonNull(conn)); fastQueue.offer(Objects.requireNonNull(conn));
selector.wakeup(); selector.wakeup();
} }
@@ -163,7 +163,7 @@ public class AsyncIOThread extends WorkThread {
try { try {
AsyncConnection fastConn; AsyncConnection fastConn;
while ((fastConn = fastQueue.poll()) != null) { while ((fastConn = fastQueue.poll()) != null) {
fastConn.fastPrepare(selector); fastConn.fastPrepareInIOThread(selector);
} }
Consumer<Selector> register; Consumer<Selector> register;

View File

@@ -177,11 +177,11 @@ abstract class AsyncNioConnection extends AsyncConnection {
} }
@Override @Override
protected void fastPrepare(Object selector) { protected void fastPrepareInIOThread(Object selector) {
if (this.writePending) { ByteArray array = this.fastWriteArray;
return; if (!this.writePending) {
array.clear();
} }
ByteArray array = this.fastWriteArray.clear();
Consumer<ByteArray> func; Consumer<ByteArray> func;
while ((func = fastWriteQueue.poll()) != null) { while ((func = fastWriteQueue.poll()) != null) {
func.accept(array); func.accept(array);
@@ -373,6 +373,9 @@ abstract class AsyncNioConnection extends AsyncConnection {
this.writeByteTuple2Offset = 0; this.writeByteTuple2Offset = 0;
this.writeByteTuple2Length = 0; this.writeByteTuple2Length = 0;
} }
if (this.fastWriteArray != null) {
this.fastWriteArray.clear();
}
} }
int writeCount; int writeCount;
if (writeByteBuffer != null) { if (writeByteBuffer != null) {