client优化

This commit is contained in:
redkale
2023-03-31 14:22:00 +08:00
parent 6b92748b7c
commit 0b4a52ccb1
3 changed files with 35 additions and 97 deletions

View File

@@ -81,11 +81,6 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
protected int readTimeoutSeconds;
protected int writeTimeoutSeconds;
//------------------ LocalThreadMode模式 ------------------
final CopyOnWriteArrayList<C> localConnList = new CopyOnWriteArrayList<>();
final ThreadLocal<C> localConnection = new ThreadLocal();
//------------------ 可选项 ------------------
//PING心跳的请求数据为null且pingInterval<1表示不需要定时ping
@@ -203,10 +198,6 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
closeConnection(entry.connection);
}
this.connAddrEntrys.clear();
for (ClientConnection conn : this.localConnList) {
closeConnection(conn);
}
this.localConnList.clear();
group.close();
}
}
@@ -280,37 +271,6 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
}
public final CompletableFuture<C> connect() {
if (isThreadLocalConnMode()) {
C conn = localConnection.get();
if (conn == null || !conn.isOpen()) {
try {
conn = connect1();
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
localConnection.set(conn);
localConnList.add(conn);
}
return CompletableFuture.completedFuture(conn);
} else {
return connect0();
}
}
private C connect1() {
CompletableFuture<C> future = group.createClient(tcp, this.address.randomAddress(), readTimeoutSeconds, writeTimeoutSeconds)
.thenApply(c -> (C) createConnection(-2, c).setMaxPipelines(maxPipelines));
R virtualReq = createVirtualRequestAfterConnect();
if (virtualReq != null) {
future = future.thenCompose(conn -> conn.writeVirtualRequest(virtualReq).thenApply(v -> conn));
}
if (authenticate != null) {
future = future.thenCompose(authenticate);
}
return future.thenApply(c -> (C) c.setAuthenticated(true)).join();
}
private CompletableFuture<C> connect0() {
final int size = this.connArray.length;
WorkThread workThread = WorkThread.currWorkThread();
final int connIndex = (workThread != null && workThread.threads() == size) ? workThread.index() : (int) Math.abs(connIndexSeq.getAndIncrement()) % size;
@@ -354,7 +314,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
//指定地址获取连接
public final CompletableFuture<C> connect(final SocketAddress addr) {
if (addr == null) {
return connect0();
return connect();
}
final AddressConnEntry<C> entry = connAddrEntrys.computeIfAbsent(addr, a -> new AddressConnEntry());
C ec = entry.connection;

View File

@@ -104,11 +104,11 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
try {
if (!halfCompleted && !request.isCompleted()) {
if (exc == null) {
connection.sendHalfWrite(request, exc);
connection.sendHalfWriteInReadThread(request, exc);
//request没有发送完respFuture需要再次接收
return;
} else {
connection.sendHalfWrite(request, exc);
connection.sendHalfWriteInReadThread(request, exc);
//异常了需要清掉半包
}
}

View File

@@ -12,6 +12,7 @@ import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.*;
import org.redkale.annotation.Nullable;
import org.redkale.net.*;
@@ -31,7 +32,6 @@ import org.redkale.util.ByteArray;
*/
public abstract class ClientConnection<R extends ClientRequest, P> implements Consumer<AsyncConnection> {
//=-2 表示连接放在ThreadLocal存储
//=-1 表示连接放在connAddrEntrys存储
//>=0 表示connArray的下坐标从0开始
protected final int index;
@@ -45,6 +45,8 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
protected final LongAdder doneResponseCounter = new LongAdder();
protected final ReentrantLock writeLock = new ReentrantLock();
protected final ByteArray writeArray = new ByteArray();
protected final ByteBuffer writeBuffer;
@@ -79,8 +81,8 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
this.client = client;
this.codec = createCodec();
this.index = index;
this.connEntry = index == -2 ? null : (index >= 0 ? null : client.connAddrEntrys.get(channel.getRemoteAddress()));
this.respWaitingCounter = index == -2 ? new LongAdder() : (index >= 0 ? client.connRespWaitings[index] : this.connEntry.connRespWaiting);
this.connEntry = index >= 0 ? null : client.connAddrEntrys.get(channel.getRemoteAddress());
this.respWaitingCounter = index >= 0 ? client.connRespWaitings[index] : this.connEntry.connRespWaiting;
this.channel = channel.beforeCloseListener(this);
this.writeBuffer = channel.pollWriteBuffer();
}
@@ -100,41 +102,22 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
respFuture.setTimeout(client.timeoutScheduler.schedule(respFuture, rts, TimeUnit.SECONDS));
}
respWaitingCounter.increment(); //放在writeChannelInWriteThread计数会延迟导致不准确
if (client.isThreadLocalConnMode()) {
writeLock.lock();
try {
offerRespFuture(respFuture);
writeArray.clear();
request.writeTo(this, writeArray);
doneRequestCounter.increment();
if (writeArray.length() > 0) {
if (writeBuffer.capacity() >= writeArray.length()) {
writeBuffer.clear();
writeBuffer.put(writeArray.content(), 0, writeArray.length());
writeBuffer.flip();
channel.write(writeBuffer, this, writeHandler);
} else {
channel.write(writeArray, this, writeHandler);
}
}
} else {
if (channel.inCurrWriteThread()) {
writeChannelInThread(request, respFuture);
if (pauseWriting.get()) {
pauseRequests.add(respFuture);
} else {
channel.executeWrite(() -> writeChannelInThread(request, respFuture));
sendRequestInLocking(request, respFuture);
}
} finally {
writeLock.unlock();
}
return respFuture;
}
private void writeChannelInThread(R request, ClientFuture respFuture) {
offerRespFuture(respFuture);
if (pauseWriting.get()) {
pauseRequests.add(respFuture);
} else {
sendRequestInThread(request, respFuture);
}
}
private void sendRequestInThread(R request, ClientFuture respFuture) {
private void sendRequestInLocking(R request, ClientFuture respFuture) {
//发送请求数据包
writeArray.clear();
request.writeTo(this, writeArray);
@@ -157,28 +140,25 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
}
//发送半包和积压的请求数据包
private void sendHalfWriteInThread(R request, Throwable halfRequestExc) {
pauseWriting.set(false);
ClientFuture respFuture = this.currHalfWriteFuture;
if (respFuture != null) {
this.currHalfWriteFuture = null;
if (halfRequestExc == null) {
offerFirstRespFuture(respFuture);
sendRequestInThread(request, respFuture);
} else {
codec.responseComplete(true, respFuture, null, halfRequestExc);
void sendHalfWriteInReadThread(R request, Throwable halfRequestExc) {
writeLock.lock();
try {
pauseWriting.set(false);
ClientFuture respFuture = this.currHalfWriteFuture;
if (respFuture != null) {
this.currHalfWriteFuture = null;
if (halfRequestExc == null) {
offerFirstRespFuture(respFuture);
sendRequestInLocking(request, respFuture);
} else {
codec.responseComplete(true, respFuture, null, halfRequestExc);
}
}
}
while (!pauseWriting.get() && (respFuture = pauseRequests.poll()) != null) {
sendRequestInThread((R) respFuture.getRequest(), respFuture);
}
}
void sendHalfWrite(R request, Throwable halfRequestExc) {
if (channel.inCurrWriteThread()) {
sendHalfWriteInThread(request, halfRequestExc);
} else {
channel.executeWrite(() -> sendHalfWriteInThread(request, halfRequestExc));
while (!pauseWriting.get() && (respFuture = pauseRequests.poll()) != null) {
sendRequestInLocking((R) respFuture.getRequest(), respFuture);
}
} finally {
writeLock.unlock();
}
}
@@ -206,8 +186,6 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
client.connArray[index] = null; //必须connOpenStates之后
} else if (connEntry != null) { //index=-1
connEntry.connOpenState.set(false);
} else {//index=-2
client.localConnList.remove(this);
}
}