优化ClientFuture

This commit is contained in:
Redkale
2023-01-04 11:53:17 +08:00
parent 7680b4e165
commit 5975be44d9
2 changed files with 42 additions and 10 deletions

View File

@@ -142,7 +142,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
}
ClientFuture<R> f = entry.getValue();
if (f != null) {
f.mergeCount++;
f.incrMergeCount();
}
//req.respFuture.mergeCount++;
}
@@ -214,9 +214,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
client.respDoneCounter.increment();
}
try {
if (respFuture.timeout != null) {
respFuture.timeout.cancel(true);
}
respFuture.cancelTimeout();
ClientRequest request = respFuture.request;
//if (client.finest) client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + ClientConnection.this + ", 回调处理, req=" + request + ", message=" + rs.message);
preComplete(rs.message, (R) request, rs.exc);
@@ -289,7 +287,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
Serializable reqid = rs.getRequestid();
ClientFuture respFuture = reqid == null ? responseQueue.poll() : responseMap.remove(reqid);
if (respFuture != null) {
int mergeCount = respFuture.mergeCount;
int mergeCount = respFuture.getMergeCount();
completeResponse(rs, respFuture);
if (mergeCount > 0) {
for (int i = 0; i < mergeCount; i++) {
@@ -363,8 +361,8 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
respFuture = createClientFuture(request);
int rts = this.channel.getReadTimeoutSeconds();
if (rts > 0 && respFuture.request != null) {
respFuture.conn = this;
respFuture.timeout = client.timeoutScheduler.schedule(respFuture, rts, TimeUnit.SECONDS);
respFuture.setConn(this);
respFuture.setTimeout(client.timeoutScheduler.schedule(respFuture, rts, TimeUnit.SECONDS));
}
}
respWaitingCounter.increment(); //放在writeChannelInThread计数会延迟导致不准确

View File

@@ -26,21 +26,55 @@ public class ClientFuture<T> extends CompletableFuture<T> implements Runnable {
public boolean completeExceptionally(Throwable ex) {
return true;
}
@Override
void setConn(ClientConnection conn) {
}
@Override
void setTimeout(ScheduledFuture timeout) {
}
@Override
void incrMergeCount() {
}
@Override
public void run() {
}
};
protected final ClientRequest request;
ScheduledFuture timeout;
private ScheduledFuture timeout;
int mergeCount; //合并的个数,不算自身
private int mergeCount; //合并的个数,不算自身
ClientConnection conn;
private ClientConnection conn;
public ClientFuture(ClientRequest request) {
super();
this.request = request;
}
void setConn(ClientConnection conn) {
this.conn = conn;
}
void setTimeout(ScheduledFuture timeout) {
this.timeout = timeout;
}
void cancelTimeout() {
if (timeout != null) {
timeout.cancel(true);
}
}
void incrMergeCount() {
mergeCount++;
}
public int getMergeCount() {
return mergeCount;
}