优化client

This commit is contained in:
redkale
2023-02-04 20:19:35 +08:00
parent b60e5fef96
commit eef43e8edc
5 changed files with 45 additions and 76 deletions

View File

@@ -102,11 +102,11 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
try {
if (request != null && !request.isCompleted()) {
if (exc == null) {
connection.sendHalfWrite(exc);
connection.sendHalfWrite(request, exc);
//request没有发送完respFuture需要再次接收
return;
} else { //异常了需要清掉半包
connection.sendHalfWrite(exc);
connection.sendHalfWrite(request, exc);
}
}
if (request != null) {

View File

@@ -10,7 +10,6 @@ import java.nio.channels.ClosedChannelException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.*;
import java.util.function.*;
import org.redkale.net.*;
@@ -34,15 +33,9 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
protected final LongAdder respWaitingCounter;
protected final AtomicBoolean pauseWriting = new AtomicBoolean();
final AtomicBoolean pauseWriting = new AtomicBoolean();
protected final AtomicBoolean pauseResuming = new AtomicBoolean();
protected final List<ClientFuture> pauseRequests = new CopyOnWriteArrayList<>();
private final ReentrantLock pauseLock = new ReentrantLock();
private final Condition pauseCondition = pauseLock.newCondition();
final ConcurrentLinkedQueue<ClientFuture> pauseRequests = new ConcurrentLinkedQueue<>();
protected final AsyncConnection channel;
@@ -140,8 +133,8 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
}
}
void sendHalfWrite(Throwable halfRequestExc) {
writeThread.sendHalfWrite(this, halfRequestExc);
void sendHalfWrite(R request, Throwable halfRequestExc) {
writeThread.sendHalfWrite(this, request, halfRequestExc);
}
//只会在WriteIOThread中调用
@@ -186,25 +179,6 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
}
}
void signalPauseRequest() {
pauseLock.lock();
try {
pauseCondition.signalAll();
} finally {
pauseLock.unlock();
}
}
void awaitPauseRequest() {
pauseLock.lock();
try {
pauseCondition.await(3_000, TimeUnit.SECONDS);
} catch (Exception e) {
} finally {
pauseLock.unlock();
}
}
public boolean isAuthenticated() {
return authenticated;
}

View File

@@ -26,6 +26,8 @@ public class ClientFuture<R extends ClientRequest, T> extends CompletableFuture<
private ScheduledFuture timeout;
Boolean resumeHalfRequestFlag;
ClientFuture(ClientConnection conn, R request) {
super();
Objects.requireNonNull(conn);

View File

@@ -8,7 +8,6 @@ import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redkale.net.AsyncIOThread;
import org.redkale.util.*;
@@ -35,25 +34,13 @@ public class ClientWriteIOThread extends AsyncIOThread {
requestQueue.offer(respFuture);
}
public void sendHalfWrite(ClientConnection conn, Throwable halfRequestExc) {
if (conn.pauseWriting.get()) {
conn.pauseResuming.set(true);
try {
AtomicBoolean skipFirst = new AtomicBoolean(halfRequestExc != null);
conn.pauseRequests.removeIf(e -> {
if (e != null) {
if (!skipFirst.compareAndSet(true, false)) {
requestQueue.offer((ClientFuture) e);
}
}
return true;
});
} finally {
conn.pauseResuming.set(false);
conn.pauseWriting.set(false);
conn.signalPauseRequest();
}
public void sendHalfWrite(ClientConnection conn, ClientRequest request, Throwable halfRequestExc) {
ClientFuture respFuture = conn.createClientFuture(request);
respFuture.resumeHalfRequestFlag = true;
if (halfRequestExc != null) { //halfRequestExc不为null时需要把当前halfRequest移除
conn.pauseRequests.poll();
}
requestQueue.offer(respFuture);
}
@Override
@@ -71,24 +58,38 @@ public class ClientWriteIOThread extends AsyncIOThread {
try {
while ((entry = requestQueue.take()) != null) {
map.clear();
if (!entry.isDone()) {
if (entry.resumeHalfRequestFlag != null) { //将暂停的pauseRequests写入list
List<ClientFuture> cl = map.computeIfAbsent(entry.conn, c -> listPool.get());
for (ClientFuture f : (List<ClientFuture>) entry.conn.pauseRequests) {
if (!f.isDone()) {
entry.conn.offerRespFuture(f);
cl.add(f);
}
}
entry.conn.pauseRequests.clear();
entry.conn.pauseWriting.set(false);
} else if (!entry.isDone()) {
entry.conn.offerRespFuture(entry);
if (entry.conn.pauseWriting.get()) {
if (entry.conn.pauseResuming.get()) {
entry.conn.awaitPauseRequest();
}
entry.conn.pauseRequests.add(entry);
} else {
map.computeIfAbsent(entry.conn, c -> listPool.get()).add(entry);
}
}
while ((entry = requestQueue.poll()) != null) {
if (!entry.isDone()) {
if (entry.resumeHalfRequestFlag != null) { //将暂停的pauseRequests写入list
List<ClientFuture> cl = map.computeIfAbsent(entry.conn, c -> listPool.get());
for (ClientFuture f : (List<ClientFuture>) entry.conn.pauseRequests) {
if (!f.isDone()) {
entry.conn.offerRespFuture(f);
cl.add(f);
}
}
entry.conn.pauseRequests.clear();
entry.conn.pauseWriting.set(false);
} else if (!entry.isDone()) {
entry.conn.offerRespFuture(entry);
if (entry.conn.pauseWriting.get()) {
if (entry.conn.pauseResuming.get()) {
entry.conn.awaitPauseRequest();
}
entry.conn.pauseRequests.add(entry);
} else {
map.computeIfAbsent(entry.conn, c -> listPool.get()).add(entry);