Client优化

This commit is contained in:
redkale
2024-08-16 19:31:31 +08:00
parent cf65c780c8
commit 5384e9ab96
4 changed files with 72 additions and 103 deletions

View File

@@ -257,66 +257,62 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
}
}
public final CompletableFuture<P> sendAsync(R request) {
return sendAsync(getAddress(request), request, (Function) null);
protected <T> CompletableFuture<T> writeChannel(ClientConnection conn, Function<P, T> respTransfer, R request) {
return conn.writeChannel(respTransfer, request);
}
public final <T> CompletableFuture<T> sendAsync(R request, Function<P, T> respTransfer) {
return sendAsync(getAddress(request), request, respTransfer);
public final CompletableFuture<P> sendAsync(R request) {
return sendAsync(getAddress(request), (Function) null, request);
}
public final CompletableFuture<List<P>> sendAsync(R... requests) {
return sendAsync(getAddress(requests[0]), (Function) null, requests);
}
public final <T> CompletableFuture<T> sendAsync(Function<P, T> respTransfer, R request) {
return sendAsync(getAddress(request), respTransfer, request);
}
public final <T> CompletableFuture<List<T>> sendAsync(Function<P, T> respTransfer, R... requests) {
return sendAsync(getAddress(requests[0]), respTransfer, requests);
}
public final CompletableFuture<P> sendAsync(SocketAddress addr, R request) {
return sendAsync(addr, request, (Function) null);
return sendAsync(addr, (Function) null, request);
}
public final <T> CompletableFuture<T> sendAsync(SocketAddress addr, R request, Function<P, T> respTransfer) {
public final CompletableFuture<List<P>> sendAsync(SocketAddress addr, R... requests) {
return sendAsync(addr, (Function) null, requests);
}
public final <T> CompletableFuture<T> sendAsync(SocketAddress addr, Function<P, T> respTransfer, R request) {
request.traceid = Traces.computeIfAbsent(request.traceid, Traces.currentTraceid());
request.computeWorkThreadIfAbsent();
return connect(request.workThread, addr).thenCompose(conn -> writeChannel(conn, request, respTransfer));
return connect(request.workThread, addr).thenCompose(conn -> writeChannel(conn, respTransfer, request));
}
public final <T> CompletableFuture<List<T>> sendAsync(
SocketAddress addr, Function<P, T> respTransfer, R... requests) {
String traceid = Traces.computeIfAbsent(requests[0].traceid, Traces.currentTraceid());
for (R request : requests) {
request.traceid = traceid;
request.computeWorkThreadIfAbsent();
}
return connect(requests[0].workThread, addr).thenCompose(conn -> writeChannel(conn, respTransfer, requests));
}
protected CompletableFuture<P> writeChannel(ClientConnection conn, R request) {
return conn.writeChannel(request);
}
protected <T> CompletableFuture<T> writeChannel(ClientConnection conn, R request, Function<P, T> respTransfer) {
return conn.writeChannel(request, respTransfer);
}
public final CompletableFuture<List<P>> sendAsync(R[] requests) {
return sendAsync(getAddress(requests[0]), requests, (Function) null);
}
public final <T> CompletableFuture<List<T>> sendAsync(R[] requests, Function<P, T> respTransfer) {
return sendAsync(getAddress(requests[0]), requests, respTransfer);
}
public final CompletableFuture<List<P>> sendAsync(SocketAddress addr, R[] requests) {
return sendAsync(addr, requests, (Function) null);
}
public final <T> CompletableFuture<List<T>> sendAsync(
SocketAddress addr, R[] requests, Function<P, T> respTransfer) {
String traceid = Traces.computeIfAbsent(requests[0].traceid, Traces.currentTraceid());
for (R request : requests) {
request.traceid = traceid;
request.computeWorkThreadIfAbsent();
}
return connect(requests[0].workThread, addr).thenCompose(conn -> writeChannel(conn, requests, respTransfer));
}
protected CompletableFuture<List<P>> writeChannelBatch(ClientConnection conn, R... requests) {
protected CompletableFuture<List<P>> writeChannel(ClientConnection conn, R... requests) {
requests[0].traceid = Traces.computeIfAbsent(requests[0].traceid, Traces.currentTraceid());
return conn.writeChannel(requests);
}
protected CompletableFuture<List<P>> writeChannel(ClientConnection conn, R[] requests) {
return conn.writeChannel(requests);
}
protected <T> CompletableFuture<List<T>> writeChannel(
ClientConnection conn, R[] requests, Function<P, T> respTransfer) {
return conn.writeChannel(requests, respTransfer);
ClientConnection conn, Function<P, T> respTransfer, R[] requests) {
return conn.writeChannel(respTransfer, requests);
}
// 根据请求获取地址

View File

@@ -125,12 +125,11 @@ public abstract class ClientCodec<R extends ClientRequest, P extends ClientResul
final WorkThread workThread = request.workThread == null ? readThread : request.workThread;
try {
if (!halfCompleted && !request.isCompleted()) {
connection.sendHalfWriteInReadThread(request, exc);
if (exc == null) {
connection.sendHalfWriteInReadThread(request, exc);
// request没有发送完respFuture需要再次接收
return;
} else {
connection.sendHalfWriteInReadThread(request, exc);
// 异常了需要清掉半包
}
}

View File

@@ -111,70 +111,24 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
protected abstract ClientCodec createCodec();
protected final CompletableFuture<P> writeChannel(R request) {
return writeChannel(request, null);
return writeChannel((Function) null, request);
}
protected final CompletableFuture<List<P>> writeChannel(R[] requests) {
return writeChannel(requests, null);
return writeChannel((Function) null, requests);
}
protected final <T> CompletableFuture<T> writeChannel(Function<P, T> respTransfer, R request) {
return writeChannel0(respTransfer, request).thenApply(v -> Utility.isEmpty(v) ? null : v.get(0));
}
// respTransfer只会在ClientCodec的读线程里调用
protected final <T> CompletableFuture<T> writeChannel(R request, Function<P, T> respTransfer) {
request.respTransfer = respTransfer;
ClientFuture respFuture = createClientFuture(request);
if (client.debug) {
client.logger.log(
Level.FINEST,
Times.nowMillis() + ": " + Thread.currentThread().getName() + ": " + this + ", sendRequest: "
+ request + ", respFuture: " + respFuture);
}
respWaitingCounter.increment(); // 放在writeChannelInWriteThread计数会延迟导致不准确
writeLock.lock();
try {
offerRespFuture(respFuture);
if (pauseWriting.get()) {
pauseRequests.add(respFuture);
} else {
sendRequestInLocking(request, respFuture);
}
} finally {
writeLock.unlock();
}
if (client.debug) {
return respFuture.whenComplete((v, t) -> {
client.logger.log(
Level.FINEST,
Times.nowMillis() + ": " + Thread.currentThread().getName() + ": " + this + ", respResult: "
+ (t != null ? t : v));
});
}
return respFuture;
}
protected void sendRequestInLocking(R request, ClientFuture respFuture) {
// 发送请求数据包
writeArray.clear();
request.writeTo(this, writeArray);
if (request.isCompleted()) {
doneRequestCounter.increment();
} else { // 还剩半包没发送完
pauseWriting.set(true);
currHalfWriteFuture = respFuture;
}
if (writeArray.length() > 0) {
if (writeBuffer.capacity() >= writeArray.length()) {
writeBuffer.clear();
writeBuffer.put(writeArray.content(), 0, writeArray.length());
writeBuffer.flip();
channel.write(writeBuffer, this, writeHandler);
} else {
channel.write(writeArray, this, writeHandler);
}
}
protected final <T> CompletableFuture<List<T>> writeChannel(Function<P, T> respTransfer, R... requests) {
return writeChannel0(respTransfer, requests);
}
// respTransfer只会在ClientCodec的读线程里调用
protected final <T> CompletableFuture<List<T>> writeChannel(R[] requests, Function<P, T> respTransfer) {
protected final <T> CompletableFuture<List<T>> writeChannel0(Function<P, T> respTransfer, R... requests) {
if (client.debug) {
client.logger.log(
Level.FINEST,
@@ -209,7 +163,11 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
return Utility.allOfFutures(respFutures);
}
protected void sendRequestInLocking(ClientFuture[] respFutures) {
protected void sendRequestInLocking(ClientFuture... respFutures) {
sendRequestToChannel(respFutures);
}
protected final void sendRequestToChannel(ClientFuture... respFutures) {
// 发送请求数据包
writeArray.clear();
for (ClientFuture respFuture : respFutures) {
@@ -280,13 +238,14 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
this.currHalfWriteFuture = null;
if (halfRequestExc == null) {
offerFirstRespFuture(respFuture);
sendRequestInLocking(request, respFuture);
// sendRequestInLocking(request, respFuture);
sendRequestToChannel(respFuture);
} else {
codec.responseComplete(true, respFuture, null, halfRequestExc);
}
}
while (!pauseWriting.get() && (respFuture = pauseRequests.poll()) != null) {
sendRequestInLocking((R) respFuture.getRequest(), respFuture);
sendRequestToChannel(respFuture);
}
} finally {
writeLock.unlock();
@@ -357,7 +316,7 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
}
// 只会在WriteIOThread中调用, 必须在writeLock内执行
void offerFirstRespFuture(ClientFuture<R, P> respFuture) {
protected void offerFirstRespFuture(ClientFuture<R, P> respFuture) {
Serializable requestid = respFuture.request.getRequestid();
if (requestid == null) {
respFutureQueue.offerFirst(respFuture);
@@ -367,7 +326,7 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
}
// 必须在writeLock内执行
void offerRespFuture(ClientFuture<R, P> respFuture) {
protected void offerRespFuture(ClientFuture<R, P> respFuture) {
Serializable requestid = respFuture.request.getRequestid();
if (requestid == null) {
respFutureQueue.offer(respFuture);
@@ -377,7 +336,7 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
}
// 只会被Timeout在ReadIOThread中调用
void removeRespFuture(Serializable requestid, ClientFuture<R, P> respFuture) {
protected void removeRespFuture(Serializable requestid, ClientFuture<R, P> respFuture) {
if (requestid == null) {
respFutureQueue.remove(respFuture);
} else {

View File

@@ -865,6 +865,13 @@ public final class Utility {
}
public static <T> CompletableFuture<List<T>> allOfFutures(CompletableFuture<T>[] futures) {
if (futures.length == 1) {
return futures[0].thenApply(v -> {
List<T> rs = new ArrayList<>(1);
rs.add(v);
return rs;
});
}
return CompletableFuture.allOf(futures).thenApply(v -> {
int size = futures.length;
List<T> rs = new ArrayList<>(size);
@@ -907,6 +914,14 @@ public final class Utility {
public static <T> CompletableFuture<List<T>> allOfFutures(
CompletableFuture<T>[] futures, BiConsumer<Integer, T> consumer) {
if (futures.length == 1) {
return futures[0].thenApply(v -> {
List<T> rs = new ArrayList<>(1);
consumer.accept(0, v);
rs.add(v);
return rs;
});
}
return CompletableFuture.allOf(futures).thenApply(v -> {
int size = futures.length;
List<T> rs = new ArrayList<>(size);