diff --git a/src/main/java/org/redkale/net/client/Client.java b/src/main/java/org/redkale/net/client/Client.java index 1a6d8ae31..ba3624edb 100644 --- a/src/main/java/org/redkale/net/client/Client.java +++ b/src/main/java/org/redkale/net/client/Client.java @@ -257,66 +257,62 @@ public abstract class Client, R extends ClientR } } - public final CompletableFuture

sendAsync(R request) { - return sendAsync(getAddress(request), request, (Function) null); + protected CompletableFuture writeChannel(ClientConnection conn, Function respTransfer, R request) { + return conn.writeChannel(respTransfer, request); } - public final CompletableFuture sendAsync(R request, Function respTransfer) { - return sendAsync(getAddress(request), request, respTransfer); + public final CompletableFuture

sendAsync(R request) { + return sendAsync(getAddress(request), (Function) null, request); + } + + public final CompletableFuture> sendAsync(R... requests) { + return sendAsync(getAddress(requests[0]), (Function) null, requests); + } + + public final CompletableFuture sendAsync(Function respTransfer, R request) { + return sendAsync(getAddress(request), respTransfer, request); + } + + public final CompletableFuture> sendAsync(Function respTransfer, R... requests) { + return sendAsync(getAddress(requests[0]), respTransfer, requests); } public final CompletableFuture

sendAsync(SocketAddress addr, R request) { - return sendAsync(addr, request, (Function) null); + return sendAsync(addr, (Function) null, request); } - public final CompletableFuture sendAsync(SocketAddress addr, R request, Function respTransfer) { + public final CompletableFuture> sendAsync(SocketAddress addr, R... requests) { + return sendAsync(addr, (Function) null, requests); + } + + public final CompletableFuture sendAsync(SocketAddress addr, Function respTransfer, R request) { request.traceid = Traces.computeIfAbsent(request.traceid, Traces.currentTraceid()); request.computeWorkThreadIfAbsent(); - return connect(request.workThread, addr).thenCompose(conn -> writeChannel(conn, request, respTransfer)); + return connect(request.workThread, addr).thenCompose(conn -> writeChannel(conn, respTransfer, request)); + } + + public final CompletableFuture> sendAsync( + SocketAddress addr, Function respTransfer, R... requests) { + String traceid = Traces.computeIfAbsent(requests[0].traceid, Traces.currentTraceid()); + for (R request : requests) { + request.traceid = traceid; + request.computeWorkThreadIfAbsent(); + } + return connect(requests[0].workThread, addr).thenCompose(conn -> writeChannel(conn, respTransfer, requests)); } protected CompletableFuture

writeChannel(ClientConnection conn, R request) { return conn.writeChannel(request); } - protected CompletableFuture writeChannel(ClientConnection conn, R request, Function respTransfer) { - return conn.writeChannel(request, respTransfer); - } - - public final CompletableFuture> sendAsync(R[] requests) { - return sendAsync(getAddress(requests[0]), requests, (Function) null); - } - - public final CompletableFuture> sendAsync(R[] requests, Function respTransfer) { - return sendAsync(getAddress(requests[0]), requests, respTransfer); - } - - public final CompletableFuture> sendAsync(SocketAddress addr, R[] requests) { - return sendAsync(addr, requests, (Function) null); - } - - public final CompletableFuture> sendAsync( - SocketAddress addr, R[] requests, Function respTransfer) { - String traceid = Traces.computeIfAbsent(requests[0].traceid, Traces.currentTraceid()); - for (R request : requests) { - request.traceid = traceid; - request.computeWorkThreadIfAbsent(); - } - return connect(requests[0].workThread, addr).thenCompose(conn -> writeChannel(conn, requests, respTransfer)); - } - - protected CompletableFuture> writeChannelBatch(ClientConnection conn, R... requests) { + protected CompletableFuture> writeChannel(ClientConnection conn, R... requests) { requests[0].traceid = Traces.computeIfAbsent(requests[0].traceid, Traces.currentTraceid()); return conn.writeChannel(requests); } - protected CompletableFuture> writeChannel(ClientConnection conn, R[] requests) { - return conn.writeChannel(requests); - } - protected CompletableFuture> writeChannel( - ClientConnection conn, R[] requests, Function respTransfer) { - return conn.writeChannel(requests, respTransfer); + ClientConnection conn, Function respTransfer, R[] requests) { + return conn.writeChannel(respTransfer, requests); } // 根据请求获取地址 diff --git a/src/main/java/org/redkale/net/client/ClientCodec.java b/src/main/java/org/redkale/net/client/ClientCodec.java index 651e016b4..1f06c13a9 100644 --- a/src/main/java/org/redkale/net/client/ClientCodec.java +++ b/src/main/java/org/redkale/net/client/ClientCodec.java @@ -125,12 +125,11 @@ public abstract class ClientCodec writeChannel(R request) { - return writeChannel(request, null); + return writeChannel((Function) null, request); } protected final CompletableFuture> writeChannel(R[] requests) { - return writeChannel(requests, null); + return writeChannel((Function) null, requests); + } + + protected final CompletableFuture writeChannel(Function respTransfer, R request) { + return writeChannel0(respTransfer, request).thenApply(v -> Utility.isEmpty(v) ? null : v.get(0)); } // respTransfer只会在ClientCodec的读线程里调用 - protected final CompletableFuture writeChannel(R request, Function respTransfer) { - request.respTransfer = respTransfer; - ClientFuture respFuture = createClientFuture(request); - if (client.debug) { - client.logger.log( - Level.FINEST, - Times.nowMillis() + ": " + Thread.currentThread().getName() + ": " + this + ", sendRequest: " - + request + ", respFuture: " + respFuture); - } - respWaitingCounter.increment(); // 放在writeChannelInWriteThread计数会延迟,导致不准确 - writeLock.lock(); - try { - offerRespFuture(respFuture); - if (pauseWriting.get()) { - pauseRequests.add(respFuture); - } else { - sendRequestInLocking(request, respFuture); - } - } finally { - writeLock.unlock(); - } - if (client.debug) { - return respFuture.whenComplete((v, t) -> { - client.logger.log( - Level.FINEST, - Times.nowMillis() + ": " + Thread.currentThread().getName() + ": " + this + ", respResult: " - + (t != null ? t : v)); - }); - } - return respFuture; - } - - protected void sendRequestInLocking(R request, ClientFuture respFuture) { - // 发送请求数据包 - writeArray.clear(); - request.writeTo(this, writeArray); - if (request.isCompleted()) { - doneRequestCounter.increment(); - } else { // 还剩半包没发送完 - pauseWriting.set(true); - currHalfWriteFuture = respFuture; - } - if (writeArray.length() > 0) { - if (writeBuffer.capacity() >= writeArray.length()) { - writeBuffer.clear(); - writeBuffer.put(writeArray.content(), 0, writeArray.length()); - writeBuffer.flip(); - channel.write(writeBuffer, this, writeHandler); - } else { - channel.write(writeArray, this, writeHandler); - } - } + protected final CompletableFuture> writeChannel(Function respTransfer, R... requests) { + return writeChannel0(respTransfer, requests); } // respTransfer只会在ClientCodec的读线程里调用 - protected final CompletableFuture> writeChannel(R[] requests, Function respTransfer) { + protected final CompletableFuture> writeChannel0(Function respTransfer, R... requests) { if (client.debug) { client.logger.log( Level.FINEST, @@ -209,7 +163,11 @@ public abstract class ClientConnection respFuture) { + protected void offerFirstRespFuture(ClientFuture respFuture) { Serializable requestid = respFuture.request.getRequestid(); if (requestid == null) { respFutureQueue.offerFirst(respFuture); @@ -367,7 +326,7 @@ public abstract class ClientConnection respFuture) { + protected void offerRespFuture(ClientFuture respFuture) { Serializable requestid = respFuture.request.getRequestid(); if (requestid == null) { respFutureQueue.offer(respFuture); @@ -377,7 +336,7 @@ public abstract class ClientConnection respFuture) { + protected void removeRespFuture(Serializable requestid, ClientFuture respFuture) { if (requestid == null) { respFutureQueue.remove(respFuture); } else { diff --git a/src/main/java/org/redkale/util/Utility.java b/src/main/java/org/redkale/util/Utility.java index 8a368da87..1a95d9772 100644 --- a/src/main/java/org/redkale/util/Utility.java +++ b/src/main/java/org/redkale/util/Utility.java @@ -865,6 +865,13 @@ public final class Utility { } public static CompletableFuture> allOfFutures(CompletableFuture[] futures) { + if (futures.length == 1) { + return futures[0].thenApply(v -> { + List rs = new ArrayList<>(1); + rs.add(v); + return rs; + }); + } return CompletableFuture.allOf(futures).thenApply(v -> { int size = futures.length; List rs = new ArrayList<>(size); @@ -907,6 +914,14 @@ public final class Utility { public static CompletableFuture> allOfFutures( CompletableFuture[] futures, BiConsumer consumer) { + if (futures.length == 1) { + return futures[0].thenApply(v -> { + List rs = new ArrayList<>(1); + consumer.accept(0, v); + rs.add(v); + return rs; + }); + } return CompletableFuture.allOf(futures).thenApply(v -> { int size = futures.length; List rs = new ArrayList<>(size);