From 2c391670255029b9f8861a76ffed9b16b0f99e05 Mon Sep 17 00:00:00 2001 From: redkale Date: Wed, 11 Sep 2024 22:15:52 +0800 Subject: [PATCH] Client --- src/main/java/org/redkale/net/AsyncGroup.java | 49 +++++++++++++------ .../java/org/redkale/net/AsyncIOGroup.java | 28 +++++++---- .../java/org/redkale/net/client/Client.java | 41 +++++++++++----- .../redkale/source/AbstractDataSqlSource.java | 9 ++-- .../org/redkale/source/DataJdbcSource.java | 3 +- 5 files changed, 88 insertions(+), 42 deletions(-) diff --git a/src/main/java/org/redkale/net/AsyncGroup.java b/src/main/java/org/redkale/net/AsyncGroup.java index 53d6f6231..576009285 100644 --- a/src/main/java/org/redkale/net/AsyncGroup.java +++ b/src/main/java/org/redkale/net/AsyncGroup.java @@ -33,46 +33,67 @@ public abstract class AsyncGroup { } public CompletableFuture createTCPClientConnection(final SocketAddress address) { - return AsyncGroup.this.createTCPClientConnection(address, 0); + return createTCPClientConnection(-1, address, 0); + } + + public CompletableFuture createTCPClientConnection(int ioIndex, SocketAddress address) { + return createTCPClientConnection(ioIndex, address, 0); + } + + public CompletableFuture createTCPClientConnection( + SocketAddress address, int connectTimeoutSeconds) { + return createTCPClientConnection(-1, address, connectTimeoutSeconds); } /** * 创建TCP连接 * - * @see org.redkale.net.AsyncIOGroup#createTCPClientConnection(java.net.SocketAddress, int) + * @see org.redkale.net.AsyncIOGroup#createTCPClientConnection(int, java.net.SocketAddress, int) * + * @param ioIndex IO线程的下坐标 * @param address 地址 * @param connectTimeoutSeconds 连接超时 * @return AsyncConnection */ public abstract CompletableFuture createTCPClientConnection( - SocketAddress address, int connectTimeoutSeconds); - - public CompletableFuture createUDPClientConnection(final SocketAddress address) { - return AsyncGroup.this.createUDPClientConnection(address, 0); - } + int ioIndex, SocketAddress address, int connectTimeoutSeconds); /** * 创建UDP连接 * - * @see org.redkale.net.AsyncIOGroup#createUDPClientConnection(java.net.SocketAddress, int) + * @see org.redkale.net.AsyncIOGroup#createUDPClientConnection(int, java.net.SocketAddress, int) * + * @param ioIndex IO线程的下坐标 * @param address 地址 * @param connectTimeoutSeconds 连接超时 * @return AsyncConnection */ public abstract CompletableFuture createUDPClientConnection( - SocketAddress address, int connectTimeoutSeconds); + int ioIndex, SocketAddress address, int connectTimeoutSeconds); - public CompletableFuture createClientConnection(final boolean tcp, final SocketAddress address) { - return tcp ? createTCPClientConnection(address) : createUDPClientConnection(address); + public CompletableFuture createUDPClientConnection(final SocketAddress address) { + return createUDPClientConnection(-1, address, 0); + } + + public CompletableFuture createUDPClientConnection(int ioIndex, SocketAddress address) { + return createUDPClientConnection(ioIndex, address, 0); + } + + public CompletableFuture createUDPClientConnection( + SocketAddress address, int connectTimeoutSeconds) { + return createUDPClientConnection(-1, address, connectTimeoutSeconds); } public CompletableFuture createClientConnection( - boolean tcp, SocketAddress address, int connectTimeoutSeconds) { + final boolean tcp, int ioIndex, final SocketAddress address) { + return tcp ? createTCPClientConnection(ioIndex, address) : createUDPClientConnection(ioIndex, address); + } + + public CompletableFuture createClientConnection( + boolean tcp, int ioIndex, SocketAddress address, int connectTimeoutSeconds) { return tcp - ? AsyncGroup.this.createTCPClientConnection(address, connectTimeoutSeconds) - : createUDPClientConnection(address, connectTimeoutSeconds); + ? createTCPClientConnection(ioIndex, address, connectTimeoutSeconds) + : createUDPClientConnection(ioIndex, address, connectTimeoutSeconds); } /** diff --git a/src/main/java/org/redkale/net/AsyncIOGroup.java b/src/main/java/org/redkale/net/AsyncIOGroup.java index d2f4c82f3..9101e6bce 100644 --- a/src/main/java/org/redkale/net/AsyncIOGroup.java +++ b/src/main/java/org/redkale/net/AsyncIOGroup.java @@ -202,13 +202,13 @@ public class AsyncIOGroup extends AsyncGroup { // 创建一个AsyncConnection对象,只给测试代码使用 public AsyncConnection newTCPClientConnection() { try { - return newTCPClientConnection(null); + return newTCPClientConnection(-1, null); } catch (IOException e) { throw new RedkaleException(e); } } - private AsyncNioTcpConnection newTCPClientConnection(final SocketAddress address) throws IOException { + private AsyncNioTcpConnection newTCPClientConnection(int ioIndex, SocketAddress address) throws IOException { SocketChannel channel = SocketChannel.open(); channel.configureBlocking(false); channel.setOption(StandardSocketOptions.TCP_NODELAY, true); @@ -218,7 +218,10 @@ public class AsyncIOGroup extends AsyncGroup { AsyncIOThread readThread = null; AsyncIOThread writeThread = null; AsyncIOThread currThread = AsyncIOThread.currentAsyncIOThread(); - if (currThread != null) { + if (ioIndex >= 0 && ioIndex < this.ioReadThreads.length) { + readThread = this.ioReadThreads[ioIndex]; + writeThread = this.ioWriteThreads[ioIndex]; + } else if (currThread != null) { if (this.ioReadThreads[0].getThreadGroup() == currThread.getThreadGroup()) { for (AsyncIOThread ioReadThread : this.ioReadThreads) { if (ioReadThread.index() == currThread.index()) { @@ -246,11 +249,12 @@ public class AsyncIOGroup extends AsyncGroup { } @Override - public CompletableFuture createTCPClientConnection(SocketAddress address, int connectTimeoutSeconds) { + public CompletableFuture createTCPClientConnection( + int ioIndex, SocketAddress address, int connectTimeoutSeconds) { Objects.requireNonNull(address); AsyncNioTcpConnection conn; try { - conn = newTCPClientConnection(address); + conn = newTCPClientConnection(ioIndex, address); } catch (IOException e) { return CompletableFuture.failedFuture(e); } @@ -286,19 +290,22 @@ public class AsyncIOGroup extends AsyncGroup { // 创建一个AsyncConnection对象,只给测试代码使用 public AsyncConnection newUDPClientConnection() { try { - return newUDPClientConnection(null); + return newUDPClientConnection(-1, null); } catch (IOException e) { throw new RedkaleException(e); } } - private AsyncNioUdpConnection newUDPClientConnection(final SocketAddress address) throws IOException { + private AsyncNioUdpConnection newUDPClientConnection(int ioIndex, SocketAddress address) throws IOException { DatagramChannel channel = DatagramChannel.open(); channel.configureBlocking(false); AsyncIOThread readThread = null; AsyncIOThread writeThread = null; AsyncIOThread currThread = AsyncIOThread.currentAsyncIOThread(); - if (currThread != null) { + if (ioIndex >= 0 && ioIndex < this.ioReadThreads.length) { + readThread = this.ioReadThreads[ioIndex]; + writeThread = this.ioWriteThreads[ioIndex]; + } else if (currThread != null) { for (AsyncIOThread ioReadThread : this.ioReadThreads) { if (ioReadThread.index() == currThread.index()) { readThread = ioReadThread; @@ -322,10 +329,11 @@ public class AsyncIOGroup extends AsyncGroup { } @Override - public CompletableFuture createUDPClientConnection(SocketAddress address, int connectTimeoutSeconds) { + public CompletableFuture createUDPClientConnection( + int ioIndex, SocketAddress address, int connectTimeoutSeconds) { AsyncNioUdpConnection conn; try { - conn = newUDPClientConnection(address); + conn = newUDPClientConnection(ioIndex, address); } catch (IOException e) { return CompletableFuture.failedFuture(e); } diff --git a/src/main/java/org/redkale/net/client/Client.java b/src/main/java/org/redkale/net/client/Client.java index f4d571a2f..844d0a938 100644 --- a/src/main/java/org/redkale/net/client/Client.java +++ b/src/main/java/org/redkale/net/client/Client.java @@ -308,46 +308,51 @@ public abstract class Client, R extends ClientR } public final CompletableFuture newConnection() { - return connect(getAddress(null), WorkThread.currentWorkThread(), false); + return connect(getAddress(null), WorkThread.currentWorkThread(), -1, false); } // 指定地址获取连接 public final CompletableFuture newConnection(final SocketAddress addr) { - return connect(addr, WorkThread.currentWorkThread(), false); + return connect(addr, WorkThread.currentWorkThread(), -1, false); } public final CompletableFuture connect() { - return connect(getAddress(null), WorkThread.currentWorkThread(), true); + return connect(getAddress(null), WorkThread.currentWorkThread(), -1, true); + } + + public final CompletableFuture connect(int excludeIndex) { + return connect(getAddress(null), excludeIndex > -1 ? null : WorkThread.currentWorkThread(), excludeIndex, true); } protected CompletableFuture connect(R request) { - return connect(getAddress(request), request.workThread, true); + return connect(getAddress(request), request.workThread, -1, true); } // 指定地址获取连接 public final CompletableFuture connect(final SocketAddress addr) { - return connect(addr, WorkThread.currentWorkThread(), true); + return connect(addr, WorkThread.currentWorkThread(), -1, true); } // 指定地址获取连接 protected CompletableFuture connect(WorkThread workThread, final SocketAddress addr) { - return connect(addr, workThread, true); + return connect(addr, workThread, -1, true); } // 指定地址获取连接 - private CompletableFuture connect(@Nonnull SocketAddress addr, @Nullable WorkThread workThread, boolean pool) { + private CompletableFuture connect( + @Nonnull SocketAddress addr, @Nullable WorkThread workThread, int excludeIndex, boolean pool) { if (addr == null) { return CompletableFuture.failedFuture(new NullPointerException("address is empty")); } final String traceid = Traces.currentTraceid(); - final AddressConnEntry entry = getAddressConnEntry(addr, workThread); + final AddressConnEntry entry = getAddressConnEntry(addr, workThread, excludeIndex); C ec = entry.connection; if (pool && ec != null && ec.isOpen()) { return CompletableFuture.completedFuture(ec); } final Queue> waitQueue = entry.connAcquireWaitings; if (!pool || entry.connOpenState.compareAndSet(false, true)) { - CompletableFuture future = group.createClientConnection(tcp, addr, connectTimeoutSeconds) + CompletableFuture future = group.createClientConnection(tcp, entry.index, addr, connectTimeoutSeconds) .thenApply(c -> (C) createClientConnection(c).setConnEntry(entry).setMaxPipelines(maxPipelines)); R virtualReq = createVirtualRequestAfterConnect(); @@ -405,11 +410,11 @@ public abstract class Client, R extends ClientR } } - private AddressConnEntry getAddressConnEntry(SocketAddress addr, WorkThread workThread) { + private AddressConnEntry getAddressConnEntry(SocketAddress addr, WorkThread workThread, int excludeIndex) { final AddressConnEntry[] entrys = connAddrEntrys.computeIfAbsent(addr, a -> { AddressConnEntry[] array = new AddressConnEntry[connLimit]; for (int i = 0; i < array.length; i++) { - array[i] = new AddressConnEntry<>(); + array[i] = new AddressConnEntry<>(i); } return array; }); @@ -417,11 +422,17 @@ public abstract class Client, R extends ClientR return entrys[workThread.index()]; } int index = workThread == null || workThread.index() < 0 - ? random.nextInt(entrys.length) + ? randomIndex(entrys.length, excludeIndex) : workThread.index() % entrys.length; return entrys[index]; } + private int randomIndex(int len, int excludeIndex) { + if (len == 1) return 0; + int i = random.nextInt(len); + return i != excludeIndex ? i : ((i + 1) < len ? (i + 1) : (i - 1)); + } + protected ClientFuture createClientFuture(ClientConnection conn, R request) { ClientFuture respFuture = new ClientFuture(conn, request); int rts = getReadTimeoutSeconds(); @@ -472,12 +483,16 @@ public abstract class Client, R extends ClientR public C connection; + public final int index; + public final LongAdder connRespWaiting = new LongAdder(); public final AtomicBoolean connOpenState = new AtomicBoolean(); public final Queue> connAcquireWaitings = new ConcurrentLinkedDeque(); - AddressConnEntry() {} + AddressConnEntry(int index) { + this.index = index; + } } } diff --git a/src/main/java/org/redkale/source/AbstractDataSqlSource.java b/src/main/java/org/redkale/source/AbstractDataSqlSource.java index 740e37203..83371d13d 100644 --- a/src/main/java/org/redkale/source/AbstractDataSqlSource.java +++ b/src/main/java/org/redkale/source/AbstractDataSqlSource.java @@ -80,7 +80,7 @@ public abstract class AbstractDataSqlSource extends AbstractDataSource }; protected final BiFunction> fullloader = (s, i) -> - ((CompletableFuture) querySheetDBAsync(i, false, false, false, null, null, (FilterNode) null)) + ((CompletableFuture) querySheetDBAsync(i, false, false, false, null, null, (FilterNode) null, true)) .thenApply(e -> e == null ? new ArrayList() : e.list(true)); protected final IntFunction signFunc = this::prepareParamSign; @@ -1075,7 +1075,8 @@ public abstract class AbstractDataSqlSource extends AbstractDataSource final boolean distinct, final SelectColumn selects, final Flipper flipper, - final FilterNode node); + final FilterNode node, + final boolean inCacheLoad); // 插入纪录 protected int insertDB(final EntityInfo info, T... entitys) { @@ -1213,7 +1214,7 @@ public abstract class AbstractDataSqlSource extends AbstractDataSource final SelectColumn selects, final Flipper flipper, final FilterNode node) { - return querySheetDBAsync(info, readcache, needtotal, distinct, selects, flipper, node) + return querySheetDBAsync(info, readcache, needtotal, distinct, selects, flipper, node, false) .join(); } @@ -3891,7 +3892,7 @@ public abstract class AbstractDataSqlSource extends AbstractDataSource return CompletableFuture.completedFuture(cache.querySheet(needTotal, distinct, selects, flipper, node)); } } - return querySheetDBAsync(info, readCache, needTotal, distinct, selects, flipper, node); + return querySheetDBAsync(info, readCache, needTotal, distinct, selects, flipper, node, false); } // -------------------------------------------- native SQL -------------------------------------------- diff --git a/src/main/java/org/redkale/source/DataJdbcSource.java b/src/main/java/org/redkale/source/DataJdbcSource.java index 404510bd1..18c512eee 100644 --- a/src/main/java/org/redkale/source/DataJdbcSource.java +++ b/src/main/java/org/redkale/source/DataJdbcSource.java @@ -2329,7 +2329,8 @@ public class DataJdbcSource extends AbstractDataSqlSource { final boolean distinct, SelectColumn selects, Flipper flipper, - FilterNode node) { + FilterNode node, + final boolean inCacheLoad) { return supplyAsync(() -> querySheetDB(info, readCache, needTotal, distinct, selects, flipper, node)); }