优化client

This commit is contained in:
redkale
2023-09-21 22:36:19 +08:00
parent 8b65320491
commit 19db4fdae9
3 changed files with 124 additions and 92 deletions

View File

@@ -224,9 +224,9 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
public abstract void setWriteTimeoutSeconds(int writeTimeoutSeconds); public abstract void setWriteTimeoutSeconds(int writeTimeoutSeconds);
public abstract <A> AsyncConnection fastHandler(CompletionHandler<Integer, ? super A> handler); // public abstract <A> AsyncConnection fastHandler(CompletionHandler<Integer, ? super A> handler);
//
public abstract <A> void fastWrite(byte[] data); // public abstract <A> void fastWrite(byte[] data);
protected abstract void readRegisterImpl(CompletionHandler<Integer, ByteBuffer> handler); protected abstract void readRegisterImpl(CompletionHandler<Integer, ByteBuffer> handler);

View File

@@ -85,7 +85,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
protected SelectionKey writeKey; protected SelectionKey writeKey;
protected CompletionHandler<Integer, Object> writeFastHandler; // protected CompletionHandler<Integer, Object> writeFastHandler;
public AsyncNioConnection(boolean clientMode, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, public AsyncNioConnection(boolean clientMode, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread,
AsyncIOThread ioWriteThread, final int bufferCapacity, SSLBuilder sslBuilder, SSLContext sslContext) { AsyncIOThread ioWriteThread, final int bufferCapacity, SSLBuilder sslBuilder, SSLContext sslContext) {
@@ -117,12 +117,12 @@ abstract class AsyncNioConnection extends AsyncConnection {
return this.writeTimeoutSeconds; return this.writeTimeoutSeconds;
} }
@Override // @Override
public <A> AsyncConnection fastHandler(CompletionHandler<Integer, ? super A> handler) { // public <A> AsyncConnection fastHandler(CompletionHandler<Integer, ? super A> handler) {
Objects.requireNonNull(handler); // Objects.requireNonNull(handler);
this.writeFastHandler = (CompletionHandler) handler; // this.writeFastHandler = (CompletionHandler) handler;
return this; // return this;
} // }
@Override @Override
protected void startHandshake(final Consumer<Throwable> callback) { protected void startHandshake(final Consumer<Throwable> callback) {
@@ -299,44 +299,44 @@ abstract class AsyncNioConnection extends AsyncConnection {
doWrite(); doWrite();
} }
@Override // @Override
public <A> void fastWrite(byte[] data) { // public <A> void fastWrite(byte[] data) {
CompletionHandler<Integer, ? super A> handler = this.writeFastHandler; // CompletionHandler<Integer, ? super A> handler = this.writeFastHandler;
Objects.requireNonNull(data); // Objects.requireNonNull(data);
Objects.requireNonNull(handler, "fastHandler is null"); // Objects.requireNonNull(handler, "fastHandler is null");
if (!this.isConnected()) { // if (!this.isConnected()) {
handler.failed(new NotYetConnectedException(), null); // handler.failed(new NotYetConnectedException(), null);
return; // return;
} // }
this.writePending = true; // this.writePending = true;
this.fastWriteQueue.offer(data); // this.fastWriteQueue.offer(data);
this.fastWriteCount.incrementAndGet(); // this.fastWriteCount.incrementAndGet();
this.writeCompletionHandler = (CompletionHandler) handler; // this.writeCompletionHandler = (CompletionHandler) handler;
this.writeAttachment = null; // this.writeAttachment = null;
try { // try {
if (writeKey == null) { // if (writeKey == null) {
ioWriteThread.register(selector -> { // ioWriteThread.register(selector -> {
try { // try {
if (writeKey == null) { // if (writeKey == null) {
writeKey = keyFor(selector); // writeKey = keyFor(selector);
} // }
if (writeKey == null) { // if (writeKey == null) {
writeKey = implRegister(selector, SelectionKey.OP_WRITE); // writeKey = implRegister(selector, SelectionKey.OP_WRITE);
writeKey.attach(this); // writeKey.attach(this);
} else { // } else {
writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE); // writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE);
} // }
} catch (ClosedChannelException e) { // } catch (ClosedChannelException e) {
handleWrite(0, e); // handleWrite(0, e);
} // }
}); // });
} else { // } else {
ioWriteThread.interestOpsOr(writeKey, SelectionKey.OP_WRITE); // ioWriteThread.interestOpsOr(writeKey, SelectionKey.OP_WRITE);
} // }
} catch (Exception e) { // } catch (Exception e) {
handleWrite(0, e); // handleWrite(0, e);
} // }
} // }
public void doRead(boolean direct) { public void doRead(boolean direct) {
try { try {

View File

@@ -102,7 +102,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
this.index = index; this.index = index;
this.connEntry = index >= 0 ? null : client.connAddrEntrys.get(channel.getRemoteAddress()); this.connEntry = index >= 0 ? null : client.connAddrEntrys.get(channel.getRemoteAddress());
this.respWaitingCounter = index >= 0 ? client.connRespWaitings[index] : this.connEntry.connRespWaiting; this.respWaitingCounter = index >= 0 ? client.connRespWaitings[index] : this.connEntry.connRespWaiting;
this.channel = channel.beforeCloseListener(this).fastHandler(writeHandler); this.channel = channel.beforeCloseListener(this); //.fastHandler(writeHandler);
this.writeBuffer = channel.pollWriteBuffer(); this.writeBuffer = channel.pollWriteBuffer();
} }
@@ -139,6 +139,28 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
return respFuture; return respFuture;
} }
private void sendRequestInLocking(R request, ClientFuture respFuture) {
//发送请求数据包
writeArray.clear();
request.writeTo(this, writeArray);
if (request.isCompleted()) {
doneRequestCounter.increment();
} else { //还剩半包没发送完
pauseWriting.set(true);
currHalfWriteFuture = respFuture;
}
if (writeArray.length() > 0) {
if (writeBuffer.capacity() >= writeArray.length()) {
writeBuffer.clear();
writeBuffer.put(writeArray.content(), 0, writeArray.length());
writeBuffer.flip();
channel.write(writeBuffer, this, writeHandler);
} else {
channel.write(writeArray, this, writeHandler);
}
}
}
//respTransfer只会在ClientCodec的读线程里调用 //respTransfer只会在ClientCodec的读线程里调用
protected final <T> CompletableFuture<List<T>> writeChannel(R[] requests, Function<P, T> respTransfer) { protected final <T> CompletableFuture<List<T>> writeChannel(R[] requests, Function<P, T> respTransfer) {
ClientFuture[] respFutures = new ClientFuture[requests.length]; ClientFuture[] respFutures = new ClientFuture[requests.length];
@@ -156,63 +178,32 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
writeLock.lock(); writeLock.lock();
try { try {
for (ClientFuture respFuture : respFutures) { if (pauseWriting.get()) {
offerRespFuture(respFuture); for (ClientFuture respFuture : respFutures) {
if (pauseWriting.get()) { offerRespFuture(respFuture);
pauseRequests.add(respFuture); pauseRequests.add(respFuture);
} }
} else {
for (ClientFuture respFuture : respFutures) {
offerRespFuture(respFuture);
}
sendRequestInLocking(respFutures);
} }
sendRequestInLocking(respFutures);
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }
return Utility.allOfFutures(respFutures); return Utility.allOfFutures(respFutures);
} }
private void sendRequestInLocking(R request, ClientFuture respFuture) {
if (false) { //新方式
ByteArray array = arrayThreadLocal.get();
array.clear();
request.writeTo(this, array);
if (request.isCompleted()) {
doneRequestCounter.increment();
} else { //还剩半包没发送完
pauseWriting.set(true);
currHalfWriteFuture = respFuture;
}
channel.fastWrite(array.getBytes());
} else { //旧方式
//发送请求数据包
writeArray.clear();
request.writeTo(this, writeArray);
if (request.isCompleted()) {
doneRequestCounter.increment();
} else { //还剩半包没发送完
pauseWriting.set(true);
currHalfWriteFuture = respFuture;
}
if (writeArray.length() > 0) {
if (writeBuffer.capacity() >= writeArray.length()) {
writeBuffer.clear();
writeBuffer.put(writeArray.content(), 0, writeArray.length());
writeBuffer.flip();
channel.write(writeBuffer, this, writeHandler);
} else {
channel.write(writeArray, this, writeHandler);
}
}
}
}
private void sendRequestInLocking(ClientFuture[] respFutures) { private void sendRequestInLocking(ClientFuture[] respFutures) {
ByteArray array = arrayThreadLocal.get(); //发送请求数据包
array.clear(); writeArray.clear();
for (ClientFuture respFuture : respFutures) { for (ClientFuture respFuture : respFutures) {
if (pauseWriting.get()) { if (pauseWriting.get()) {
pauseRequests.add(respFuture); pauseRequests.add(respFuture);
} else { } else {
ClientRequest request = respFuture.request; ClientRequest request = respFuture.request;
request.writeTo(this, array); request.writeTo(this, writeArray);
if (request.isCompleted()) { if (request.isCompleted()) {
doneRequestCounter.increment(); doneRequestCounter.increment();
} else { //还剩半包没发送完 } else { //还剩半包没发送完
@@ -221,9 +212,50 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
} }
} }
} }
channel.fastWrite(array.getBytes()); if (writeArray.length() > 0) {
if (writeBuffer.capacity() >= writeArray.length()) {
writeBuffer.clear();
writeBuffer.put(writeArray.content(), 0, writeArray.length());
writeBuffer.flip();
channel.write(writeBuffer, this, writeHandler);
} else {
channel.write(writeArray, this, writeHandler);
}
}
} }
// private void sendFastRequestInLocking(R request, ClientFuture respFuture) {
// ByteArray array = arrayThreadLocal.get();
// array.clear();
// request.writeTo(this, array);
// if (request.isCompleted()) {
// doneRequestCounter.increment();
// } else { //还剩半包没发送完
// pauseWriting.set(true);
// currHalfWriteFuture = respFuture;
// }
// channel.fastWrite(array.getBytes());
// }
//
// private void sendFastRequestInLocking(ClientFuture[] respFutures) {
// ByteArray array = arrayThreadLocal.get();
// array.clear();
// for (ClientFuture respFuture : respFutures) {
// if (pauseWriting.get()) {
// pauseRequests.add(respFuture);
// } else {
// ClientRequest request = respFuture.request;
// request.writeTo(this, array);
// if (request.isCompleted()) {
// doneRequestCounter.increment();
// } else { //还剩半包没发送完
// pauseWriting.set(true);
// currHalfWriteFuture = respFuture;
// }
// }
// }
// channel.fastWrite(array.getBytes());
// }
//发送半包和积压的请求数据包 //发送半包和积压的请求数据包
void sendHalfWriteInReadThread(R request, Throwable halfRequestExc) { void sendHalfWriteInReadThread(R request, Throwable halfRequestExc) {
writeLock.lock(); writeLock.lock();