Client实现requestid

This commit is contained in:
Redkale
2023-01-01 13:53:19 +08:00
parent 391352e077
commit b3862cea72
5 changed files with 59 additions and 17 deletions

View File

@@ -34,8 +34,8 @@ public abstract class ClientCodec<R extends ClientRequest, P> {
//返回true: array会clear, 返回false: buffer会clear
public abstract boolean decodeMessages(ByteBuffer buffer, ByteArray array);
protected Queue<ClientFuture> responseQueue() {
return connection.responseQueue;
protected Iterator<ClientFuture> responseIterator() {
return connection.responseQueue2.iterator();
}
public List<ClientResponse<P>> pollMessages() {

View File

@@ -5,6 +5,7 @@
*/
package org.redkale.net.client;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.*;
@@ -51,7 +52,10 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
protected final Queue<R> requestQueue = new ArrayDeque<>();
protected final ArrayDeque<ClientFuture> responseQueue = new ArrayDeque<>();
final ArrayDeque<ClientFuture> responseQueue2 = new ArrayDeque<>();
//key: requestid
final HashMap<Serializable, ClientFuture> responseMap = new LinkedHashMap<>();
protected final CompletionHandler<Integer, Void> writeHandler = new CompletionHandler<Integer, Void>() {
@@ -91,6 +95,10 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
return this;
}
protected boolean isWaitingResponseEmpty() {
return responseQueue2.isEmpty() && responseMap.isEmpty();
}
protected void resumeWrite() {
this.pauseWriting.set(false);
}
@@ -99,7 +107,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
ClientConnection conn = this;
ByteArray rw = conn.writeArray;
rw.clear();
int pipelines = maxPipelines > 1 ? (maxPipelines - responseQueue.size()) : 1;
int pipelines = maxPipelines > 1 ? (maxPipelines - responseQueue2.size() - responseMap.size()) : 1;
if (must && pipelines < 1) {
pipelines = 1;
}
@@ -120,7 +128,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
break;
}
writeLastRequest = req;
if (req.canMerge(conn)) {
if (req.getRequestid() == null && req.canMerge(conn)) {
R r;
while ((r = requestQueue.poll()) != null) {
i++;
@@ -179,7 +187,12 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
if (respFuture != null) {
if (!respFuture.request.isCompleted()) {
if (rs.exc == null) {
responseQueue.offerFirst(respFuture);
Serializable reqid = respFuture.request.getRequestid();
if (reqid == null) {
responseQueue2.offerFirst(respFuture);
} else {
responseMap.put(reqid, respFuture);
}
pauseWriting.set(false);
return;
} else { //异常了需要清掉半包
@@ -264,13 +277,14 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
List<ClientResponse<P>> results = codec.pollMessages();
if (results != null) {
for (ClientResponse<P> rs : results) {
ClientFuture respFuture = responseQueue.poll();
Serializable reqid = rs.getRequestid();
ClientFuture respFuture = reqid == null ? responseQueue2.poll() : responseMap.remove(reqid);
if (respFuture != null) {
int mergeCount = respFuture.mergeCount;
completeResponse(rs, respFuture);
if (mergeCount > 0) {
for (int i = 0; i < mergeCount; i++) {
respFuture = responseQueue.poll();
respFuture = reqid == null ? responseQueue2.poll() : responseMap.remove(reqid);
if (respFuture != null) {
completeResponse(rs, respFuture);
}
@@ -282,7 +296,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
if (buffer.hasRemaining()) {
decodeResponse(buffer);
} else if (responseQueue.isEmpty()) { //队列都已处理完了
} else if (isWaitingResponseEmpty()) { //队列都已处理完了
buffer.clear();
channel.setReadBuffer(buffer);
if (readPending.compareAndSet(true, false)) {
@@ -354,12 +368,17 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
}
private void writeChannelInThread(R request, ClientFuture respFuture) {
Serializable reqid = request.getRequestid();
//保证顺序一致
if (client.closeRequest != null && respFuture.request == client.closeRequest) {
responseQueue.offer(ClientFuture.EMPTY);
responseQueue2.offer(ClientFuture.EMPTY);
} else {
request.respFuture = respFuture;
responseQueue.offer(respFuture);
if (reqid == null) {
responseQueue2.offer(respFuture);
} else {
responseMap.put(reqid, respFuture);
}
}
requestQueue.offer(request);
if (isAuthenticated() && client.reqWritedCounter != null) {
@@ -406,9 +425,17 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
CompletableFuture f;
respWaitingCounter.reset();
WorkThread thread = channel.getAsyncIOThread();
while ((f = responseQueue.poll()) != null) {
CompletableFuture future = f;
thread.runWork(() -> future.completeExceptionally(e));
if (!responseQueue2.isEmpty()) {
while ((f = responseQueue2.poll()) != null) {
CompletableFuture future = f;
thread.runWork(() -> future.completeExceptionally(e));
}
}
if (!responseMap.isEmpty()) {
responseMap.forEach((key, future) -> {
responseMap.remove(key);
thread.runWork(() -> future.completeExceptionally(e));
});
}
}

View File

@@ -72,10 +72,14 @@ public class ClientFuture<T> extends CompletableFuture<T> implements Runnable {
}
private void runTimeout() {
Queue<ClientFuture> responseQueue = conn.responseQueue;
Queue<ClientFuture> responseQueue = conn.responseQueue2;
if (responseQueue != null) {
responseQueue.remove(this);
}
if (request.getRequestid() != null) {
conn.responseMap.remove(request.getRequestid());
}
TimeoutException ex = new TimeoutException();
WorkThread workThread = null;
if (request != null) {

View File

@@ -5,7 +5,8 @@
*/
package org.redkale.net.client;
import java.util.function.*;
import java.io.Serializable;
import java.util.function.BiConsumer;
import org.redkale.net.WorkThread;
import org.redkale.util.*;
@@ -28,6 +29,10 @@ public abstract class ClientRequest implements BiConsumer<ClientConnection, Byte
ClientFuture respFuture;
public Serializable getRequestid() {
return null;
}
public long getCreateTime() {
return createTime;
}
@@ -45,7 +50,7 @@ public abstract class ClientRequest implements BiConsumer<ClientConnection, Byte
return respFuture == null ? -1 : respFuture.mergeCount;
}
//是否能合并
//是否能合并 requestid=null的情况下值才有效
protected boolean canMerge(ClientConnection conn) {
return false;
}

View File

@@ -5,6 +5,8 @@
*/
package org.redkale.net.client;
import java.io.Serializable;
/**
*
* @author zhangjx
@@ -16,6 +18,10 @@ public class ClientResponse<P> {
protected Throwable exc;
public Serializable getRequestid() {
return null;
}
public ClientResponse(P result) {
this.message = result;
}