ClientFuture优化

This commit is contained in:
Redkale
2023-01-18 11:19:15 +08:00
parent 961ff7fa22
commit 4d42f94be4
5 changed files with 96 additions and 77 deletions

View File

@@ -9,7 +9,6 @@ import java.io.Serializable;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.*; import java.nio.channels.*;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Level; import java.util.logging.Level;
import org.redkale.convert.json.JsonConvert; import org.redkale.convert.json.JsonConvert;
import org.redkale.net.*; import org.redkale.net.*;
@@ -30,11 +29,11 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
protected final ClientConnection connection; protected final ClientConnection connection;
private final List<ClientResponse<P>> respResults = new ArrayList<>(); private final List<ClientResponse<R, P>> respResults = new ArrayList<>();
private final ByteArray readArray = new ByteArray(); private final ByteArray readArray = new ByteArray();
private final ObjectPool<ClientResponse> respPool = ObjectPool.createUnsafePool(256, t -> new ClientResponse(), ClientResponse::prepare, ClientResponse::recycle); private final ObjectPool<ClientResponse<R, P>> respPool = ObjectPool.createUnsafePool(256, t -> new ClientResponse(), ClientResponse::prepare, ClientResponse::recycle);
public ClientCodec(ClientConnection connection) { public ClientCodec(ClientConnection connection) {
Objects.requireNonNull(connection); Objects.requireNonNull(connection);
@@ -63,13 +62,12 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
private void decodeResponse(ByteBuffer buffer) { private void decodeResponse(ByteBuffer buffer) {
AsyncConnection channel = connection.channel; AsyncConnection channel = connection.channel;
ConcurrentLinkedQueue<ClientFuture> responseQueue = connection.responseQueue; connection.currRespIterator = null;
Map<Serializable, ClientFuture> responseMap = connection.responseMap;
if (decodeMessages(buffer, readArray)) { //成功了 if (decodeMessages(buffer, readArray)) { //成功了
connection.currRespIterator = null;
readArray.clear(); readArray.clear();
for (ClientResponse<P> cr : respResults) { for (ClientResponse<R, P> cr : respResults) {
Serializable reqid = cr.getRequestid(); ClientFuture<R, P> respFuture = connection.pollRespFuture(cr.getRequestid());
ClientFuture respFuture = reqid == null ? responseQueue.poll() : responseMap.remove(reqid);
if (respFuture != null) { if (respFuture != null) {
responseComplete(respFuture, cr.message, cr.exc); responseComplete(respFuture, cr.message, cr.exc);
} }
@@ -85,15 +83,16 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
channel.read(this); channel.read(this);
} }
} else { //数据不全, 继续读 } else { //数据不全, 继续读
connection.currRespIterator = null;
buffer.clear(); buffer.clear();
channel.setReadBuffer(buffer); channel.setReadBuffer(buffer);
channel.read(this); channel.read(this);
} }
} }
private void responseComplete(ClientFuture respFuture, P message, Throwable exc) { private void responseComplete(ClientFuture<R, P> respFuture, P message, Throwable exc) {
if (respFuture != null) { if (respFuture != null) {
ClientRequest request = respFuture.request; R request = respFuture.request;
WorkThread workThread = null; WorkThread workThread = null;
try { try {
if (!request.isCompleted()) { if (!request.isCompleted()) {
@@ -127,7 +126,7 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
final Object rs = request.respTransfer == null ? message : request.respTransfer.apply(message); final Object rs = request.respTransfer == null ? message : request.respTransfer.apply(message);
workThread.runWork(() -> { workThread.runWork(() -> {
Traces.currTraceid(request.traceid); Traces.currTraceid(request.traceid);
respFuture.complete(rs); ((ClientFuture) respFuture).complete(rs);
}); });
} }
} catch (Throwable t) { } catch (Throwable t) {
@@ -154,18 +153,8 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
connection.dispose(t); connection.dispose(t);
} }
protected Iterator<ClientFuture> responseIterator() { protected R findRequest(Serializable requestid) {
return connection.responseQueue.iterator(); return (R) connection.findRequest(requestid);
}
protected ClientFuture responseByRequestid(Serializable requestid) {
return (ClientFuture) connection.responseMap.get(requestid);
}
protected List<ClientResponse<P>> pollMessages() {
List<ClientResponse<P>> rs = new ArrayList<>(respResults);
this.respResults.clear();
return rs;
} }
public void addMessage(R request, P result) { public void addMessage(R request, P result) {

View File

@@ -7,7 +7,7 @@ package org.redkale.net.client;
import java.io.Serializable; import java.io.Serializable;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.List; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.*; import java.util.concurrent.atomic.*;
import java.util.function.*; import java.util.function.*;
@@ -45,11 +45,13 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
private final ClientWriteIOThread writeThread; private final ClientWriteIOThread writeThread;
//responseQueue、responseMap二选一 //respFutureQueue、respFutureMap二选一 SPSC队列模式
final ConcurrentLinkedQueue<ClientFuture> responseQueue = new ConcurrentLinkedQueue<>(); private final Queue<ClientFuture<R, P>> respFutureQueue = new ConcurrentLinkedQueue<>(); //Utility.unsafe() != null ? new MpscGrowableArrayQueue<>(16, 1 << 16) : new ConcurrentLinkedQueue<>();
//responseQueue、responseMap二选一, key: requestid //respFutureQueue、respFutureMap二选一, key: requestid SPSC模式
final ConcurrentHashMap<Serializable, ClientFuture> responseMap = new ConcurrentHashMap<>(); private final Map<Serializable, ClientFuture<R, P>> respFutureMap = new ConcurrentHashMap<>();
Iterator<ClientFuture<R, P>> currRespIterator; //必须在调用decodeMessages之前重置为null
private int maxPipelines; //最大并行处理数 private int maxPipelines; //最大并行处理数
@@ -84,12 +86,12 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
return respFuture; return respFuture;
} }
CompletableFuture writeVirtualRequest(R request) { CompletableFuture<P> writeVirtualRequest(R request) {
if (!request.isVirtualType()) { if (!request.isVirtualType()) {
return CompletableFuture.failedFuture(new RuntimeException("ClientVirtualRequest must be virtualType = true")); return CompletableFuture.failedFuture(new RuntimeException("ClientVirtualRequest must be virtualType = true"));
} }
ClientFuture respFuture = createClientFuture(request); ClientFuture<R, P> respFuture = createClientFuture(request);
responseQueue.offer(respFuture); respFutureQueue.offer(respFuture);
readChannel(); readChannel();
return respFuture; return respFuture;
} }
@@ -97,7 +99,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
protected void preComplete(P resp, R req, Throwable exc) { protected void preComplete(P resp, R req, Throwable exc) {
} }
protected ClientFuture createClientFuture(R request) { protected ClientFuture<R, P> createClientFuture(R request) {
return new ClientFuture(this, request); return new ClientFuture(this, request);
} }
@@ -119,15 +121,15 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
CompletableFuture f; CompletableFuture f;
respWaitingCounter.reset(); respWaitingCounter.reset();
WorkThread thread = channel.getReadIOThread(); WorkThread thread = channel.getReadIOThread();
if (!responseQueue.isEmpty()) { if (!respFutureQueue.isEmpty()) {
while ((f = responseQueue.poll()) != null) { while ((f = respFutureQueue.poll()) != null) {
CompletableFuture future = f; CompletableFuture future = f;
thread.runWork(() -> future.completeExceptionally(e)); thread.runWork(() -> future.completeExceptionally(e));
} }
} }
if (!responseMap.isEmpty()) { if (!respFutureMap.isEmpty()) {
responseMap.forEach((key, future) -> { respFutureMap.forEach((key, future) -> {
responseMap.remove(key); respFutureMap.remove(key);
thread.runWork(() -> future.completeExceptionally(e)); thread.runWork(() -> future.completeExceptionally(e));
}); });
} }
@@ -137,6 +139,48 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
writeThread.sendHalfWrite(this, halfRequestExc); writeThread.sendHalfWrite(this, halfRequestExc);
} }
//只会在WriteIOThread中调用
void offerRespFuture(ClientFuture<R, P> respFuture) {
Serializable requestid = respFuture.request.getRequestid();
if (requestid == null) {
respFutureQueue.offer(respFuture);
} else {
respFutureMap.put(requestid, respFuture);
}
}
//只会被Timeout在ReadIOThread中调用
void removeRespFuture(Serializable requestid, ClientFuture<R, P> respFuture) {
if (requestid == null) {
respFutureQueue.remove(respFuture);
} else {
respFutureMap.remove(requestid);
}
}
//只会被ClientCodec在ReadIOThread中调用
ClientFuture<R, P> pollRespFuture(Serializable requestid) {
if (requestid == null) {
return respFutureQueue.poll();
} else {
return respFutureMap.remove(requestid);
}
}
//只会被ClientCodec在ReadIOThread中调用
R findRequest(Serializable requestid) {
if (requestid == null) {
if (currRespIterator == null) {
currRespIterator = respFutureQueue.iterator();
}
ClientFuture<R, P> future = currRespIterator.hasNext() ? currRespIterator.next() : null;
return future == null ? null : future.request;
} else {
ClientFuture<R, P> future = respFutureMap.get(requestid);
return future == null ? null : future.request;
}
}
public boolean isAuthenticated() { public boolean isAuthenticated() {
return authenticated; return authenticated;
} }

View File

@@ -5,28 +5,31 @@
*/ */
package org.redkale.net.client; package org.redkale.net.client;
import java.util.*; import java.util.Objects;
import java.util.concurrent.*; import java.util.concurrent.*;
import org.redkale.net.*; import org.redkale.net.*;
/** /**
* *
* @author zhangjx * @author zhangjx
* *
* @since 2.3.0 * @since 2.3.0
* *
* @param <R> 泛型
* @param <T> 泛型 * @param <T> 泛型
*/ */
public class ClientFuture<T> extends CompletableFuture<T> implements Runnable { public class ClientFuture<R extends ClientRequest, T> extends CompletableFuture<T> implements Runnable {
protected final ClientRequest request; protected final R request;
protected final ClientConnection conn; protected final ClientConnection conn;
private ScheduledFuture timeout; private ScheduledFuture timeout;
public ClientFuture(ClientConnection conn, ClientRequest request) { ClientFuture(ClientConnection conn, R request) {
super(); super();
Objects.requireNonNull(conn);
Objects.requireNonNull(request);
this.conn = conn; this.conn = conn;
this.request = request; this.request = request;
} }
@@ -42,14 +45,14 @@ public class ClientFuture<T> extends CompletableFuture<T> implements Runnable {
} }
@Override //JDK9+ @Override //JDK9+
public <U> ClientFuture<U> newIncompleteFuture() { public <U> ClientFuture<R, U> newIncompleteFuture() {
ClientFuture future = new ClientFuture<>(conn, request); ClientFuture future = new ClientFuture<>(conn, request);
future.timeout = timeout; future.timeout = timeout;
return future; return future;
} }
public <R extends ClientRequest> R getRequest() { public R getRequest() {
return (R) request; return request;
} }
@Override @Override
@@ -66,14 +69,7 @@ public class ClientFuture<T> extends CompletableFuture<T> implements Runnable {
} }
private void runTimeout() { private void runTimeout() {
Queue<ClientFuture> responseQueue = conn.responseQueue; conn.removeRespFuture(request.getRequestid(), this);
if (responseQueue != null) {
responseQueue.remove(this);
}
if (request.getRequestid() != null) {
conn.responseMap.remove(request.getRequestid());
}
TimeoutException ex = new TimeoutException(); TimeoutException ex = new TimeoutException();
WorkThread workThread = null; WorkThread workThread = null;
if (request != null) { if (request != null) {

View File

@@ -13,14 +13,15 @@ import java.io.Serializable;
* 详情见: https://redkale.org * 详情见: https://redkale.org
* *
* @author zhangjx * @author zhangjx
* *
* @since 2.3.0 * @since 2.3.0
* *
* @param <R> 请求对象
* @param <P> message * @param <P> message
*/ */
public class ClientResponse<P> { public class ClientResponse<R extends ClientRequest, P> {
protected ClientRequest request; protected R request;
protected P message; protected P message;
@@ -29,12 +30,12 @@ public class ClientResponse<P> {
public ClientResponse() { public ClientResponse() {
} }
public ClientResponse(ClientRequest request, P message) { public ClientResponse(R request, P message) {
this.request = request; this.request = request;
this.message = message; this.message = message;
} }
public ClientResponse(ClientRequest request, Throwable exc) { public ClientResponse(R request, Throwable exc) {
this.request = request; this.request = request;
this.exc = exc; this.exc = exc;
} }
@@ -43,13 +44,13 @@ public class ClientResponse<P> {
return request == null ? null : request.getRequestid(); return request == null ? null : request.getRequestid();
} }
public ClientResponse<P> set(ClientRequest request, P message) { public ClientResponse<R, P> set(R request, P message) {
this.request = request; this.request = request;
this.message = message; this.message = message;
return this; return this;
} }
public ClientResponse<P> set(ClientRequest request, Throwable exc) { public ClientResponse<R, P> set(R request, Throwable exc) {
this.request = request; this.request = request;
this.exc = exc; this.exc = exc;
return this; return this;
@@ -68,11 +69,11 @@ public class ClientResponse<P> {
return true; return true;
} }
public ClientRequest getRequest() { public R getRequest() {
return request; return request;
} }
public void setRequest(ClientRequest request) { public void setRequest(R request) {
this.request = request; this.request = request;
} }

View File

@@ -3,7 +3,7 @@
*/ */
package org.redkale.net.client; package org.redkale.net.client;
import java.io.*; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler; import java.nio.channels.CompletionHandler;
import java.util.*; import java.util.*;
@@ -74,13 +74,7 @@ public class ClientWriteIOThread extends AsyncIOThread {
while ((entry = requestQueue.take()) != null) { while ((entry = requestQueue.take()) != null) {
map.clear(); map.clear();
if (!entry.isDone()) { if (!entry.isDone()) {
Serializable reqid = entry.request.getRequestid(); entry.conn.offerRespFuture(entry);
if (reqid == null) {
entry.conn.responseQueue.offer(entry);
} else {
entry.conn.responseMap.put(reqid, entry);
}
if (entry.conn.pauseWriting.get()) { if (entry.conn.pauseWriting.get()) {
if (entry.conn.pauseResuming.get()) { if (entry.conn.pauseResuming.get()) {
try { try {
@@ -97,12 +91,7 @@ public class ClientWriteIOThread extends AsyncIOThread {
} }
while ((entry = requestQueue.poll()) != null) { while ((entry = requestQueue.poll()) != null) {
if (!entry.isDone()) { if (!entry.isDone()) {
Serializable reqid = entry.request.getRequestid(); entry.conn.offerRespFuture(entry);
if (reqid == null) {
entry.conn.responseQueue.offer(entry);
} else {
entry.conn.responseMap.put(reqid, entry);
}
if (entry.conn.pauseWriting.get()) { if (entry.conn.pauseWriting.get()) {
if (entry.conn.pauseResuming.get()) { if (entry.conn.pauseResuming.get()) {
try { try {