net浼樺寲璁℃暟鍣?

This commit is contained in:
Redkale
2023-01-27 15:59:09 +08:00
parent 8d4ebe653d
commit a492cbd815
5 changed files with 20 additions and 30 deletions

View File

@@ -369,13 +369,13 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
CompletionHandler<Integer, Object> newHandler = new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result, Object attachment) {
offerWriteBuffer(buffers);
offerWriteBuffers(buffers);
handler.completed(result, attachment);
}
@Override
public void failed(Throwable exc, Object attachment) {
offerWriteBuffer(buffers);
offerWriteBuffers(buffers);
handler.failed(exc, attachment);
}
};
@@ -476,13 +476,13 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
CompletionHandler<Integer, ? super A> newHandler = new CompletionHandler<Integer, A>() {
@Override
public void completed(Integer result, A attachment) {
offerWriteBuffer(srcs);
offerWriteBuffers(srcs);
handler.completed(result, attachment);
}
@Override
public void failed(Throwable exc, A attachment) {
offerWriteBuffer(srcs);
offerWriteBuffers(srcs);
handler.failed(exc, attachment);
}
};
@@ -689,7 +689,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
readBufferConsumer.accept(buffer);
}
public void offerReadBuffer(ByteBuffer... buffers) {
public void offerReadBuffers(ByteBuffer... buffers) {
if (buffers == null) {
return;
}
@@ -706,7 +706,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
writeBufferConsumer.accept(buffer);
}
public void offerWriteBuffer(ByteBuffer... buffers) {
public void offerWriteBuffers(ByteBuffer... buffers) {
if (buffers == null) {
return;
}
@@ -1009,20 +1009,20 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
writeImpl(netBuffers, 0, netBuffers.length, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer count, Void attachment) {
offerWriteBuffer(netBuffers);
offerWriteBuffers(netBuffers);
callback.accept(null);
}
@Override
public void failed(Throwable t, Void attachment) {
offerWriteBuffer(netBuffers);
offerWriteBuffers(netBuffers);
callback.accept(t);
}
});
}
return true;
} else {
offerWriteBuffer(netBuffers);
offerWriteBuffers(netBuffers);
return false;
}
}
@@ -1048,20 +1048,20 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
writeImpl(netBuffers, 0, netBuffers.length, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer count, Void attachment) {
offerWriteBuffer(netBuffers);
offerWriteBuffers(netBuffers);
callback.accept(null);
}
@Override
public void failed(Throwable t, Void attachment) {
offerWriteBuffer(netBuffers);
offerWriteBuffers(netBuffers);
callback.accept(t);
}
});
}
return true;
} else {
offerWriteBuffer(netBuffers);
offerWriteBuffers(netBuffers);
return false;
}
}

View File

@@ -10,7 +10,6 @@ import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.*;
import java.util.logging.*;
import org.redkale.util.*;
@@ -31,9 +30,6 @@ public class AsyncIOThread extends WorkThread {
final Selector selector;
//如果有read/write两IOThread只记readThread
final LongAdder connCounter = new LongAdder();
private final Supplier<ByteBuffer> bufferSupplier;
private final Consumer<ByteBuffer> bufferConsumer;
@@ -141,10 +137,6 @@ public class AsyncIOThread extends WorkThread {
return bufferConsumer;
}
public int currConnections() {
return connCounter.intValue();
}
@Override
public void run() {
final Queue<Runnable> commands = this.commandQueue;

View File

@@ -74,9 +74,9 @@ class AsyncNioCompletionHandler<A> implements CompletionHandler<Integer, A>, Run
if (conn != null) {
if (buffers != null) {
if (readMode) {
conn.offerReadBuffer(buffers);
conn.offerReadBuffers(buffers);
} else {
conn.offerWriteBuffer(buffers);
conn.offerWriteBuffers(buffers);
}
} else if (buffer != null) {
if (readMode) {
@@ -102,9 +102,9 @@ class AsyncNioCompletionHandler<A> implements CompletionHandler<Integer, A>, Run
if (conn != null) {
if (buffers != null) {
if (readMode) {
conn.offerReadBuffer(buffers);
conn.offerReadBuffers(buffers);
} else {
conn.offerWriteBuffer(buffers);
conn.offerWriteBuffers(buffers);
}
} else if (buffer != null) {
if (readMode) {
@@ -125,9 +125,9 @@ class AsyncNioCompletionHandler<A> implements CompletionHandler<Integer, A>, Run
if (conn != null) {
if (buffers != null) {
if (readMode) {
conn.offerReadBuffer(buffers);
conn.offerReadBuffers(buffers);
} else {
conn.offerWriteBuffer(buffers);
conn.offerWriteBuffers(buffers);
}
} else if (buffer != null) {
if (readMode) {

View File

@@ -39,7 +39,6 @@ class AsyncNioTcpConnection extends AsyncNioConnection {
}
}
this.remoteAddress = addr;
ioReadThread.connCounter.increment();
}
@Override
@@ -183,7 +182,7 @@ class AsyncNioTcpConnection extends AsyncNioConnection {
channel.write(netBuffers);
}
}
offerWriteBuffer(netBuffers);
offerWriteBuffers(netBuffers);
return len;
}
@@ -269,7 +268,6 @@ class AsyncNioTcpConnection extends AsyncNioConnection {
@Override
public final void close() throws IOException {
super.close();
ioReadThread.connCounter.decrement();
channel.close();
if (this.connectKey != null) {
this.connectKey.cancel();

View File

@@ -437,7 +437,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
@Override
public void completed(Integer result, A attachment) {
channel.offerWriteBuffer(buffers);
channel.offerWriteBuffers(buffers);
if (handler != null) {
handler.completed(result, attachment);
}