Client优化

This commit is contained in:
Redkale
2023-01-12 10:57:47 +08:00
parent 35d923d856
commit 851efa5097
5 changed files with 189 additions and 30 deletions

View File

@@ -301,6 +301,10 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
write(array.content(), array.offset(), array.length(), null, 0, 0, null, null, handler);
}
public final <A> void write(ByteTuple array, A attachment, CompletionHandler<Integer, ? super A> handler) {
write(array.content(), array.offset(), array.length(), null, 0, 0, null, null, attachment, handler);
}
public final void write(byte[] bytes, int offset, int length, CompletionHandler<Integer, Void> handler) {
write(bytes, offset, length, null, 0, 0, null, null, handler);
}
@@ -310,6 +314,10 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
}
public void write(byte[] headerContent, int headerOffset, int headerLength, byte[] bodyContent, int bodyOffset, int bodyLength, Consumer bodyCallback, Object bodyAttachment, CompletionHandler<Integer, Void> handler) {
write(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength, bodyCallback, bodyAttachment, null, handler);
}
public void write(byte[] headerContent, int headerOffset, int headerLength, byte[] bodyContent, int bodyOffset, int bodyLength, Consumer bodyCallback, Object bodyAttachment, Object handlerAttachment, CompletionHandler handler) {
final ByteBuffer buffer = sslEngine == null ? pollWriteBuffer() : pollWriteSSLBuffer();
if (buffer.remaining() >= headerLength + bodyLength) {
buffer.put(headerContent, headerOffset, headerLength);
@@ -320,20 +328,20 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
}
}
buffer.flip();
CompletionHandler<Integer, Void> newHandler = new CompletionHandler<Integer, Void>() {
CompletionHandler<Integer, Object> newHandler = new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result, Void attachment) {
public void completed(Integer result, Object attachment) {
offerWriteBuffer(buffer);
handler.completed(result, attachment);
}
@Override
public void failed(Throwable exc, Void attachment) {
public void failed(Throwable exc, Object attachment) {
offerWriteBuffer(buffer);
handler.failed(exc, attachment);
}
};
write(buffer, null, newHandler);
write(buffer, handlerAttachment, newHandler);
} else {
ByteBufferWriter writer = ByteBufferWriter.create(sslEngine == null ? writeBufferSupplier : () -> pollWriteSSLBuffer(), buffer);
writer.put(headerContent, headerOffset, headerLength);
@@ -344,20 +352,20 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
}
}
final ByteBuffer[] buffers = writer.toBuffers();
CompletionHandler<Integer, Void> newHandler = new CompletionHandler<Integer, Void>() {
CompletionHandler<Integer, Object> newHandler = new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result, Void attachment) {
public void completed(Integer result, Object attachment) {
offerWriteBuffer(buffers);
handler.completed(result, attachment);
}
@Override
public void failed(Throwable exc, Void attachment) {
public void failed(Throwable exc, Object attachment) {
offerWriteBuffer(buffers);
handler.failed(exc, attachment);
}
};
write(buffers, null, newHandler);
write(buffers, handlerAttachment, newHandler);
}
}

View File

