client优化

This commit is contained in:
redkale
2023-03-31 20:06:54 +08:00
parent 0b4a52ccb1
commit dba72cab46
2 changed files with 17 additions and 27 deletions

View File

@@ -10,7 +10,7 @@ import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.atomic.*;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.*;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
@@ -83,9 +83,6 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
private Consumer<AsyncConnection> beforeCloseListener;
//关联的事件数, 小于1表示没有事件
private final AtomicInteger eventing = new AtomicInteger();
//用于服务端的Socket, 等同于一直存在的readCompletionHandler
ProtocolCodec protocolCodec;
@@ -162,14 +159,6 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
return sslEngine != null;
}
public final int increEventing() {
return eventing.incrementAndGet();
}
public final int decreEventing() {
return eventing.decrementAndGet();
}
public final void executeRead(Runnable command) {
ioReadThread.execute(command);
}

View File

@@ -14,7 +14,7 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.*;
import org.redkale.annotation.Nullable;
import org.redkale.annotation.*;
import org.redkale.net.*;
import org.redkale.util.ByteArray;
@@ -38,8 +38,8 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
protected final Client client;
@Nullable
protected final LongAdder respWaitingCounter; //可能为null
@Nonnull
protected final LongAdder respWaitingCounter;
protected final LongAdder doneRequestCounter = new LongAdder();
@@ -51,6 +51,18 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
protected final ByteBuffer writeBuffer;
protected final CompletionHandler<Integer, ClientConnection> writeHandler = new CompletionHandler<Integer, ClientConnection>() {
@Override
public void completed(Integer result, ClientConnection attachment) {
}
@Override
public void failed(Throwable exc, ClientConnection attachment) {
attachment.dispose(exc);
}
};
final AtomicBoolean pauseWriting = new AtomicBoolean();
final ConcurrentLinkedQueue<ClientFuture> pauseRequests = new ConcurrentLinkedQueue<>();
@@ -102,7 +114,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
respFuture.setTimeout(client.timeoutScheduler.schedule(respFuture, rts, TimeUnit.SECONDS));
}
respWaitingCounter.increment(); //放在writeChannelInWriteThread计数会延迟导致不准确
writeLock.lock();
try {
offerRespFuture(respFuture);
@@ -336,15 +348,4 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
return s;
}
protected final CompletionHandler<Integer, ClientConnection> writeHandler = new CompletionHandler<Integer, ClientConnection>() {
@Override
public void completed(Integer result, ClientConnection attachment) {
}
@Override
public void failed(Throwable exc, ClientConnection attachment) {
attachment.dispose(exc);
}
};
}