Client优化

This commit is contained in:
Redkale
2023-01-11 11:49:31 +08:00
parent bf3bf836ac
commit 35d923d856
4 changed files with 228 additions and 219 deletions

View File

@@ -40,6 +40,9 @@ public abstract class Client<R extends ClientRequest, P> implements Resourcable
protected final ScheduledThreadPoolExecutor timeoutScheduler; protected final ScheduledThreadPoolExecutor timeoutScheduler;
//结合ClientRequest.isCompleted()使用
//使用场景批量request提交时后面的request需响应上一个request返回值来构建
//例如: MySQL批量提交PrepareSQL场景
protected final LongAdder reqWritedCounter = new LongAdder(); protected final LongAdder reqWritedCounter = new LongAdder();
protected final LongAdder respDoneCounter = new LongAdder(); protected final LongAdder respDoneCounter = new LongAdder();
@@ -174,16 +177,16 @@ public abstract class Client<R extends ClientRequest, P> implements Resourcable
return; return;
} }
this.timeoutScheduler.shutdownNow(); this.timeoutScheduler.shutdownNow();
final R closereq = closeRequestSupplier == null ? null : closeRequestSupplier.get();
for (ClientConnection conn : this.connArray) { for (ClientConnection conn : this.connArray) {
if (conn == null) { if (conn == null) {
continue; continue;
} }
if (closereq == null) { final R closeReq = closeRequestSupplier == null ? null : closeRequestSupplier.get();
if (closeReq == null) {
conn.dispose(null); conn.dispose(null);
} else { } else {
try { try {
conn.writeChannel(closereq).get(1, TimeUnit.SECONDS); conn.writeChannel(closeReq).get(1, TimeUnit.SECONDS);
} catch (Exception e) { } catch (Exception e) {
} }
conn.dispose(null); conn.dispose(null);
@@ -245,7 +248,7 @@ public abstract class Client<R extends ClientRequest, P> implements Resourcable
CompletableFuture<ClientConnection> future = address.createClient(tcp, group, readTimeoutSeconds, writeTimeoutSeconds) CompletableFuture<ClientConnection> future = address.createClient(tcp, group, readTimeoutSeconds, writeTimeoutSeconds)
.thenApply(c -> createClientConnection(index, c).setMaxPipelines(maxPipelines)); .thenApply(c -> createClientConnection(index, c).setMaxPipelines(maxPipelines));
return (authenticate == null ? future : authenticate.apply(future)).thenApply(c -> { return (authenticate == null ? future : authenticate.apply(future)).thenApply(c -> {
c.authenticated = true; c.setAuthenticated(true);
this.connArray[index] = c; this.connArray[index] = c;
CompletableFuture<ClientConnection> f; CompletableFuture<ClientConnection> f;
if (cflag) { if (cflag) {
@@ -308,6 +311,10 @@ public abstract class Client<R extends ClientRequest, P> implements Resourcable
return s; return s;
} }
protected void incrReqWritedCounter() {
reqWritedCounter.increment();
}
@Override @Override
public String resourceName() { public String resourceName() {
return name; return name;

View File

@@ -53,126 +53,19 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
protected final Queue<SimpleEntry<R, ClientFuture<R>>> requestQueue = new ArrayDeque<>(); protected final Queue<SimpleEntry<R, ClientFuture<R>>> requestQueue = new ArrayDeque<>();
//responseQueue、responseMap二选一
final ArrayDeque<ClientFuture> responseQueue = new ArrayDeque<>(); final ArrayDeque<ClientFuture> responseQueue = new ArrayDeque<>();
//key: requestid //responseQueue、responseMap二选一, key: requestid
final HashMap<Serializable, ClientFuture> responseMap = new LinkedHashMap<>(); final HashMap<Serializable, ClientFuture> responseMap = new LinkedHashMap<>();
protected final CompletionHandler<Integer, Void> writeHandler = new CompletionHandler<Integer, Void>() { private int maxPipelines; //最大并行处理数
@Override private boolean closed;
public void completed(Integer result, Void attachment) {
if (writeLastRequest != null && writeLastRequest.isCloseType()) {
if (closeFuture != null) {
channel.getWriteIOThread().runWork(() -> {
closeFuture.complete(null);
});
}
closeFuture = null;
return;
}
if (sendWrite(false)) {
return;
}
writePending.compareAndSet(true, false);
readChannel();
}
@Override private boolean authenticated;
public void failed(Throwable exc, Void attachment) {
dispose(exc);
}
};
protected int maxPipelines; //最大并行处理数 private SimpleEntry<R, ClientFuture<R>> lastHalfEntry;
protected SimpleEntry<R, ClientFuture<R>> lastHalfEntry;
protected ClientConnection setMaxPipelines(int maxPipelines) {
this.maxPipelines = maxPipelines;
return this;
}
protected ClientConnection resetMaxPipelines() {
this.maxPipelines = client.maxPipelines;
return this;
}
protected boolean isWaitingResponseEmpty() {
return responseQueue.isEmpty() && responseMap.isEmpty();
}
protected void resumeWrite() {
this.pauseWriting.set(false);
}
//有写入数据返回true否则返回false
private boolean sendWrite(boolean must) {
ClientConnection conn = this;
ByteArray rw = conn.writeArray;
rw.clear();
int pipelines = maxPipelines > 1 ? (maxPipelines - responseQueue.size() - responseMap.size()) : 1;
if (must && pipelines < 1) {
pipelines = 1;
}
int c = 0;
AtomicBoolean pw = conn.pauseWriting;
for (int i = 0; i < pipelines; i++) {
if (pw.get()) {
break;
}
SimpleEntry<R, ClientFuture<R>> entry;
if (lastHalfEntry == null) {
entry = requestQueue.poll();
} else {
entry = lastHalfEntry;
lastHalfEntry = null;
}
if (entry == null) {
break;
}
R req = entry.getKey();
writeLastRequest = req;
if (req.getRequestid() == null && req.canMerge(conn)) {
SimpleEntry<R, ClientFuture<R>> r;
while ((r = requestQueue.poll()) != null) {
i++;
if (!req.merge(conn, r.getKey())) {
break;
}
ClientFuture<R> f = entry.getValue();
if (f != null) {
f.incrMergeCount();
}
//req.respFuture.mergeCount++;
}
req.accept(conn, rw);
if (r != null) {
r.getKey().accept(conn, rw);
req = r.getKey();
}
} else {
req.accept(conn, rw);
}
c++;
if (!req.isCompleted()) {
lastHalfEntry = entry;
this.pauseWriting.set(true);
break;
}
}
if (c > 0) { //当Client连接Server后先从Server读取数据时,会先发送一个EMPTY的request这样writeArray.count就会为0
channel.write(rw, writeHandler);
return true;
}
if (pw.get()) {
writePending.compareAndSet(true, false);
}
return false;
}
protected void preComplete(P resp, R req, Throwable exc) {
}
protected final CompletionHandler<Integer, ByteBuffer> readHandler = new CompletionHandler<Integer, ByteBuffer>() { protected final CompletionHandler<Integer, ByteBuffer> readHandler = new CompletionHandler<Integer, ByteBuffer>() {
@@ -192,7 +85,58 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
} }
} }
protected void completeResponse(ClientResponse<P> rs, ClientFuture respFuture) { private void decodeResponse(ByteBuffer buffer) {
if (codec.decodeMessages(buffer, readArray)) { //成功了
readArray.clear();
List<ClientResponse<P>> results = codec.pollMessages();
if (results != null) {
for (ClientResponse<P> rs : results) {
Serializable reqid = rs.getRequestid();
ClientFuture respFuture = reqid == null ? responseQueue.poll() : responseMap.remove(reqid);
if (respFuture != null) {
int mergeCount = respFuture.getMergeCount();
completeResponse(rs, respFuture);
if (mergeCount > 0) {
for (int i = 0; i < mergeCount; i++) {
respFuture = reqid == null ? responseQueue.poll() : responseMap.remove(reqid);
if (respFuture != null) {
completeResponse(rs, respFuture);
}
}
}
}
}
}
if (buffer.hasRemaining()) {
decodeResponse(buffer);
} else if (isWaitingResponseEmpty()) { //队列都已处理完了
buffer.clear();
channel.setReadBuffer(buffer);
if (readPending.compareAndSet(true, false)) {
//无消息处理
} else {
channel.read(this);
}
} else { //还有消息需要读取
if ((!requestQueue.isEmpty() || lastHalfEntry != null) && writePending.compareAndSet(false, true)) {
//先写后读取
if (sendWrite(true) <= 0) {
writePending.compareAndSet(true, false);
}
}
buffer.clear();
channel.setReadBuffer(buffer);
channel.read(this);
}
} else { //数据不全, 继续读
buffer.clear();
channel.setReadBuffer(buffer);
channel.read(this);
}
}
private void completeResponse(ClientResponse<P> rs, ClientFuture respFuture) {
if (respFuture != null) { if (respFuture != null) {
if (!respFuture.request.isCompleted()) { if (!respFuture.request.isCompleted()) {
if (rs.exc == null) { if (rs.exc == null) {
@@ -247,68 +191,36 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
} }
} }
public void decodeResponse(ByteBuffer buffer) {
if (codec.decodeMessages(buffer, readArray)) { //成功了
readArray.clear();
List<ClientResponse<P>> results = codec.pollMessages();
if (results != null) {
for (ClientResponse<P> rs : results) {
Serializable reqid = rs.getRequestid();
ClientFuture respFuture = reqid == null ? responseQueue.poll() : responseMap.remove(reqid);
if (respFuture != null) {
int mergeCount = respFuture.getMergeCount();
completeResponse(rs, respFuture);
if (mergeCount > 0) {
for (int i = 0; i < mergeCount; i++) {
respFuture = reqid == null ? responseQueue.poll() : responseMap.remove(reqid);
if (respFuture != null) {
completeResponse(rs, respFuture);
}
}
}
}
}
}
if (buffer.hasRemaining()) {
decodeResponse(buffer);
} else if (isWaitingResponseEmpty()) { //队列都已处理完了
buffer.clear();
channel.setReadBuffer(buffer);
if (readPending.compareAndSet(true, false)) {
//无消息处理
} else {
channel.read(this);
}
} else { //还有消息需要读取
if ((!requestQueue.isEmpty() || lastHalfEntry != null) && writePending.compareAndSet(false, true)) {
//先写后读取
if (!sendWrite(true)) {
writePending.compareAndSet(true, false);
}
}
buffer.clear();
channel.setReadBuffer(buffer);
channel.read(this);
}
} else { //数据不全, 继续读
buffer.clear();
channel.setReadBuffer(buffer);
channel.read(this);
}
}
@Override @Override
public void failed(Throwable t, ByteBuffer attachment) { public void failed(Throwable t, ByteBuffer attachment) {
dispose(t); dispose(t);
} }
}; };
protected boolean authenticated; protected final CompletionHandler<Integer, Void> writeHandler = new CompletionHandler<Integer, Void>() {
protected ClientFuture closeFuture; @Override
public void completed(Integer result, Void attachment) {
// if (writeLastRequest != null && writeLastRequest.isCloseType()) {
// if (closeFuture != null) {
// channel.getWriteIOThread().runWork(() -> {
// closeFuture.complete(null);
// });
// }
// closeFuture = null;
// return;
// }
if (sendWrite(false) <= 0) {
writePending.compareAndSet(true, false);
readChannel();
}
}
private R writeLastRequest; @Override
public void failed(Throwable exc, Void attachment) {
dispose(exc);
}
};
@SuppressWarnings({"LeakingThisInConstructor", "OverridableMethodCallInConstructor"}) @SuppressWarnings({"LeakingThisInConstructor", "OverridableMethodCallInConstructor"})
public ClientConnection(Client client, int index, AsyncConnection channel) { public ClientConnection(Client client, int index, AsyncConnection channel) {
@@ -322,51 +234,120 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
protected abstract ClientCodec createCodec(); protected abstract ClientCodec createCodec();
protected final CompletableFuture<P> writeChannel(R request) { protected final CompletableFuture<P> writeChannel(R request) {
ClientFuture respFuture; ClientFuture respFuture = createClientFuture(request);
if (request.isCloseType()) { int rts = this.channel.getReadTimeoutSeconds();
respFuture = createClientFuture(null); if (rts > 0 && !request.isCloseType()) {
closeFuture = respFuture; respFuture.setConn(this);
} else { respFuture.setTimeout(client.timeoutScheduler.schedule(respFuture, rts, TimeUnit.SECONDS));
respFuture = createClientFuture(request);
int rts = this.channel.getReadTimeoutSeconds();
if (rts > 0 && respFuture.request != null) {
respFuture.setConn(this);
respFuture.setTimeout(client.timeoutScheduler.schedule(respFuture, rts, TimeUnit.SECONDS));
}
} }
respWaitingCounter.increment(); //放在writeChannelInThread计数会延迟,导致不准确 respWaitingCounter.increment(); //放在writeChannelUnsafe计数会延迟,导致不准确
if (channel.inCurrWriteThread()) { if (channel.inCurrWriteThread()) {
writeChannelInThread(request, respFuture); writeChannelUnsafe(request, respFuture);
} else { } else {
channel.executeWrite(() -> writeChannelInThread(request, respFuture)); channel.executeWrite(() -> writeChannelUnsafe(request, respFuture));
} }
return respFuture; return respFuture;
} }
private void writeChannelInThread(R request, ClientFuture<R> respFuture) { private void writeChannelUnsafe(R request, ClientFuture<R> respFuture) {
if (closed) {
WorkThread workThread = request.workThread;
if (workThread == null || workThread.getWorkExecutor() == null) {
workThread = channel.getReadIOThread();
}
Throwable e = new ClosedChannelException();
workThread.runWork(() -> {
Traces.currTraceid(request.traceid);
respFuture.completeExceptionally(e);
});
return;
}
Serializable reqid = request.getRequestid(); Serializable reqid = request.getRequestid();
//保证顺序一致 //保证顺序一致
ClientFuture future; if (reqid == null) {
if (request.isCloseType()) { responseQueue.offer(respFuture);
future = null;
responseQueue.offer(ClientFuture.EMPTY);
} else { } else {
future = respFuture; responseMap.put(reqid, respFuture);
if (reqid == null) {
responseQueue.offer(respFuture);
} else {
responseMap.put(reqid, respFuture);
}
} }
requestQueue.offer(new SimpleEntry<>(request, future)); requestQueue.offer(new SimpleEntry<>(request, respFuture));
if (isAuthenticated() && client.reqWritedCounter != null) { if (isAuthenticated()) {
client.reqWritedCounter.increment(); client.incrReqWritedCounter();
} }
if (writePending.compareAndSet(false, true)) { if (writePending.compareAndSet(false, true)) {
sendWrite(true); sendWrite(true);
} }
} }
//返回写入数据request的数量返回0表示没有可写的request
private int sendWrite(boolean must) {
ClientConnection conn = this;
ByteArray rw = conn.writeArray;
rw.clear();
int pipelines = maxPipelines > 1 ? (maxPipelines - responseQueue.size() - responseMap.size()) : 1;
if (must && pipelines < 1) {
pipelines = 1;
}
int c = 0;
AtomicBoolean pw = conn.pauseWriting;
for (int i = 0; i < pipelines; i++) {
if (pw.get()) {
break;
}
SimpleEntry<R, ClientFuture<R>> entry;
if (lastHalfEntry == null) {
entry = requestQueue.poll();
} else {
entry = lastHalfEntry;
lastHalfEntry = null;
}
if (entry == null) {
break;
}
R req = entry.getKey();
if (req.getRequestid() == null && req.canMerge(conn)) {
SimpleEntry<R, ClientFuture<R>> r;
while ((r = requestQueue.poll()) != null) {
i++;
if (!req.merge(conn, r.getKey())) {
break;
}
ClientFuture<R> f = entry.getValue();
if (f != null) {
f.incrMergeCount();
}
}
req.accept(conn, rw);
if (r != null) {
req = r.getKey();
req.accept(conn, rw);
}
} else {
req.accept(conn, rw);
}
c++;
if (req.isCloseType()) {
closed = true;
this.pauseWriting.set(true);
break;
} else if (!req.isCompleted()) {
lastHalfEntry = entry;
this.pauseWriting.set(true);
break;
}
}
if (c > 0) { //当Client连接Server后先从Server读取数据时,会先发送一个EMPTY的request这样writeArray.count就会为0
channel.write(rw, writeHandler);
return c;
}
if (pw.get()) {
writePending.compareAndSet(true, false);
}
return 0;
}
protected void preComplete(P resp, R req, Throwable exc) {
}
protected ClientFuture createClientFuture(R request) { protected ClientFuture createClientFuture(R request) {
return new ClientFuture(request); return new ClientFuture(request);
} }
@@ -378,18 +359,6 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
} }
} }
public boolean isAuthenticated() {
return authenticated;
}
public AsyncConnection getChannel() {
return channel;
}
public ClientCodec<R, P> getCodec() {
return codec;
}
@Override //AsyncConnection.beforeCloseListener @Override //AsyncConnection.beforeCloseListener
public void accept(AsyncConnection t) { public void accept(AsyncConnection t) {
respWaitingCounter.reset(); respWaitingCounter.reset();
@@ -417,6 +386,45 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
} }
} }
public boolean isAuthenticated() {
return authenticated;
}
public AsyncConnection getChannel() {
return channel;
}
public ClientCodec<R, P> getCodec() {
return codec;
}
public int getMaxPipelines() {
return maxPipelines;
}
protected ClientConnection setAuthenticated(boolean authenticated) {
this.authenticated = authenticated;
return this;
}
protected ClientConnection setMaxPipelines(int maxPipelines) {
this.maxPipelines = maxPipelines;
return this;
}
protected ClientConnection resetMaxPipelines() {
this.maxPipelines = client.maxPipelines;
return this;
}
protected boolean isWaitingResponseEmpty() {
return responseQueue.isEmpty() && responseMap.isEmpty();
}
protected void resumeWrite() {
this.pauseWriting.set(false);
}
public int runningCount() { public int runningCount() {
return respWaitingCounter.intValue(); return respWaitingCounter.intValue();
} }

View File

@@ -120,12 +120,6 @@ public class ClientFuture<T> extends CompletableFuture<T> implements Runnable {
workThread = request.workThread; workThread = request.workThread;
request.workThread = null; request.workThread = null;
} }
// if (workThread == null || workThread == Thread.currentThread() || workThread.inIO()
// || workThread.getState() != Thread.State.RUNNABLE) {
// this.completeExceptionally(ex);
// } else {
// workThread.execute(() -> completeExceptionally(ex));
// }
if (workThread == null || workThread.getWorkExecutor() == null) { if (workThread == null || workThread.getWorkExecutor() == null) {
workThread = conn.getChannel().getReadIOThread(); workThread = conn.getChannel().getReadIOThread();
} }

View File

@@ -59,7 +59,7 @@ public abstract class ClientRequest implements BiConsumer<ClientConnection, Byte
return false; return false;
} }
//数据是否全部写入如果只写部分返回false //数据是否全部写入如果只写部分返回false, 配合ClientConnection.pauseWriting使用
protected boolean isCompleted() { protected boolean isCompleted() {
return true; return true;
} }