优化ClientConnection.responseQueue命名

This commit is contained in:
Redkale
2023-01-03 10:56:32 +08:00
parent ba82ca3051
commit c72d0bec8c
3 changed files with 12 additions and 12 deletions

View File

@@ -35,7 +35,7 @@ public abstract class ClientCodec<R extends ClientRequest, P> {
public abstract boolean decodeMessages(ByteBuffer buffer, ByteArray array); public abstract boolean decodeMessages(ByteBuffer buffer, ByteArray array);
protected Iterator<ClientFuture> responseIterator() { protected Iterator<ClientFuture> responseIterator() {
return connection.responseQueue2.iterator(); return connection.responseQueue.iterator();
} }
public List<ClientResponse<P>> pollMessages() { public List<ClientResponse<P>> pollMessages() {

View File

@@ -52,7 +52,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
protected final Queue<R> requestQueue = new ArrayDeque<>(); protected final Queue<R> requestQueue = new ArrayDeque<>();
final ArrayDeque<ClientFuture> responseQueue2 = new ArrayDeque<>(); final ArrayDeque<ClientFuture> responseQueue = new ArrayDeque<>();
//key: requestid //key: requestid
final HashMap<Serializable, ClientFuture> responseMap = new LinkedHashMap<>(); final HashMap<Serializable, ClientFuture> responseMap = new LinkedHashMap<>();
@@ -96,7 +96,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
} }
protected boolean isWaitingResponseEmpty() { protected boolean isWaitingResponseEmpty() {
return responseQueue2.isEmpty() && responseMap.isEmpty(); return responseQueue.isEmpty() && responseMap.isEmpty();
} }
protected void resumeWrite() { protected void resumeWrite() {
@@ -107,7 +107,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
ClientConnection conn = this; ClientConnection conn = this;
ByteArray rw = conn.writeArray; ByteArray rw = conn.writeArray;
rw.clear(); rw.clear();
int pipelines = maxPipelines > 1 ? (maxPipelines - responseQueue2.size() - responseMap.size()) : 1; int pipelines = maxPipelines > 1 ? (maxPipelines - responseQueue.size() - responseMap.size()) : 1;
if (must && pipelines < 1) { if (must && pipelines < 1) {
pipelines = 1; pipelines = 1;
} }
@@ -189,7 +189,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
if (rs.exc == null) { if (rs.exc == null) {
Serializable reqid = respFuture.request.getRequestid(); Serializable reqid = respFuture.request.getRequestid();
if (reqid == null) { if (reqid == null) {
responseQueue2.offerFirst(respFuture); responseQueue.offerFirst(respFuture);
} else { } else {
responseMap.put(reqid, respFuture); responseMap.put(reqid, respFuture);
} }
@@ -278,13 +278,13 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
if (results != null) { if (results != null) {
for (ClientResponse<P> rs : results) { for (ClientResponse<P> rs : results) {
Serializable reqid = rs.getRequestid(); Serializable reqid = rs.getRequestid();
ClientFuture respFuture = reqid == null ? responseQueue2.poll() : responseMap.remove(reqid); ClientFuture respFuture = reqid == null ? responseQueue.poll() : responseMap.remove(reqid);
if (respFuture != null) { if (respFuture != null) {
int mergeCount = respFuture.mergeCount; int mergeCount = respFuture.mergeCount;
completeResponse(rs, respFuture); completeResponse(rs, respFuture);
if (mergeCount > 0) { if (mergeCount > 0) {
for (int i = 0; i < mergeCount; i++) { for (int i = 0; i < mergeCount; i++) {
respFuture = reqid == null ? responseQueue2.poll() : responseMap.remove(reqid); respFuture = reqid == null ? responseQueue.poll() : responseMap.remove(reqid);
if (respFuture != null) { if (respFuture != null) {
completeResponse(rs, respFuture); completeResponse(rs, respFuture);
} }
@@ -371,11 +371,11 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
Serializable reqid = request.getRequestid(); Serializable reqid = request.getRequestid();
//保证顺序一致 //保证顺序一致
if (client.closeRequest != null && respFuture.request == client.closeRequest) { if (client.closeRequest != null && respFuture.request == client.closeRequest) {
responseQueue2.offer(ClientFuture.EMPTY); responseQueue.offer(ClientFuture.EMPTY);
} else { } else {
request.respFuture = respFuture; request.respFuture = respFuture;
if (reqid == null) { if (reqid == null) {
responseQueue2.offer(respFuture); responseQueue.offer(respFuture);
} else { } else {
responseMap.put(reqid, respFuture); responseMap.put(reqid, respFuture);
} }
@@ -425,8 +425,8 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
CompletableFuture f; CompletableFuture f;
respWaitingCounter.reset(); respWaitingCounter.reset();
WorkThread thread = channel.getAsyncIOThread(); WorkThread thread = channel.getAsyncIOThread();
if (!responseQueue2.isEmpty()) { if (!responseQueue.isEmpty()) {
while ((f = responseQueue2.poll()) != null) { while ((f = responseQueue.poll()) != null) {
CompletableFuture future = f; CompletableFuture future = f;
thread.runWork(() -> future.completeExceptionally(e)); thread.runWork(() -> future.completeExceptionally(e));
} }

View File

@@ -72,7 +72,7 @@ public class ClientFuture<T> extends CompletableFuture<T> implements Runnable {
} }
private void runTimeout() { private void runTimeout() {
Queue<ClientFuture> responseQueue = conn.responseQueue2; Queue<ClientFuture> responseQueue = conn.responseQueue;
if (responseQueue != null) { if (responseQueue != null) {
responseQueue.remove(this); responseQueue.remove(this);
} }