优化client

This commit is contained in:
redkale
2023-10-21 08:04:53 +08:00
parent da81509157
commit cb3cb00e2b
2 changed files with 24 additions and 7 deletions

View File

@@ -13,6 +13,7 @@ import java.util.function.*;
import java.util.logging.Logger; import java.util.logging.Logger;
import org.redkale.net.*; import org.redkale.net.*;
import org.redkale.util.*; import org.redkale.util.*;
import static org.redkale.util.Utility.isNotEmpty;
/** /**
* *
@@ -327,11 +328,26 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
final Queue<CompletableFuture<C>> waitQueue = this.connAcquireWaitings[connIndex]; final Queue<CompletableFuture<C>> waitQueue = this.connAcquireWaitings[connIndex];
if (!pool || this.connOpenStates[connIndex].compareAndSet(false, true)) { if (!pool || this.connOpenStates[connIndex].compareAndSet(false, true)) {
CompletableFuture<C> future = group.createClient(tcp, this.address.randomAddress(), readTimeoutSeconds, writeTimeoutSeconds) CompletableFuture<C> future = group.createClient(tcp, this.address.randomAddress(), readTimeoutSeconds, writeTimeoutSeconds)
.thenApply(c -> (C) createClientConnection(connIndex, c).setMaxPipelines(maxPipelines)); .thenApply(c -> {
if (isNotEmpty(traceid)) {
Traces.computeIfAbsent(traceid);
}
return (C) createClientConnection(connIndex, c).setMaxPipelines(maxPipelines);
});
R virtualReq = createVirtualRequestAfterConnect(); R virtualReq = createVirtualRequestAfterConnect();
if (virtualReq != null) { if (virtualReq != null) {
virtualReq.traceid = traceid; virtualReq.traceid = traceid;
future = future.thenCompose(conn -> conn.writeVirtualRequest(virtualReq).thenApply(v -> conn)); future = future.thenCompose(conn -> {
if (isNotEmpty(traceid)) {
Traces.computeIfAbsent(traceid);
}
return conn.writeVirtualRequest(virtualReq).thenApply(v -> {
if (isNotEmpty(traceid)) {
Traces.computeIfAbsent(traceid);
}
return conn;
});
});
} else { } else {
future = future.thenApply(conn -> { future = future.thenApply(conn -> {
conn.channel.readRegister(conn.getCodec()); //不用readRegisterInIOThread因executeRead可能会异步 conn.channel.readRegister(conn.getCodec()); //不用readRegisterInIOThread因executeRead可能会异步
@@ -342,7 +358,9 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
future = future.thenCompose(authenticate); future = future.thenCompose(authenticate);
} }
return future.thenApply(c -> { return future.thenApply(c -> {
Traces.computeIfAbsent(traceid); if (isNotEmpty(traceid)) {
Traces.computeIfAbsent(traceid);
}
c.setAuthenticated(true); c.setAuthenticated(true);
if (pool) { if (pool) {
this.connArray[connIndex] = c; this.connArray[connIndex] = c;
@@ -352,7 +370,9 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
if (workThread != null) { if (workThread != null) {
CompletableFuture<C> fs = f; CompletableFuture<C> fs = f;
workThread.execute(() -> { workThread.execute(() -> {
Traces.computeIfAbsent(traceid); if (isNotEmpty(traceid)) {
Traces.computeIfAbsent(traceid);
}
fs.complete(c); fs.complete(c);
}); });
} else { } else {

View File

@@ -14,7 +14,6 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.*; import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.function.*; import java.util.function.*;
import java.util.logging.Level;
import org.redkale.annotation.*; import org.redkale.annotation.*;
import org.redkale.net.*; import org.redkale.net.*;
import org.redkale.util.*; import org.redkale.util.*;
@@ -134,7 +133,6 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
} else { } else {
sendRequestInLocking(request, respFuture); sendRequestInLocking(request, respFuture);
} }
client.logger.log(Level.INFO, channel + ", " + request.getTraceid()+ " 发送完请求: " + request);
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }
@@ -321,7 +319,6 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
public void dispose(Throwable exc) { public void dispose(Throwable exc) {
channel.offerWriteBuffer(writeBuffer); channel.offerWriteBuffer(writeBuffer);
channel.dispose(); channel.dispose();
System.out.println(Thread.currentThread().getName() + ": " + channel + ", 被关闭了");
Throwable e = exc == null ? new ClosedChannelException() : exc; Throwable e = exc == null ? new ClosedChannelException() : exc;
CompletableFuture f; CompletableFuture f;
respWaitingCounter.reset(); respWaitingCounter.reset();