ClientConnection优化

This commit is contained in:
redkale
2023-07-03 16:02:15 +08:00
parent 319b9e04dd
commit f016d5fb4a
6 changed files with 141 additions and 101 deletions

View File

@@ -224,6 +224,8 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
public abstract void setWriteTimeoutSeconds(int writeTimeoutSeconds);
public abstract <A> void clientWrite(byte[] data, A attachment, CompletionHandler<Integer, ? super A> handler);
protected abstract void readRegisterImpl(CompletionHandler<Integer, ByteBuffer> handler);
protected abstract void readImpl(CompletionHandler<Integer, ByteBuffer> handler);
@@ -238,6 +240,10 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
read(handler);
}
public final <A> void clientWrite(byte[] data, CompletionHandler<Integer, ? super A> handler) {
clientWrite(data, null, handler);
}
public final void startReadInIOThread(CompletionHandler<Integer, ByteBuffer> handler) {
if (inCurrReadThread()) {
startRead(handler);

View File

@@ -60,9 +60,7 @@ public class AsyncIOThread extends WorkThread {
}
public void interestOpsOr(SelectionKey key, int opt) {
if (key == null) {
return;
}
Objects.requireNonNull(key);
if (key.selector() != selector) {
throw new RedkaleException("NioThread.selector not the same to SelectionKey.selector");
}

View File

@@ -24,7 +24,7 @@ class AsyncNioCompletionHandler<A> implements CompletionHandler<Integer, A>, Run
private final boolean readMode;
private CompletionHandler<Integer, A> handler;
CompletionHandler<Integer, A> handler;
private A attachment;

View File

@@ -9,11 +9,11 @@ import java.io.*;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;
import javax.net.ssl.SSLContext;
import org.redkale.util.ByteBufferWriter;
import org.redkale.util.*;
/**
*
@@ -80,6 +80,10 @@ abstract class AsyncNioConnection extends AsyncConnection {
protected SelectionKey writeKey;
//-------------------------- 用于客户端的Socket --------------------------
//用于客户端的Socket
protected final Queue<byte[]> clientModeWriteQueue = new ConcurrentLinkedQueue<>();
public AsyncNioConnection(boolean clientMode, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread,
AsyncIOThread ioWriteThread, final int bufferCapacity, SSLBuilder sslBuilder, SSLContext sslContext) {
super(clientMode, ioGroup, ioReadThread, ioWriteThread, bufferCapacity, sslBuilder, sslContext);
@@ -127,7 +131,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
handler.failed(new NotYetConnectedException(), null);
return;
}
if (handler != protocolCodec) {
if (handler != readCompletionHandler && handler != readTimeoutCompletionHandler.handler) {
if (this.readPending) {
handler.failed(new ReadPendingException(), null);
return;
@@ -142,19 +146,20 @@ abstract class AsyncNioConnection extends AsyncConnection {
this.readCompletionHandler = handler;
}
} else {
this.readCompletionHandler = handler;
this.readPending = true;
}
try {
if (readKey == null) {
ioReadThread.register(selector -> {
if (readKey == null) {
try {
try {
if (readKey == null) {
readKey = implRegister(selector, SelectionKey.OP_READ);
readKey.attach(this);
} catch (ClosedChannelException e) {
handleRead(0, e);
} else {
readKey.interestOps(readKey.interestOps() | SelectionKey.OP_READ);
}
} catch (ClosedChannelException e) {
handleRead(0, e);
}
});
} else {
@@ -281,6 +286,38 @@ abstract class AsyncNioConnection extends AsyncConnection {
doWrite();
}
@Override
public <A> void clientWrite(byte[] data, A attachment, CompletionHandler<Integer, ? super A> handler) {
if (!this.isConnected()) {
handler.failed(new NotYetConnectedException(), null);
return;
}
Objects.requireNonNull(data);
Objects.requireNonNull(handler);
this.writePending = true;
this.clientModeWriteQueue.offer(data);
this.writeCompletionHandler = (CompletionHandler) handler;
this.writeAttachment = attachment;
if (writeKey == null) {
ioWriteThread.register(selector -> {
try {
if (writeKey == null) {
writeKey = implRegister(selector, SelectionKey.OP_WRITE);
writeKey.attach(this);
} else {
writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE);
}
} catch (ClosedChannelException e) {
this.writeCompletionHandler = (CompletionHandler) handler;
this.writeAttachment = attachment;
handleWrite(0, e);
}
});
} else {
ioWriteThread.interestOpsOr(writeKey, SelectionKey.OP_WRITE);
}
}
public void doRead(boolean direct) {
try {
this.readTime = System.currentTimeMillis();
@@ -300,8 +337,12 @@ abstract class AsyncNioConnection extends AsyncConnection {
} else if (readKey == null) {
ioReadThread.register(selector -> {
try {
readKey = implRegister(selector, SelectionKey.OP_READ);
readKey.attach(this);
if (readKey == null) {
readKey = implRegister(selector, SelectionKey.OP_READ);
readKey.attach(this);
} else {
readKey.interestOps(readKey.interestOps() | SelectionKey.OP_READ);
}
} catch (ClosedChannelException e) {
handleRead(0, e);
}
@@ -321,6 +362,23 @@ abstract class AsyncNioConnection extends AsyncConnection {
boolean hasRemain = true;
boolean writeCompleted = true;
if (clientMode && writeByteTuple1Array == null && !clientModeWriteQueue.isEmpty()) {
byte[] bs = null;
byte[] item;
while ((item = clientModeWriteQueue.poll()) != null) {
bs = Utility.append(bs, item);
}
this.writePending = true;
this.writeByteTuple1Array = bs;
this.writeByteTuple1Offset = 0;
this.writeByteTuple1Length = bs == null ? 0 : bs.length;
this.writeByteTuple2Array = null;
this.writeByteTuple2Offset = 0;
this.writeByteTuple2Length = 0;
this.writeOffset = 0;
this.writeLength = this.writeByteTuple1Length;
}
int batchOffset = writeOffset;
int batchLength = writeLength;
while (hasRemain) { //必须要将buffer写完为止
@@ -386,11 +444,11 @@ abstract class AsyncNioConnection extends AsyncConnection {
if (writeCount == 0) {
if (hasRemain) {
writeCompleted = false;
writeTotal = totalCount;
//continue; //要全部输出完才返回
//writeCompleted = false;
//writeTotal = totalCount;
continue; //要全部输出完才返回
}
break;
break;
} else if (writeCount < 0) {
if (totalCount == 0) {
totalCount = writeCount;
@@ -407,14 +465,27 @@ abstract class AsyncNioConnection extends AsyncConnection {
if (writeCompleted && (totalCount != 0 || !hasRemain)) {
handleWrite(writeTotal + totalCount, null);
} else if (writeKey == null) {
ioWriteThread.register(selector -> {
if (inCurrWriteThread()) {
try {
writeKey = implRegister(selector, SelectionKey.OP_WRITE);
writeKey = implRegister(ioWriteThread.selector, SelectionKey.OP_WRITE);
writeKey.attach(this);
} catch (ClosedChannelException e) {
handleWrite(0, e);
}
});
} else {
ioWriteThread.register(selector -> {
try {
if (writeKey == null) {
writeKey = implRegister(selector, SelectionKey.OP_WRITE);
writeKey.attach(this);
} else {
writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE);
}
} catch (ClosedChannelException e) {
handleWrite(0, e);
}
});
}
} else {
ioWriteThread.interestOpsOr(writeKey, SelectionKey.OP_WRITE);
}

View File

@@ -281,16 +281,18 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
if (authenticate != null) {
future = future.thenCompose(authenticate);
}
return future.thenApply(c -> {
c.setAuthenticated(true);
this.connArray[connIndex] = c;
CompletableFuture<C> f;
while ((f = waitQueue.poll()) != null) {
if (!f.isDone()) {
f.complete(c);
return future.thenCompose(c -> {
return CompletableFuture.supplyAsync(() -> {
c.setAuthenticated(true);
this.connArray[connIndex] = c;
CompletableFuture<C> f;
while ((f = waitQueue.poll()) != null) {
if (!f.isDone()) {
f.complete(c);
}
}
}
return c;
return c;
}, c.channel.getWriteIOThread());
}).whenComplete((r, t) -> {
if (t != null) {
this.connOpenStates[connIndex].set(false);
@@ -324,16 +326,18 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
if (authenticate != null) {
future = future.thenCompose(authenticate);
}
return future.thenApply(c -> {
c.setAuthenticated(true);
entry.connection = c;
CompletableFuture<C> f;
while ((f = waitQueue.poll()) != null) {
if (!f.isDone()) {
f.complete(c);
return future.thenCompose(c -> {
return CompletableFuture.supplyAsync(() -> {
c.setAuthenticated(true);
entry.connection = c;
CompletableFuture<C> f;
while ((f = waitQueue.poll()) != null) {
if (!f.isDone()) {
f.complete(c);
}
}
}
return c;
return c;
}, c.channel.getWriteIOThread());
}).whenComplete((r, t) -> {
if (t != null) {
entry.connOpenState.set(false);

View File

@@ -51,47 +51,16 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
protected final ByteArray writeArray = new ByteArray();
protected final ThreadLocal<ByteArray> arrayThreadLocal = ThreadLocal.withInitial(() -> new ByteArray());
protected final ByteBuffer writeBuffer;
protected final CompletionHandler<Integer, ClientConnection> writeHandler = new CompletionHandler<Integer, ClientConnection>() {
@Override
public void completed(Integer result, ClientConnection attachment) {
if (pauseWriting.get()) { //等待sendHalfWriteInReadThread调用
if (!writePending.compareAndSet(true, false)) {
completed(0, attachment);
}
return;
}
ByteArray array = writeArray;
array.clear();
ClientFuture<R, P> respFuture;
while ((respFuture = requestQueue.poll()) != null) {
if (!respFuture.isDone()) {
R request = respFuture.request;
request.writeTo(attachment, array);
if (request.isCompleted()) {
doneRequestCounter.increment();
} else { //还剩半包没发送完
pauseWriting.set(true);
currHalfWriteFuture = respFuture;
break;
}
}
}
if (array.length() > 0) {
if (writeBuffer.capacity() >= array.length()) {
writeBuffer.clear();
writeBuffer.put(array.content(), 0, array.length());
writeBuffer.flip();
channel.write(writeBuffer, attachment, this);
} else {
channel.write(array, attachment, this);
}
} else {
if (!writePending.compareAndSet(true, false)) {
completed(0, attachment);
}
if (attachment == null) { //新方式
channel.readRegister(getCodec());
}
}
@@ -116,8 +85,6 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
private final ClientCodec<R, P> codec;
private final ConcurrentLinkedQueue<ClientFuture<R, P>> requestQueue = new ConcurrentLinkedQueue();
//respFutureQueue、respFutureMap二选一 SPSC队列模式
private final ConcurrentLinkedDeque<ClientFuture<R, P>> respFutureQueue = new ConcurrentLinkedDeque<>();
@@ -156,7 +123,6 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
respFuture.setTimeout(client.timeoutScheduler.schedule(respFuture, rts, TimeUnit.SECONDS));
}
respWaitingCounter.increment(); //放在writeChannelInWriteThread计数会延迟导致不准确
writeLock.lock();
try {
offerRespFuture(respFuture);
@@ -172,7 +138,18 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
}
private void sendRequestInLocking(R request, ClientFuture respFuture) {
if (writePending.compareAndSet(false, true)) {
if (true) { //新方式
ByteArray array = arrayThreadLocal.get();
array.clear();
request.writeTo(this, array);
if (request.isCompleted()) {
doneRequestCounter.increment();
} else { //还剩半包没发送完
pauseWriting.set(true);
currHalfWriteFuture = respFuture;
}
channel.clientWrite(array.getBytes(), writeHandler);
} else { //旧方式
//发送请求数据包
writeArray.clear();
request.writeTo(this, writeArray);
@@ -191,11 +168,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
} else {
channel.write(writeArray, this, writeHandler);
}
} else {
writePending.compareAndSet(true, false);
}
} else {
requestQueue.offer(respFuture);
}
}
@@ -207,26 +180,14 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
ClientFuture respFuture = this.currHalfWriteFuture;
if (respFuture != null) {
this.currHalfWriteFuture = null;
if (!respFuture.isDone()) {
if (halfRequestExc == null) {
offerFirstRespFuture(respFuture);
ClientFuture future;
while ((future = pauseRequests.poll()) != null) {
requestQueue.add(future);
}
sendRequestInLocking(request, respFuture);
return;
} else {
codec.responseComplete(true, respFuture, null, halfRequestExc);
}
if (halfRequestExc == null) {
offerFirstRespFuture(respFuture);
sendRequestInLocking(request, respFuture);
} else {
codec.responseComplete(true, respFuture, null, halfRequestExc);
}
}
respFuture = pauseRequests.poll();
if (respFuture != null) {
ClientFuture future;
while ((future = pauseRequests.poll()) != null) {
requestQueue.add(future);
}
while (!pauseWriting.get() && (respFuture = pauseRequests.poll()) != null) {
sendRequestInLocking((R) respFuture.getRequest(), respFuture);
}
} finally {