@@ -315,6 +315,10 @@ public abstract class Client<R extends ClientRequest, P> implements Resourcable
reqWritedCounter.increment();
}
protected void incrRespDoneCounter() {
respDoneCounter.increment();
}
@Override
public String resourceName() {
return name;

View File

@@ -5,10 +5,14 @@
*/
package org.redkale.net.client;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.*;
import java.util.logging.Level;
import org.redkale.convert.json.JsonConvert;
import org.redkale.util.ByteArray;
import org.redkale.net.*;
import org.redkale.util.*;
/**
* 每个ClientConnection绑定一个独立的ClientCodec实例
@@ -21,26 +25,141 @@ import org.redkale.util.ByteArray;
* @param <R> ClientRequest
* @param <P> 响应对象
*/
public abstract class ClientCodec<R extends ClientRequest, P> {
public abstract class ClientCodec<R extends ClientRequest, P> implements CompletionHandler<Integer, ByteBuffer> {
protected final List<ClientResponse<P>> results = new ArrayList<>();
private final List<ClientResponse<P>> repsResults = new ArrayList<>();
protected final ClientConnection connection;
private final ClientConnection connection;
private final ByteArray readArray = new ByteArray();
public ClientCodec(ClientConnection connection) {
this.connection = connection;
}
//返回true: array会clear, 返回false: buffer会clear
public abstract boolean decodeMessages(ByteBuffer buffer, ByteArray array);
public abstract boolean decodeMessages(ClientConnection connection, ByteBuffer buffer, ByteArray array);
@Override
public final void completed(Integer count, ByteBuffer attachment) {
AsyncConnection channel = connection.channel;
if (count < 1) {
channel.setReadBuffer(attachment);
connection.dispose(new NonReadableChannelException());
return;
}
try {
attachment.flip();
decodeResponse(attachment);
} catch (Throwable e) {
channel.setReadBuffer(attachment);
connection.dispose(e);
}
}
private void decodeResponse(ByteBuffer buffer) {
AsyncConnection channel = connection.channel;
Deque<ClientFuture> responseQueue = connection.responseQueue;
Map<Serializable, ClientFuture> responseMap = connection.responseMap;
if (decodeMessages(connection, buffer, readArray)) { //成功了
readArray.clear();
List<ClientResponse<P>> results = pollMessages();
if (results != null) {
for (ClientResponse<P> rs : results) {
Serializable reqid = rs.getRequestid();
ClientFuture respFuture = reqid == null ? responseQueue.poll() : responseMap.remove(reqid);
if (respFuture != null) {
int mergeCount = respFuture.getMergeCount();
completeResponse(rs, respFuture);
if (mergeCount > 0) {
for (int i = 0; i < mergeCount; i++) {
respFuture = reqid == null ? responseQueue.poll() : responseMap.remove(reqid);
if (respFuture != null) {
completeResponse(rs, respFuture);
}
}
}
}
}
}
if (buffer.hasRemaining()) {
decodeResponse(buffer);
} else { //队列都已处理完了
buffer.clear();
channel.setReadBuffer(buffer);
channel.read(this);
}
} else { //数据不全, 继续读
buffer.clear();
channel.setReadBuffer(buffer);
channel.read(this);
}
}
private void completeResponse(ClientResponse<P> rs, ClientFuture respFuture) {
if (respFuture != null) {
ClientRequest request = respFuture.request;
if (!request.isCompleted()) {
if (rs.exc == null) {
//request没有发送完respFuture需要再次接收
Serializable reqid = request.getRequestid();
if (reqid == null) {
connection.responseQueue.offerFirst(respFuture);
} else {
connection.responseMap.put(reqid, respFuture);
}
connection.pauseWriting.set(false);
connection.wakeupWrite();
return;
} else { //异常了需要清掉半包
connection.lastHalfEntry = null;
connection.pauseWriting.set(false);
connection.wakeupWrite();
}
}
connection.respWaitingCounter.decrement();
if (connection.isAuthenticated()) {
connection.client.incrRespDoneCounter();
}
try {
respFuture.cancelTimeout();
//if (client.finest) client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + ClientConnection.this + ", 回调处理, req=" + request + ", message=" + rs.message);
connection.preComplete(rs.message, (R) request, rs.exc);
WorkThread workThread = request.workThread;
request.workThread = null;
if (workThread == null || workThread.getWorkExecutor() == null) {
workThread = connection.channel.getReadIOThread();
}
if (rs.exc != null) {
workThread.runWork(() -> {
Traces.currTraceid(request.traceid);
respFuture.completeExceptionally(rs.exc);
});
} else {
workThread.runWork(() -> {
Traces.currTraceid(request.traceid);
respFuture.complete(rs.message);
});
}
} catch (Throwable t) {
connection.client.logger.log(Level.INFO, "Complete result error, request: " + respFuture.request, t);
}
}
}
@Override
public final void failed(Throwable t, ByteBuffer attachment) {
connection.dispose(t);
}
protected Iterator<ClientFuture> responseIterator() {
return connection.responseQueue.iterator();
}
public List<ClientResponse<P>> pollMessages() {
List<ClientResponse<P>> rs = new ArrayList<>(results);
this.results.clear();
List<ClientResponse<P>> rs = new ArrayList<>(repsResults);
this.repsResults.clear();
return rs;
}
@@ -49,15 +168,11 @@ public abstract class ClientCodec<R extends ClientRequest, P> {
}
public void addMessage(P result) {
this.results.add(new ClientResponse<>(result));
this.repsResults.add(new ClientResponse<>(result));
}
public void addMessage(Throwable exc) {
this.results.add(new ClientResponse<>(exc));
}
public void reset() {
this.results.clear();
this.repsResults.add(new ClientResponse<>(exc));
}
@Override

View File

@@ -54,10 +54,12 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
protected final Queue<SimpleEntry<R, ClientFuture<R>>> requestQueue = new ArrayDeque<>();
//responseQueue、responseMap二选一
final ArrayDeque<ClientFuture> responseQueue = new ArrayDeque<>();
final Deque<ClientFuture> responseQueue = new LinkedBlockingDeque<>();
//responseQueue、responseMap二选一, key: requestid
final HashMap<Serializable, ClientFuture> responseMap = new LinkedHashMap<>();
final Map<Serializable, ClientFuture> responseMap = new ConcurrentHashMap<>();
SimpleEntry<R, ClientFuture<R>> lastHalfEntry;
private int maxPipelines; //最大并行处理数
@@ -65,8 +67,6 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
private boolean authenticated;
private SimpleEntry<R, ClientFuture<R>> lastHalfEntry;
protected final CompletionHandler<Integer, ByteBuffer> readHandler = new CompletionHandler<Integer, ByteBuffer>() {
@Override
@@ -86,7 +86,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
}
private void decodeResponse(ByteBuffer buffer) {
if (codec.decodeMessages(buffer, readArray)) { //成功了
if (codec.decodeMessages(ClientConnection.this, buffer, readArray)) { //成功了
readArray.clear();
List<ClientResponse<P>> results = codec.pollMessages();
if (results != null) {
@@ -279,7 +279,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
}
//返回写入数据request的数量返回0表示没有可写的request
private int sendWrite(boolean must) {
int sendWrite(boolean must) {
ClientConnection conn = this;
ByteArray rw = conn.writeArray;
rw.clear();
@@ -386,6 +386,13 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
}
}
public void wakeupWrite() {
AsyncIOThread thread = channel.getWriteIOThread();
if (thread instanceof ClientWriteIOThread) {
((ClientWriteIOThread) thread).wakeupWrite();
}
}
public boolean isAuthenticated() {
return authenticated;
}

View File

@@ -5,8 +5,9 @@ package org.redkale.net.client;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.channels.Selector;
import java.nio.channels.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redkale.util.*;
/**
@@ -26,6 +27,12 @@ public class ClientWriteIOThread extends ClientIOThread {
requestQueue.offer(new ClientEntity(conn, request, respFuture));
}
public void wakeupWrite() {
synchronized (writeHandler) {
writeHandler.notify();
}
}
@Override
public void run() {
final ByteBuffer buffer = getBufferSupplier().get();
@@ -37,6 +44,7 @@ public class ClientWriteIOThread extends ClientIOThread {
ClientConnection conn = entity.conn;
ClientRequest request = entity.request;
ClientFuture respFuture = entity.respFuture;
AtomicBoolean pw = conn.pauseWriting;
Serializable reqid = request.getRequestid();
if (reqid == null) {
conn.responseQueue.offer(respFuture);
@@ -50,9 +58,14 @@ public class ClientWriteIOThread extends ClientIOThread {
buffer.clear();
buffer.put(rw.content(), 0, rw.length());
buffer.flip();
conn.channel.write(buffer, null, conn.writeHandler);
conn.channel.write(buffer, conn, writeHandler);
} else {
conn.channel.write(rw, conn.writeHandler);
conn.channel.write(rw, conn, writeHandler);
}
if (pw.get()) {
synchronized (writeHandler) {
writeHandler.wait(30_000);
}
}
}
} catch (InterruptedException e) {
@@ -60,6 +73,18 @@ public class ClientWriteIOThread extends ClientIOThread {
}
}
protected final CompletionHandler<Integer, ClientConnection> writeHandler = new CompletionHandler<Integer, ClientConnection>() {
@Override
public void completed(Integer result, ClientConnection attachment) {
}
@Override
public void failed(Throwable exc, ClientConnection attachment) {
attachment.dispose(exc);
}
};
protected static class ClientEntity {
ClientConnection conn;