优化AsyncConnection的Thread
This commit is contained in:
@@ -52,9 +52,13 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
|
|||||||
|
|
||||||
protected final int bufferCapacity;
|
protected final int bufferCapacity;
|
||||||
|
|
||||||
private final Supplier<ByteBuffer> bufferSupplier;
|
private final Supplier<ByteBuffer> readBufferSupplier;
|
||||||
|
|
||||||
private final Consumer<ByteBuffer> bufferConsumer;
|
private final Consumer<ByteBuffer> readBufferConsumer;
|
||||||
|
|
||||||
|
private final Supplier<ByteBuffer> writeBufferSupplier;
|
||||||
|
|
||||||
|
private final Consumer<ByteBuffer> writeBufferConsumer;
|
||||||
|
|
||||||
private ByteBufferWriter pipelineWriter;
|
private ByteBufferWriter pipelineWriter;
|
||||||
|
|
||||||
@@ -78,24 +82,34 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
|
|||||||
//用于服务端的Socket, 等同于一直存在的readCompletionHandler
|
//用于服务端的Socket, 等同于一直存在的readCompletionHandler
|
||||||
ProtocolCodec protocolCodec;
|
ProtocolCodec protocolCodec;
|
||||||
|
|
||||||
protected AsyncConnection(boolean client, AsyncGroup ioGroup, AsyncIOThread ioThread, final int bufferCapacity, ObjectPool<ByteBuffer> bufferPool,
|
protected AsyncConnection(boolean client, AsyncGroup ioGroup, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread,
|
||||||
SSLBuilder sslBuilder, SSLContext sslContext, final LongAdder livingCounter, final LongAdder closedCounter) {
|
final int bufferCapacity, SSLBuilder sslBuilder, SSLContext sslContext, final LongAdder livingCounter, final LongAdder closedCounter) {
|
||||||
this(client, ioGroup, ioThread, bufferCapacity, bufferPool, bufferPool, sslBuilder, sslContext, livingCounter, closedCounter);
|
this(client, ioGroup, ioReadThread, ioWriteThread, bufferCapacity,
|
||||||
|
ioReadThread.getBufferSupplier(), ioReadThread.getBufferConsumer(),
|
||||||
|
ioWriteThread.getBufferSupplier(), ioWriteThread.getBufferConsumer(),
|
||||||
|
sslBuilder, sslContext, livingCounter, closedCounter);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected AsyncConnection(boolean client, AsyncGroup ioGroup, AsyncIOThread ioThread, final int bufferCapacity, Supplier<ByteBuffer> bufferSupplier,
|
protected AsyncConnection(boolean client, AsyncGroup ioGroup, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread, final int bufferCapacity,
|
||||||
Consumer<ByteBuffer> bufferConsumer, SSLBuilder sslBuilder, SSLContext sslContext, final LongAdder livingCounter, final LongAdder closedCounter) {
|
Supplier<ByteBuffer> readBufferSupplier, Consumer<ByteBuffer> readBufferConsumer,
|
||||||
|
Supplier<ByteBuffer> writeBufferSupplier, Consumer<ByteBuffer> writeBufferConsumer,
|
||||||
|
SSLBuilder sslBuilder, SSLContext sslContext, final LongAdder livingCounter, final LongAdder closedCounter) {
|
||||||
Objects.requireNonNull(ioGroup);
|
Objects.requireNonNull(ioGroup);
|
||||||
Objects.requireNonNull(ioThread);
|
Objects.requireNonNull(ioReadThread);
|
||||||
Objects.requireNonNull(bufferSupplier);
|
Objects.requireNonNull(ioWriteThread);
|
||||||
Objects.requireNonNull(bufferConsumer);
|
Objects.requireNonNull(readBufferSupplier);
|
||||||
|
Objects.requireNonNull(readBufferConsumer);
|
||||||
|
Objects.requireNonNull(writeBufferSupplier);
|
||||||
|
Objects.requireNonNull(writeBufferConsumer);
|
||||||
this.client = client;
|
this.client = client;
|
||||||
this.ioGroup = ioGroup;
|
this.ioGroup = ioGroup;
|
||||||
this.ioReadThread = ioThread;
|
this.ioReadThread = ioReadThread;
|
||||||
this.ioWriteThread = ioThread;
|
this.ioWriteThread = ioWriteThread;
|
||||||
this.bufferCapacity = bufferCapacity;
|
this.bufferCapacity = bufferCapacity;
|
||||||
this.bufferSupplier = bufferSupplier;
|
this.readBufferSupplier = readBufferSupplier;
|
||||||
this.bufferConsumer = bufferConsumer;
|
this.readBufferConsumer = readBufferConsumer;
|
||||||
|
this.writeBufferSupplier = writeBufferSupplier;
|
||||||
|
this.writeBufferConsumer = writeBufferConsumer;
|
||||||
this.livingCounter = livingCounter;
|
this.livingCounter = livingCounter;
|
||||||
this.closedCounter = closedCounter;
|
this.closedCounter = closedCounter;
|
||||||
if (client) { //client模式下无SSLBuilder
|
if (client) { //client模式下无SSLBuilder
|
||||||
@@ -114,19 +128,19 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
|
|||||||
}
|
}
|
||||||
|
|
||||||
public Supplier<ByteBuffer> getReadBufferSupplier() {
|
public Supplier<ByteBuffer> getReadBufferSupplier() {
|
||||||
return this.bufferSupplier;
|
return this.readBufferSupplier;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Consumer<ByteBuffer> getReadBufferConsumer() {
|
public Consumer<ByteBuffer> getReadBufferConsumer() {
|
||||||
return this.bufferConsumer;
|
return this.readBufferConsumer;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Supplier<ByteBuffer> getWriteBufferSupplier() {
|
public Supplier<ByteBuffer> getWriteBufferSupplier() {
|
||||||
return this.bufferSupplier;
|
return this.writeBufferSupplier;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Consumer<ByteBuffer> getWriteBufferConsumer() {
|
public Consumer<ByteBuffer> getWriteBufferConsumer() {
|
||||||
return this.bufferConsumer;
|
return this.writeBufferConsumer;
|
||||||
}
|
}
|
||||||
|
|
||||||
public final long getLastReadTime() {
|
public final long getLastReadTime() {
|
||||||
@@ -337,7 +351,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
|
|||||||
};
|
};
|
||||||
write(buffer, null, newhandler);
|
write(buffer, null, newhandler);
|
||||||
} else {
|
} else {
|
||||||
ByteBufferWriter writer = ByteBufferWriter.create(sslEngine == null ? bufferSupplier : () -> pollWriteSSLBuffer(), buffer);
|
ByteBufferWriter writer = ByteBufferWriter.create(sslEngine == null ? writeBufferSupplier : () -> pollWriteSSLBuffer(), buffer);
|
||||||
writer.put(headerContent, headerOffset, headerLength);
|
writer.put(headerContent, headerOffset, headerLength);
|
||||||
if (bodyLength > 0) {
|
if (bodyLength > 0) {
|
||||||
writer.put(bodyContent, bodyOffset, bodyLength);
|
writer.put(bodyContent, bodyOffset, bodyLength);
|
||||||
@@ -583,7 +597,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
|
|||||||
this.readSSLHalfBuffer = null;
|
this.readSSLHalfBuffer = null;
|
||||||
return rs;
|
return rs;
|
||||||
}
|
}
|
||||||
return bufferSupplier.get();
|
return readBufferSupplier.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
public ByteBuffer pollReadBuffer() {
|
public ByteBuffer pollReadBuffer() {
|
||||||
@@ -592,21 +606,21 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
|
|||||||
this.readBuffer = null;
|
this.readBuffer = null;
|
||||||
return rs;
|
return rs;
|
||||||
}
|
}
|
||||||
return bufferSupplier.get();
|
return readBufferSupplier.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void offerReadBuffer(ByteBuffer buffer) {
|
public void offerReadBuffer(ByteBuffer buffer) {
|
||||||
if (buffer == null) {
|
if (buffer == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
bufferConsumer.accept(buffer);
|
readBufferConsumer.accept(buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void offerReadBuffer(ByteBuffer... buffers) {
|
public void offerReadBuffer(ByteBuffer... buffers) {
|
||||||
if (buffers == null) {
|
if (buffers == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
Consumer<ByteBuffer> consumer = this.bufferConsumer;
|
Consumer<ByteBuffer> consumer = this.readBufferConsumer;
|
||||||
for (ByteBuffer buffer : buffers) {
|
for (ByteBuffer buffer : buffers) {
|
||||||
consumer.accept(buffer);
|
consumer.accept(buffer);
|
||||||
}
|
}
|
||||||
@@ -616,25 +630,25 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
|
|||||||
if (buffer == null) {
|
if (buffer == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
bufferConsumer.accept(buffer);
|
writeBufferConsumer.accept(buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void offerWriteBuffer(ByteBuffer... buffers) {
|
public void offerWriteBuffer(ByteBuffer... buffers) {
|
||||||
if (buffers == null) {
|
if (buffers == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
Consumer<ByteBuffer> consumer = this.bufferConsumer;
|
Consumer<ByteBuffer> consumer = this.writeBufferConsumer;
|
||||||
for (ByteBuffer buffer : buffers) {
|
for (ByteBuffer buffer : buffers) {
|
||||||
consumer.accept(buffer);
|
consumer.accept(buffer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public ByteBuffer pollWriteSSLBuffer() {
|
public ByteBuffer pollWriteSSLBuffer() {
|
||||||
return bufferSupplier.get();
|
return writeBufferSupplier.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
public ByteBuffer pollWriteBuffer() {
|
public ByteBuffer pollWriteBuffer() {
|
||||||
return bufferSupplier.get();
|
return writeBufferSupplier.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void dispose() {//同close, 只是去掉throws IOException
|
public void dispose() {//同close, 只是去掉throws IOException
|
||||||
@@ -674,7 +688,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (this.readBuffer != null) {
|
if (this.readBuffer != null) {
|
||||||
Consumer<ByteBuffer> consumer = this.bufferConsumer;
|
Consumer<ByteBuffer> consumer = this.readBufferConsumer;
|
||||||
if (consumer != null) {
|
if (consumer != null) {
|
||||||
consumer.accept(this.readBuffer);
|
consumer.accept(this.readBuffer);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ import java.io.IOException;
|
|||||||
import java.net.*;
|
import java.net.*;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.channels.*;
|
import java.nio.channels.*;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.concurrent.*;
|
import java.util.concurrent.*;
|
||||||
import java.util.concurrent.atomic.*;
|
import java.util.concurrent.atomic.*;
|
||||||
import org.redkale.annotation.ResourceType;
|
import org.redkale.annotation.ResourceType;
|
||||||
@@ -33,7 +34,11 @@ public class AsyncIOGroup extends AsyncGroup {
|
|||||||
|
|
||||||
private boolean skipClose;
|
private boolean skipClose;
|
||||||
|
|
||||||
AsyncIOThread[] ioThreads;
|
//必须与ioWriteThreads数量相同
|
||||||
|
private AsyncIOThread[] ioReadThreads;
|
||||||
|
|
||||||
|
//必须与ioReadThreads数量相同
|
||||||
|
private AsyncIOThread[] ioWriteThreads;
|
||||||
|
|
||||||
private AsyncIOThread connectThread;
|
private AsyncIOThread connectThread;
|
||||||
|
|
||||||
@@ -72,14 +77,23 @@ public class AsyncIOGroup extends AsyncGroup {
|
|||||||
public AsyncIOGroup(boolean client, String threadPrefixName0, ExecutorService workExecutor, final int bufferCapacity, ObjectPool<ByteBuffer> safeBufferPool) {
|
public AsyncIOGroup(boolean client, String threadPrefixName0, ExecutorService workExecutor, final int bufferCapacity, ObjectPool<ByteBuffer> safeBufferPool) {
|
||||||
this.bufferCapacity = bufferCapacity;
|
this.bufferCapacity = bufferCapacity;
|
||||||
final String threadPrefixName = threadPrefixName0 == null ? "Redkale-Client-IOThread" : threadPrefixName0;
|
final String threadPrefixName = threadPrefixName0 == null ? "Redkale-Client-IOThread" : threadPrefixName0;
|
||||||
this.ioThreads = new AsyncIOThread[Utility.cpus()];
|
final int threads = Utility.cpus();
|
||||||
|
this.ioReadThreads = new AsyncIOThread[threads];
|
||||||
|
this.ioWriteThreads = new AsyncIOThread[threads];
|
||||||
try {
|
try {
|
||||||
for (int i = 0; i < this.ioThreads.length; i++) {
|
for (int i = 0; i < threads; i++) {
|
||||||
ObjectPool<ByteBuffer> unsafeBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(),
|
ObjectPool<ByteBuffer> unsafeReadBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(),
|
||||||
safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler());
|
safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler());
|
||||||
String name = threadPrefixName + "-" + (i >= 9 ? (i + 1) : ("0" + (i + 1)));
|
String name = threadPrefixName + "-" + (i >= 9 ? (i + 1) : ("0" + (i + 1)));
|
||||||
this.ioThreads[i] = client ? new ClientIOThread(name, i, ioThreads.length, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool)
|
this.ioReadThreads[i] = client ? new ClientIOThread(name, i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool)
|
||||||
: new AsyncIOThread(name, i, ioThreads.length, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool);
|
: new AsyncIOThread(name, i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool);
|
||||||
|
if (client) {
|
||||||
|
this.ioReadThreads[i] = new ClientIOThread(name, i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool);
|
||||||
|
this.ioWriteThreads[i] = this.ioReadThreads[i];
|
||||||
|
} else {
|
||||||
|
this.ioReadThreads[i] = new AsyncIOThread(name, i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool);
|
||||||
|
this.ioWriteThreads[i] = this.ioReadThreads[i];
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (client) {
|
if (client) {
|
||||||
ObjectPool<ByteBuffer> unsafeBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(),
|
ObjectPool<ByteBuffer> unsafeBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(),
|
||||||
@@ -99,10 +113,6 @@ public class AsyncIOGroup extends AsyncGroup {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public int size() {
|
|
||||||
return this.ioThreads.length;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AsyncGroup start() {
|
public AsyncGroup start() {
|
||||||
if (started) {
|
if (started) {
|
||||||
@@ -111,8 +121,11 @@ public class AsyncIOGroup extends AsyncGroup {
|
|||||||
if (closed) {
|
if (closed) {
|
||||||
throw new RuntimeException("group is closed");
|
throw new RuntimeException("group is closed");
|
||||||
}
|
}
|
||||||
for (AsyncIOThread thread : ioThreads) {
|
for (int i = 0; i < this.ioReadThreads.length; i++) {
|
||||||
thread.start();
|
this.ioReadThreads[i].start();
|
||||||
|
if (this.ioWriteThreads[i] != this.ioReadThreads[i]) {
|
||||||
|
this.ioWriteThreads[i].start();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (connectThread != null) {
|
if (connectThread != null) {
|
||||||
connectThread.start();
|
connectThread.start();
|
||||||
@@ -142,8 +155,11 @@ public class AsyncIOGroup extends AsyncGroup {
|
|||||||
if (closed) {
|
if (closed) {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
for (AsyncIOThread thread : ioThreads) {
|
for (int i = 0; i < this.ioReadThreads.length; i++) {
|
||||||
thread.close();
|
this.ioReadThreads[i].close();
|
||||||
|
if (this.ioWriteThreads[i] != this.ioReadThreads[i]) {
|
||||||
|
this.ioWriteThreads[i].close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (connectThread != null) {
|
if (connectThread != null) {
|
||||||
connectThread.close();
|
connectThread.close();
|
||||||
@@ -165,8 +181,9 @@ public class AsyncIOGroup extends AsyncGroup {
|
|||||||
return connClosedCounter;
|
return connClosedCounter;
|
||||||
}
|
}
|
||||||
|
|
||||||
public AsyncIOThread nextIOThread() {
|
public AsyncIOThread[] nextIOThreads() {
|
||||||
return ioThreads[Math.abs(readIndex.getAndIncrement()) % ioThreads.length];
|
int i = Math.abs(readIndex.getAndIncrement()) % ioReadThreads.length;
|
||||||
|
return new AsyncIOThread[]{ioReadThreads[i], ioWriteThreads[i]};
|
||||||
}
|
}
|
||||||
|
|
||||||
public AsyncIOThread connectThread() {
|
public AsyncIOThread connectThread() {
|
||||||
@@ -202,32 +219,47 @@ public class AsyncIOGroup extends AsyncGroup {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
//创建一个AsyncConnection对象,只给测试代码使用
|
||||||
public CompletableFuture<AsyncConnection> createTCPClient(final SocketAddress address, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
|
public AsyncConnection newTCPClientConnection() {
|
||||||
SocketChannel channel;
|
|
||||||
try {
|
try {
|
||||||
channel = SocketChannel.open();
|
return newTCPClientConnection(null);
|
||||||
channel.configureBlocking(false);
|
|
||||||
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
|
|
||||||
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
|
|
||||||
channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
return CompletableFuture.failedFuture(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
AsyncIOThread ioThread = null;
|
}
|
||||||
|
|
||||||
|
private AsyncNioTcpConnection newTCPClientConnection(final SocketAddress address) throws IOException {
|
||||||
|
SocketChannel channel = SocketChannel.open();
|
||||||
|
channel.configureBlocking(false);
|
||||||
|
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
|
||||||
|
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
|
||||||
|
channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
|
||||||
|
|
||||||
|
AsyncIOThread[] ioThreads = null;
|
||||||
Thread currThread = Thread.currentThread();
|
Thread currThread = Thread.currentThread();
|
||||||
if (currThread instanceof AsyncIOThread) {
|
if (currThread instanceof AsyncIOThread) {
|
||||||
for (AsyncIOThread thread : ioThreads) {
|
for (int i = 0; i < this.ioReadThreads.length; i++) {
|
||||||
if (thread == currThread) {
|
if (this.ioReadThreads[i] == currThread || this.ioWriteThreads[i] == currThread) {
|
||||||
ioThread = thread;
|
ioThreads = new AsyncIOThread[]{this.ioReadThreads[i], this.ioWriteThreads[i]};
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (ioThread == null) {
|
if (ioThreads == null) {
|
||||||
ioThread = nextIOThread();
|
ioThreads = nextIOThreads();
|
||||||
|
}
|
||||||
|
return new AsyncNioTcpConnection(true, this, ioThreads[0], ioThreads[1], connectThread, channel, null, null, address, connLivingCounter, connClosedCounter);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<AsyncConnection> createTCPClient(final SocketAddress address, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
|
||||||
|
Objects.requireNonNull(address);
|
||||||
|
AsyncNioTcpConnection conn;
|
||||||
|
try {
|
||||||
|
conn = newTCPClientConnection(address);
|
||||||
|
} catch (IOException e) {
|
||||||
|
return CompletableFuture.failedFuture(e);
|
||||||
}
|
}
|
||||||
final AsyncNioTcpConnection conn = new AsyncNioTcpConnection(true, this, ioThread, connectThread, channel, null, null, address, connLivingCounter, connClosedCounter);
|
|
||||||
final CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
|
final CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
|
||||||
conn.connect(address, null, new CompletionHandler<Void, Void>() {
|
conn.connect(address, null, new CompletionHandler<Void, Void>() {
|
||||||
@Override
|
@Override
|
||||||
@@ -261,30 +293,41 @@ public class AsyncIOGroup extends AsyncGroup {
|
|||||||
return Utility.orTimeout(future, 30, TimeUnit.SECONDS);
|
return Utility.orTimeout(future, 30, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
//创建一个AsyncConnection对象,只给测试代码使用
|
||||||
public CompletableFuture<AsyncConnection> createUDPClient(final SocketAddress address, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
|
public AsyncConnection newUDPClientConnection() {
|
||||||
DatagramChannel channel;
|
|
||||||
try {
|
try {
|
||||||
channel = DatagramChannel.open();
|
return newUDPClientConnection(null);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
CompletableFuture future = new CompletableFuture();
|
throw new RuntimeException(e);
|
||||||
future.completeExceptionally(e);
|
|
||||||
return future;
|
|
||||||
}
|
}
|
||||||
AsyncIOThread ioThread = null;
|
}
|
||||||
|
|
||||||
|
private AsyncNioUdpConnection newUDPClientConnection(final SocketAddress address) throws IOException {
|
||||||
|
DatagramChannel channel = DatagramChannel.open();
|
||||||
|
AsyncIOThread[] ioThreads = null;
|
||||||
Thread currThread = Thread.currentThread();
|
Thread currThread = Thread.currentThread();
|
||||||
if (currThread instanceof AsyncIOThread) {
|
if (currThread instanceof AsyncIOThread) {
|
||||||
for (AsyncIOThread thread : ioThreads) {
|
for (int i = 0; i < this.ioReadThreads.length; i++) {
|
||||||
if (thread == currThread) {
|
if (this.ioReadThreads[i] == currThread || this.ioWriteThreads[i] == currThread) {
|
||||||
ioThread = thread;
|
ioThreads = new AsyncIOThread[]{this.ioReadThreads[i], this.ioWriteThreads[i]};
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (ioThread == null) {
|
if (ioThreads == null) {
|
||||||
ioThread = nextIOThread();
|
ioThreads = nextIOThreads();
|
||||||
|
}
|
||||||
|
return new AsyncNioUdpConnection(true, this, ioThreads[0], ioThreads[1], connectThread, channel, null, null, address, connLivingCounter, connClosedCounter);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<AsyncConnection> createUDPClient(final SocketAddress address, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
|
||||||
|
AsyncNioUdpConnection conn;
|
||||||
|
try {
|
||||||
|
conn = newUDPClientConnection(address);
|
||||||
|
} catch (IOException e) {
|
||||||
|
return CompletableFuture.failedFuture(e);
|
||||||
}
|
}
|
||||||
AsyncNioUdpConnection conn = new AsyncNioUdpConnection(true, this, ioThread, connectThread, channel, null, null, address, connLivingCounter, connClosedCounter);
|
|
||||||
CompletableFuture future = new CompletableFuture();
|
CompletableFuture future = new CompletableFuture();
|
||||||
conn.connect(address, null, new CompletionHandler<Void, Void>() {
|
conn.connect(address, null, new CompletionHandler<Void, Void>() {
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -73,7 +73,7 @@ public class AsyncIOThread extends WorkThread {
|
|||||||
* @param command
|
* @param command
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public final void execute(Runnable command) {
|
public void execute(Runnable command) {
|
||||||
commandQueue.offer(command);
|
commandQueue.offer(command);
|
||||||
selector.wakeup();
|
selector.wakeup();
|
||||||
}
|
}
|
||||||
@@ -84,7 +84,7 @@ public class AsyncIOThread extends WorkThread {
|
|||||||
* @param commands
|
* @param commands
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public final void execute(Runnable... commands) {
|
public void execute(Runnable... commands) {
|
||||||
for (Runnable command : commands) {
|
for (Runnable command : commands) {
|
||||||
commandQueue.offer(command);
|
commandQueue.offer(command);
|
||||||
}
|
}
|
||||||
@@ -97,7 +97,7 @@ public class AsyncIOThread extends WorkThread {
|
|||||||
* @param commands
|
* @param commands
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public final void execute(Collection<Runnable> commands) {
|
public void execute(Collection<Runnable> commands) {
|
||||||
if (commands != null) {
|
if (commands != null) {
|
||||||
for (Runnable command : commands) {
|
for (Runnable command : commands) {
|
||||||
commandQueue.offer(command);
|
commandQueue.offer(command);
|
||||||
|
|||||||
@@ -12,9 +12,9 @@ import java.nio.channels.*;
|
|||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.LongAdder;
|
import java.util.concurrent.atomic.LongAdder;
|
||||||
import java.util.function.*;
|
import java.util.function.Consumer;
|
||||||
import javax.net.ssl.SSLContext;
|
import javax.net.ssl.SSLContext;
|
||||||
import org.redkale.util.*;
|
import org.redkale.util.ByteBufferWriter;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
@@ -99,15 +99,9 @@ abstract class AsyncNioConnection extends AsyncConnection {
|
|||||||
|
|
||||||
protected SelectionKey writeKey;
|
protected SelectionKey writeKey;
|
||||||
|
|
||||||
public AsyncNioConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioThread, AsyncIOThread connectThread,
|
public AsyncNioConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread, AsyncIOThread connectThread,
|
||||||
final int bufferCapacity, ObjectPool<ByteBuffer> bufferPool, SSLBuilder sslBuilder, SSLContext sslContext, LongAdder livingCounter, LongAdder closedCounter) {
|
final int bufferCapacity, SSLBuilder sslBuilder, SSLContext sslContext, LongAdder livingCounter, LongAdder closedCounter) {
|
||||||
super(client, ioGroup, ioThread, bufferCapacity, bufferPool, sslBuilder, sslContext, livingCounter, closedCounter);
|
super(client, ioGroup, ioReadThread, ioWriteThread, bufferCapacity, sslBuilder, sslContext, livingCounter, closedCounter);
|
||||||
this.connectThread = connectThread;
|
|
||||||
}
|
|
||||||
|
|
||||||
public AsyncNioConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioThread, AsyncIOThread connectThread,
|
|
||||||
final int bufferCapacity, Supplier<ByteBuffer> bufferSupplier, Consumer<ByteBuffer> bufferConsumer, SSLBuilder sslBuilder, SSLContext sslContext, LongAdder livingCounter, LongAdder closedCounter) {
|
|
||||||
super(client, ioGroup, ioThread, bufferCapacity, bufferSupplier, bufferConsumer, sslBuilder, sslContext, livingCounter, closedCounter);
|
|
||||||
this.connectThread = connectThread;
|
this.connectThread = connectThread;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -27,9 +27,9 @@ class AsyncNioTcpConnection extends AsyncNioConnection {
|
|||||||
|
|
||||||
private final SocketChannel channel;
|
private final SocketChannel channel;
|
||||||
|
|
||||||
public AsyncNioTcpConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioThread, AsyncIOThread connectThread,
|
public AsyncNioTcpConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread, AsyncIOThread connectThread,
|
||||||
SocketChannel ch, SSLBuilder sslBuilder, SSLContext sslContext, final SocketAddress addr0, LongAdder livingCounter, LongAdder closedCounter) {
|
SocketChannel ch, SSLBuilder sslBuilder, SSLContext sslContext, final SocketAddress addr0, LongAdder livingCounter, LongAdder closedCounter) {
|
||||||
super(client, ioGroup, ioThread, connectThread, ioGroup.bufferCapacity, ioThread.getBufferSupplier(), ioThread.getBufferConsumer(), sslBuilder, sslContext, livingCounter, closedCounter);
|
super(client, ioGroup, ioReadThread, ioWriteThread, connectThread, ioGroup.bufferCapacity, sslBuilder, sslContext, livingCounter, closedCounter);
|
||||||
this.channel = ch;
|
this.channel = ch;
|
||||||
SocketAddress addr = addr0;
|
SocketAddress addr = addr0;
|
||||||
if (addr == null) {
|
if (addr == null) {
|
||||||
@@ -40,7 +40,7 @@ class AsyncNioTcpConnection extends AsyncNioConnection {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
this.remoteAddress = addr;
|
this.remoteAddress = addr;
|
||||||
ioThread.connCounter.incrementAndGet();
|
ioReadThread.connCounter.incrementAndGet();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -161,7 +161,7 @@ class AsyncNioTcpProtocolServer extends ProtocolServer {
|
|||||||
channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
|
channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
|
||||||
channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
|
channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
|
||||||
channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
|
channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
|
||||||
AsyncIOThread readThread = ioGroup.nextIOThread();
|
AsyncIOThread[] ioThreads = ioGroup.nextIOThreads();
|
||||||
LongAdder connCreateCounter = ioGroup.connCreateCounter;
|
LongAdder connCreateCounter = ioGroup.connCreateCounter;
|
||||||
if (connCreateCounter != null) {
|
if (connCreateCounter != null) {
|
||||||
connCreateCounter.increment();
|
connCreateCounter.increment();
|
||||||
@@ -170,7 +170,7 @@ class AsyncNioTcpProtocolServer extends ProtocolServer {
|
|||||||
if (connLivingCounter != null) {
|
if (connLivingCounter != null) {
|
||||||
connLivingCounter.increment();
|
connLivingCounter.increment();
|
||||||
}
|
}
|
||||||
AsyncNioTcpConnection conn = new AsyncNioTcpConnection(false, ioGroup, readThread, ioGroup.connectThread(), channel, context.getSSLBuilder(), context.getSSLContext(), null, connLivingCounter, ioGroup.connClosedCounter);
|
AsyncNioTcpConnection conn = new AsyncNioTcpConnection(false, ioGroup, ioThreads[0], ioThreads[1], ioGroup.connectThread(), channel, context.getSSLBuilder(), context.getSSLContext(), null, connLivingCounter, ioGroup.connClosedCounter);
|
||||||
ProtocolCodec codec = new ProtocolCodec(context, responseSupplier, responseConsumer, conn);
|
ProtocolCodec codec = new ProtocolCodec(context, responseSupplier, responseConsumer, conn);
|
||||||
conn.protocolCodec = codec;
|
conn.protocolCodec = codec;
|
||||||
if (conn.sslEngine == null) {
|
if (conn.sslEngine == null) {
|
||||||
|
|||||||
@@ -24,9 +24,9 @@ class AsyncNioUdpConnection extends AsyncNioConnection {
|
|||||||
|
|
||||||
private final DatagramChannel channel;
|
private final DatagramChannel channel;
|
||||||
|
|
||||||
public AsyncNioUdpConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioThread, AsyncIOThread connectThread, DatagramChannel ch,
|
public AsyncNioUdpConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread, AsyncIOThread connectThread, DatagramChannel ch,
|
||||||
SSLBuilder sslBuilder, SSLContext sslContext, final SocketAddress addr0, LongAdder livingCounter, LongAdder closedCounter) {
|
SSLBuilder sslBuilder, SSLContext sslContext, final SocketAddress addr0, LongAdder livingCounter, LongAdder closedCounter) {
|
||||||
super(client, ioGroup, ioThread, connectThread, ioGroup.bufferCapacity, ioThread.getBufferSupplier(), ioThread.getBufferConsumer(), sslBuilder, sslContext, livingCounter, closedCounter);
|
super(client, ioGroup, ioReadThread, ioWriteThread, connectThread, ioGroup.bufferCapacity, sslBuilder, sslContext, livingCounter, closedCounter);
|
||||||
this.channel = ch;
|
this.channel = ch;
|
||||||
SocketAddress addr = addr0;
|
SocketAddress addr = addr0;
|
||||||
if (addr == null) {
|
if (addr == null) {
|
||||||
|
|||||||
@@ -137,7 +137,7 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void accept(SocketAddress address, ByteBuffer buffer) throws IOException {
|
private void accept(SocketAddress address, ByteBuffer buffer) throws IOException {
|
||||||
AsyncIOThread readThread = ioGroup.nextIOThread();
|
AsyncIOThread[] ioThreads = ioGroup.nextIOThreads();
|
||||||
LongAdder connCreateCounter = ioGroup.connCreateCounter;
|
LongAdder connCreateCounter = ioGroup.connCreateCounter;
|
||||||
if (connCreateCounter != null) {
|
if (connCreateCounter != null) {
|
||||||
connCreateCounter.increment();
|
connCreateCounter.increment();
|
||||||
@@ -146,7 +146,7 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
|
|||||||
if (connLivingCounter != null) {
|
if (connLivingCounter != null) {
|
||||||
connLivingCounter.increment();
|
connLivingCounter.increment();
|
||||||
}
|
}
|
||||||
AsyncNioUdpConnection conn = new AsyncNioUdpConnection(false, ioGroup, readThread, ioGroup.connectThread(), this.serverChannel, context.getSSLBuilder(), context.getSSLContext(), address, connLivingCounter, ioGroup.connClosedCounter);
|
AsyncNioUdpConnection conn = new AsyncNioUdpConnection(false, ioGroup, ioThreads[0], ioThreads[1], ioGroup.connectThread(), this.serverChannel, context.getSSLBuilder(), context.getSSLContext(), address, connLivingCounter, ioGroup.connClosedCounter);
|
||||||
ProtocolCodec codec = new ProtocolCodec(context, responseSupplier, responseConsumer, conn);
|
ProtocolCodec codec = new ProtocolCodec(context, responseSupplier, responseConsumer, conn);
|
||||||
conn.protocolCodec = codec;
|
conn.protocolCodec = codec;
|
||||||
if (conn.sslEngine == null) {
|
if (conn.sslEngine == null) {
|
||||||
|
|||||||
Reference in New Issue
Block a user