This commit is contained in:
redkale
2024-09-11 22:15:52 +08:00
parent ff1d392196
commit 2c39167025
5 changed files with 88 additions and 42 deletions

View File

@@ -33,46 +33,67 @@ public abstract class AsyncGroup {
} }
public CompletableFuture<AsyncConnection> createTCPClientConnection(final SocketAddress address) { public CompletableFuture<AsyncConnection> createTCPClientConnection(final SocketAddress address) {
return AsyncGroup.this.createTCPClientConnection(address, 0); return createTCPClientConnection(-1, address, 0);
}
public CompletableFuture<AsyncConnection> createTCPClientConnection(int ioIndex, SocketAddress address) {
return createTCPClientConnection(ioIndex, address, 0);
}
public CompletableFuture<AsyncConnection> createTCPClientConnection(
SocketAddress address, int connectTimeoutSeconds) {
return createTCPClientConnection(-1, address, connectTimeoutSeconds);
} }
/** /**
* 创建TCP连接 * 创建TCP连接
* *
* @see org.redkale.net.AsyncIOGroup#createTCPClientConnection(java.net.SocketAddress, int) * @see org.redkale.net.AsyncIOGroup#createTCPClientConnection(int, java.net.SocketAddress, int)
* *
* @param ioIndex IO线程的下坐标
* @param address 地址 * @param address 地址
* @param connectTimeoutSeconds 连接超时 * @param connectTimeoutSeconds 连接超时
* @return AsyncConnection * @return AsyncConnection
*/ */
public abstract CompletableFuture<AsyncConnection> createTCPClientConnection( public abstract CompletableFuture<AsyncConnection> createTCPClientConnection(
SocketAddress address, int connectTimeoutSeconds); int ioIndex, SocketAddress address, int connectTimeoutSeconds);
public CompletableFuture<AsyncConnection> createUDPClientConnection(final SocketAddress address) {
return AsyncGroup.this.createUDPClientConnection(address, 0);
}
/** /**
* 创建UDP连接 * 创建UDP连接
* *
* @see org.redkale.net.AsyncIOGroup#createUDPClientConnection(java.net.SocketAddress, int) * @see org.redkale.net.AsyncIOGroup#createUDPClientConnection(int, java.net.SocketAddress, int)
* *
* @param ioIndex IO线程的下坐标
* @param address 地址 * @param address 地址
* @param connectTimeoutSeconds 连接超时 * @param connectTimeoutSeconds 连接超时
* @return AsyncConnection * @return AsyncConnection
*/ */
public abstract CompletableFuture<AsyncConnection> createUDPClientConnection( public abstract CompletableFuture<AsyncConnection> createUDPClientConnection(
SocketAddress address, int connectTimeoutSeconds); int ioIndex, SocketAddress address, int connectTimeoutSeconds);
public CompletableFuture<AsyncConnection> createClientConnection(final boolean tcp, final SocketAddress address) { public CompletableFuture<AsyncConnection> createUDPClientConnection(final SocketAddress address) {
return tcp ? createTCPClientConnection(address) : createUDPClientConnection(address); return createUDPClientConnection(-1, address, 0);
}
public CompletableFuture<AsyncConnection> createUDPClientConnection(int ioIndex, SocketAddress address) {
return createUDPClientConnection(ioIndex, address, 0);
}
public CompletableFuture<AsyncConnection> createUDPClientConnection(
SocketAddress address, int connectTimeoutSeconds) {
return createUDPClientConnection(-1, address, connectTimeoutSeconds);
} }
public CompletableFuture<AsyncConnection> createClientConnection( public CompletableFuture<AsyncConnection> createClientConnection(
boolean tcp, SocketAddress address, int connectTimeoutSeconds) { final boolean tcp, int ioIndex, final SocketAddress address) {
return tcp ? createTCPClientConnection(ioIndex, address) : createUDPClientConnection(ioIndex, address);
}
public CompletableFuture<AsyncConnection> createClientConnection(
boolean tcp, int ioIndex, SocketAddress address, int connectTimeoutSeconds) {
return tcp return tcp
? AsyncGroup.this.createTCPClientConnection(address, connectTimeoutSeconds) ? createTCPClientConnection(ioIndex, address, connectTimeoutSeconds)
: createUDPClientConnection(address, connectTimeoutSeconds); : createUDPClientConnection(ioIndex, address, connectTimeoutSeconds);
} }
/** /**

View File

@@ -202,13 +202,13 @@ public class AsyncIOGroup extends AsyncGroup {
// 创建一个AsyncConnection对象只给测试代码使用 // 创建一个AsyncConnection对象只给测试代码使用
public AsyncConnection newTCPClientConnection() { public AsyncConnection newTCPClientConnection() {
try { try {
return newTCPClientConnection(null); return newTCPClientConnection(-1, null);
} catch (IOException e) { } catch (IOException e) {
throw new RedkaleException(e); throw new RedkaleException(e);
} }
} }
private AsyncNioTcpConnection newTCPClientConnection(final SocketAddress address) throws IOException { private AsyncNioTcpConnection newTCPClientConnection(int ioIndex, SocketAddress address) throws IOException {
SocketChannel channel = SocketChannel.open(); SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false); channel.configureBlocking(false);
channel.setOption(StandardSocketOptions.TCP_NODELAY, true); channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
@@ -218,7 +218,10 @@ public class AsyncIOGroup extends AsyncGroup {
AsyncIOThread readThread = null; AsyncIOThread readThread = null;
AsyncIOThread writeThread = null; AsyncIOThread writeThread = null;
AsyncIOThread currThread = AsyncIOThread.currentAsyncIOThread(); AsyncIOThread currThread = AsyncIOThread.currentAsyncIOThread();
if (currThread != null) { if (ioIndex >= 0 && ioIndex < this.ioReadThreads.length) {
readThread = this.ioReadThreads[ioIndex];
writeThread = this.ioWriteThreads[ioIndex];
} else if (currThread != null) {
if (this.ioReadThreads[0].getThreadGroup() == currThread.getThreadGroup()) { if (this.ioReadThreads[0].getThreadGroup() == currThread.getThreadGroup()) {
for (AsyncIOThread ioReadThread : this.ioReadThreads) { for (AsyncIOThread ioReadThread : this.ioReadThreads) {
if (ioReadThread.index() == currThread.index()) { if (ioReadThread.index() == currThread.index()) {
@@ -246,11 +249,12 @@ public class AsyncIOGroup extends AsyncGroup {
} }
@Override @Override
public CompletableFuture<AsyncConnection> createTCPClientConnection(SocketAddress address, int connectTimeoutSeconds) { public CompletableFuture<AsyncConnection> createTCPClientConnection(
int ioIndex, SocketAddress address, int connectTimeoutSeconds) {
Objects.requireNonNull(address); Objects.requireNonNull(address);
AsyncNioTcpConnection conn; AsyncNioTcpConnection conn;
try { try {
conn = newTCPClientConnection(address); conn = newTCPClientConnection(ioIndex, address);
} catch (IOException e) { } catch (IOException e) {
return CompletableFuture.failedFuture(e); return CompletableFuture.failedFuture(e);
} }
@@ -286,19 +290,22 @@ public class AsyncIOGroup extends AsyncGroup {
// 创建一个AsyncConnection对象只给测试代码使用 // 创建一个AsyncConnection对象只给测试代码使用
public AsyncConnection newUDPClientConnection() { public AsyncConnection newUDPClientConnection() {
try { try {
return newUDPClientConnection(null); return newUDPClientConnection(-1, null);
} catch (IOException e) { } catch (IOException e) {
throw new RedkaleException(e); throw new RedkaleException(e);
} }
} }
private AsyncNioUdpConnection newUDPClientConnection(final SocketAddress address) throws IOException { private AsyncNioUdpConnection newUDPClientConnection(int ioIndex, SocketAddress address) throws IOException {
DatagramChannel channel = DatagramChannel.open(); DatagramChannel channel = DatagramChannel.open();
channel.configureBlocking(false); channel.configureBlocking(false);
AsyncIOThread readThread = null; AsyncIOThread readThread = null;
AsyncIOThread writeThread = null; AsyncIOThread writeThread = null;
AsyncIOThread currThread = AsyncIOThread.currentAsyncIOThread(); AsyncIOThread currThread = AsyncIOThread.currentAsyncIOThread();
if (currThread != null) { if (ioIndex >= 0 && ioIndex < this.ioReadThreads.length) {
readThread = this.ioReadThreads[ioIndex];
writeThread = this.ioWriteThreads[ioIndex];
} else if (currThread != null) {
for (AsyncIOThread ioReadThread : this.ioReadThreads) { for (AsyncIOThread ioReadThread : this.ioReadThreads) {
if (ioReadThread.index() == currThread.index()) { if (ioReadThread.index() == currThread.index()) {
readThread = ioReadThread; readThread = ioReadThread;
@@ -322,10 +329,11 @@ public class AsyncIOGroup extends AsyncGroup {
} }
@Override @Override
public CompletableFuture<AsyncConnection> createUDPClientConnection(SocketAddress address, int connectTimeoutSeconds) { public CompletableFuture<AsyncConnection> createUDPClientConnection(
int ioIndex, SocketAddress address, int connectTimeoutSeconds) {
AsyncNioUdpConnection conn; AsyncNioUdpConnection conn;
try { try {
conn = newUDPClientConnection(address); conn = newUDPClientConnection(ioIndex, address);
} catch (IOException e) { } catch (IOException e) {
return CompletableFuture.failedFuture(e); return CompletableFuture.failedFuture(e);
} }

View File

@@ -308,46 +308,51 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
} }
public final CompletableFuture<C> newConnection() { public final CompletableFuture<C> newConnection() {
return connect(getAddress(null), WorkThread.currentWorkThread(), false); return connect(getAddress(null), WorkThread.currentWorkThread(), -1, false);
} }
// 指定地址获取连接 // 指定地址获取连接
public final CompletableFuture<C> newConnection(final SocketAddress addr) { public final CompletableFuture<C> newConnection(final SocketAddress addr) {
return connect(addr, WorkThread.currentWorkThread(), false); return connect(addr, WorkThread.currentWorkThread(), -1, false);
} }
public final CompletableFuture<C> connect() { public final CompletableFuture<C> connect() {
return connect(getAddress(null), WorkThread.currentWorkThread(), true); return connect(getAddress(null), WorkThread.currentWorkThread(), -1, true);
}
public final CompletableFuture<C> connect(int excludeIndex) {
return connect(getAddress(null), excludeIndex > -1 ? null : WorkThread.currentWorkThread(), excludeIndex, true);
} }
protected CompletableFuture<C> connect(R request) { protected CompletableFuture<C> connect(R request) {
return connect(getAddress(request), request.workThread, true); return connect(getAddress(request), request.workThread, -1, true);
} }
// 指定地址获取连接 // 指定地址获取连接
public final CompletableFuture<C> connect(final SocketAddress addr) { public final CompletableFuture<C> connect(final SocketAddress addr) {
return connect(addr, WorkThread.currentWorkThread(), true); return connect(addr, WorkThread.currentWorkThread(), -1, true);
} }
// 指定地址获取连接 // 指定地址获取连接
protected CompletableFuture<C> connect(WorkThread workThread, final SocketAddress addr) { protected CompletableFuture<C> connect(WorkThread workThread, final SocketAddress addr) {
return connect(addr, workThread, true); return connect(addr, workThread, -1, true);
} }
// 指定地址获取连接 // 指定地址获取连接
private CompletableFuture<C> connect(@Nonnull SocketAddress addr, @Nullable WorkThread workThread, boolean pool) { private CompletableFuture<C> connect(
@Nonnull SocketAddress addr, @Nullable WorkThread workThread, int excludeIndex, boolean pool) {
if (addr == null) { if (addr == null) {
return CompletableFuture.failedFuture(new NullPointerException("address is empty")); return CompletableFuture.failedFuture(new NullPointerException("address is empty"));
} }
final String traceid = Traces.currentTraceid(); final String traceid = Traces.currentTraceid();
final AddressConnEntry<C> entry = getAddressConnEntry(addr, workThread); final AddressConnEntry<C> entry = getAddressConnEntry(addr, workThread, excludeIndex);
C ec = entry.connection; C ec = entry.connection;
if (pool && ec != null && ec.isOpen()) { if (pool && ec != null && ec.isOpen()) {
return CompletableFuture.completedFuture(ec); return CompletableFuture.completedFuture(ec);
} }
final Queue<CompletableFuture<C>> waitQueue = entry.connAcquireWaitings; final Queue<CompletableFuture<C>> waitQueue = entry.connAcquireWaitings;
if (!pool || entry.connOpenState.compareAndSet(false, true)) { if (!pool || entry.connOpenState.compareAndSet(false, true)) {
CompletableFuture<C> future = group.createClientConnection(tcp, addr, connectTimeoutSeconds) CompletableFuture<C> future = group.createClientConnection(tcp, entry.index, addr, connectTimeoutSeconds)
.thenApply(c -> .thenApply(c ->
(C) createClientConnection(c).setConnEntry(entry).setMaxPipelines(maxPipelines)); (C) createClientConnection(c).setConnEntry(entry).setMaxPipelines(maxPipelines));
R virtualReq = createVirtualRequestAfterConnect(); R virtualReq = createVirtualRequestAfterConnect();
@@ -405,11 +410,11 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
} }
} }
private AddressConnEntry<C> getAddressConnEntry(SocketAddress addr, WorkThread workThread) { private AddressConnEntry<C> getAddressConnEntry(SocketAddress addr, WorkThread workThread, int excludeIndex) {
final AddressConnEntry<C>[] entrys = connAddrEntrys.computeIfAbsent(addr, a -> { final AddressConnEntry<C>[] entrys = connAddrEntrys.computeIfAbsent(addr, a -> {
AddressConnEntry<C>[] array = new AddressConnEntry[connLimit]; AddressConnEntry<C>[] array = new AddressConnEntry[connLimit];
for (int i = 0; i < array.length; i++) { for (int i = 0; i < array.length; i++) {
array[i] = new AddressConnEntry<>(); array[i] = new AddressConnEntry<>(i);
} }
return array; return array;
}); });
@@ -417,11 +422,17 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
return entrys[workThread.index()]; return entrys[workThread.index()];
} }
int index = workThread == null || workThread.index() < 0 int index = workThread == null || workThread.index() < 0
? random.nextInt(entrys.length) ? randomIndex(entrys.length, excludeIndex)
: workThread.index() % entrys.length; : workThread.index() % entrys.length;
return entrys[index]; return entrys[index];
} }
private int randomIndex(int len, int excludeIndex) {
if (len == 1) return 0;
int i = random.nextInt(len);
return i != excludeIndex ? i : ((i + 1) < len ? (i + 1) : (i - 1));
}
protected ClientFuture<R, P> createClientFuture(ClientConnection conn, R request) { protected ClientFuture<R, P> createClientFuture(ClientConnection conn, R request) {
ClientFuture respFuture = new ClientFuture(conn, request); ClientFuture respFuture = new ClientFuture(conn, request);
int rts = getReadTimeoutSeconds(); int rts = getReadTimeoutSeconds();
@@ -472,12 +483,16 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
public C connection; public C connection;
public final int index;
public final LongAdder connRespWaiting = new LongAdder(); public final LongAdder connRespWaiting = new LongAdder();
public final AtomicBoolean connOpenState = new AtomicBoolean(); public final AtomicBoolean connOpenState = new AtomicBoolean();
public final Queue<CompletableFuture<C>> connAcquireWaitings = new ConcurrentLinkedDeque(); public final Queue<CompletableFuture<C>> connAcquireWaitings = new ConcurrentLinkedDeque();
AddressConnEntry() {} AddressConnEntry(int index) {
this.index = index;
}
} }
} }

View File

@@ -80,7 +80,7 @@ public abstract class AbstractDataSqlSource extends AbstractDataSource
}; };
protected final BiFunction<DataSource, EntityInfo, CompletableFuture<List>> fullloader = (s, i) -> protected final BiFunction<DataSource, EntityInfo, CompletableFuture<List>> fullloader = (s, i) ->
((CompletableFuture<Sheet>) querySheetDBAsync(i, false, false, false, null, null, (FilterNode) null)) ((CompletableFuture<Sheet>) querySheetDBAsync(i, false, false, false, null, null, (FilterNode) null, true))
.thenApply(e -> e == null ? new ArrayList() : e.list(true)); .thenApply(e -> e == null ? new ArrayList() : e.list(true));
protected final IntFunction<String> signFunc = this::prepareParamSign; protected final IntFunction<String> signFunc = this::prepareParamSign;
@@ -1075,7 +1075,8 @@ public abstract class AbstractDataSqlSource extends AbstractDataSource
final boolean distinct, final boolean distinct,
final SelectColumn selects, final SelectColumn selects,
final Flipper flipper, final Flipper flipper,
final FilterNode node); final FilterNode node,
final boolean inCacheLoad);
// 插入纪录 // 插入纪录
protected <T> int insertDB(final EntityInfo<T> info, T... entitys) { protected <T> int insertDB(final EntityInfo<T> info, T... entitys) {
@@ -1213,7 +1214,7 @@ public abstract class AbstractDataSqlSource extends AbstractDataSource
final SelectColumn selects, final SelectColumn selects,
final Flipper flipper, final Flipper flipper,
final FilterNode node) { final FilterNode node) {
return querySheetDBAsync(info, readcache, needtotal, distinct, selects, flipper, node) return querySheetDBAsync(info, readcache, needtotal, distinct, selects, flipper, node, false)
.join(); .join();
} }
@@ -3891,7 +3892,7 @@ public abstract class AbstractDataSqlSource extends AbstractDataSource
return CompletableFuture.completedFuture(cache.querySheet(needTotal, distinct, selects, flipper, node)); return CompletableFuture.completedFuture(cache.querySheet(needTotal, distinct, selects, flipper, node));
} }
} }
return querySheetDBAsync(info, readCache, needTotal, distinct, selects, flipper, node); return querySheetDBAsync(info, readCache, needTotal, distinct, selects, flipper, node, false);
} }
// -------------------------------------------- native SQL -------------------------------------------- // -------------------------------------------- native SQL --------------------------------------------

View File

@@ -2329,7 +2329,8 @@ public class DataJdbcSource extends AbstractDataSqlSource {
final boolean distinct, final boolean distinct,
SelectColumn selects, SelectColumn selects,
Flipper flipper, Flipper flipper,
FilterNode node) { FilterNode node,
final boolean inCacheLoad) {
return supplyAsync(() -> querySheetDB(info, readCache, needTotal, distinct, selects, flipper, node)); return supplyAsync(() -> querySheetDB(info, readCache, needTotal, distinct, selects, flipper, node));
} }