优化client

This commit is contained in:
redkale
2023-04-07 14:05:13 +08:00
parent 7286531bc5
commit e6a2167261
6 changed files with 12 additions and 10 deletions

View File

@@ -50,7 +50,7 @@ public class MessageRespFutureNode implements Runnable {
@Override //超时后被timeoutExecutor调用 @Override //超时后被timeoutExecutor调用
public void run() { //timeout public void run() { //timeout
respNodes.remove(this.seqid); respNodes.remove(this.seqid);
future.completeExceptionally(new TimeoutException()); future.completeExceptionally(new TimeoutException("message-record: "+message));
logger.log(Level.WARNING, getClass().getSimpleName() + " wait msg: " + message + " timeout " + (System.currentTimeMillis() - createTime) + "ms" logger.log(Level.WARNING, getClass().getSimpleName() + " wait msg: " + message + " timeout " + (System.currentTimeMillis() - createTime) + "ms"
+ (message.userid != null || (message.groupid != null && !message.groupid.isEmpty()) ? (message.userid != null ? (", userid:" + message.userid) : (", groupid:" + message.groupid)) : "")); + (message.userid != null || (message.groupid != null && !message.groupid.isEmpty()) ? (message.userid != null ? (", userid:" + message.userid) : (", groupid:" + message.groupid)) : ""));
} }

View File

@@ -397,9 +397,9 @@ abstract class AsyncNioConnection extends AsyncConnection {
if (writeCount == 0) { if (writeCount == 0) {
if (hasRemain) { if (hasRemain) {
//writeCompleted = false; writeCompleted = false;
//writeTotal = totalCount; writeTotal = totalCount;
continue; //要全部输出完才返回 //continue; //要全部输出完才返回
} }
break; break;
} else if (writeCount < 0) { } else if (writeCount < 0) {

View File

@@ -9,7 +9,7 @@ import java.io.Serializable;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.*; import java.nio.channels.*;
import java.util.*; import java.util.Iterator;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.*; import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@@ -45,6 +45,8 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
protected final LongAdder doneResponseCounter = new LongAdder(); protected final LongAdder doneResponseCounter = new LongAdder();
protected final AtomicBoolean writePending = new AtomicBoolean();
protected final ReentrantLock writeLock = new ReentrantLock(); protected final ReentrantLock writeLock = new ReentrantLock();
protected final ByteArray writeArray = new ByteArray(); protected final ByteArray writeArray = new ByteArray();
@@ -77,10 +79,10 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
private final ClientCodec<R, P> codec; private final ClientCodec<R, P> codec;
//respFutureQueue、respFutureMap二选一 SPSC队列模式 //respFutureQueue、respFutureMap二选一 SPSC队列模式
private final Deque<ClientFuture<R, P>> respFutureQueue = new ConcurrentLinkedDeque<>(); //Utility.unsafe() != null ? new MpscGrowableArrayQueue<>(16, 1 << 16) : new ConcurrentLinkedQueue<>(); private final ConcurrentLinkedDeque<ClientFuture<R, P>> respFutureQueue = new ConcurrentLinkedDeque<>(); //Utility.unsafe() != null ? new MpscGrowableArrayQueue<>(16, 1 << 16) : new ConcurrentLinkedQueue<>();
//respFutureQueue、respFutureMap二选一, key: requestid SPSC模式 //respFutureQueue、respFutureMap二选一, key: requestid SPSC模式
private final Map<Serializable, ClientFuture<R, P>> respFutureMap = new ConcurrentHashMap<>(); private final ConcurrentHashMap<Serializable, ClientFuture<R, P>> respFutureMap = new ConcurrentHashMap<>();
Iterator<ClientFuture<R, P>> currRespIterator; //必须在调用decodeMessages之前重置为null Iterator<ClientFuture<R, P>> currRespIterator; //必须在调用decodeMessages之前重置为null

View File

@@ -73,7 +73,7 @@ public class ClientFuture<R extends ClientRequest, T> extends CompletableFuture<
private void runTimeout() { private void runTimeout() {
conn.removeRespFuture(request.getRequestid(), this); conn.removeRespFuture(request.getRequestid(), this);
TimeoutException ex = new TimeoutException(); TimeoutException ex = new TimeoutException("client-request: " + request);
WorkThread workThread = null; WorkThread workThread = null;
if (request != null) { if (request != null) {
workThread = request.workThread; workThread = request.workThread;

View File

@@ -49,7 +49,7 @@ public class WebSocketFuture extends CompletableFuture<Integer> implements Runna
@Override @Override
public void run() { public void run() {
TimeoutException ex = new TimeoutException(); TimeoutException ex = new TimeoutException("packets: " + Arrays.toString(packets));
workThread.runWork(() -> completeExceptionally(ex)); workThread.runWork(() -> completeExceptionally(ex));
} }

View File

@@ -31,7 +31,7 @@ public class WebSocketWriteHandler implements CompletionHandler<Integer, Void> {
protected final List<WebSocketFuture<Integer>> respList = new ArrayList(); protected final List<WebSocketFuture<Integer>> respList = new ArrayList();
protected final ConcurrentLinkedDeque<WebSocketFuture<Integer>> requestQueue = new ConcurrentLinkedDeque(); protected final ConcurrentLinkedQueue<WebSocketFuture<Integer>> requestQueue = new ConcurrentLinkedQueue();
public WebSocketWriteHandler(HttpContext context, WebSocket webSocket, ObjectPool<ByteArray> byteArrayPool) { public WebSocketWriteHandler(HttpContext context, WebSocket webSocket, ObjectPool<ByteArray> byteArrayPool) {
this.context = context; this.context = context;