优化fastHandler

This commit is contained in:
redkale
2023-07-18 08:37:30 +08:00
parent 8ccea3750e
commit 8b3c472d70
3 changed files with 20 additions and 12 deletions

View File

@@ -224,7 +224,9 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
public abstract void setWriteTimeoutSeconds(int writeTimeoutSeconds); public abstract void setWriteTimeoutSeconds(int writeTimeoutSeconds);
public abstract <A> void fastWrite(byte[] data, A attachment, CompletionHandler<Integer, ? super A> handler); public abstract <A> AsyncConnection fastHandler(CompletionHandler<Integer, ? super A> handler);
public abstract <A> void fastWrite(byte[] data);
protected abstract void readRegisterImpl(CompletionHandler<Integer, ByteBuffer> handler); protected abstract void readRegisterImpl(CompletionHandler<Integer, ByteBuffer> handler);
@@ -240,10 +242,6 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
read(handler); read(handler);
} }
public final <A> void fastWrite(byte[] data, CompletionHandler<Integer, ? super A> handler) {
fastWrite(data, null, handler);
}
public final void startReadInIOThread(CompletionHandler<Integer, ByteBuffer> handler) { public final void startReadInIOThread(CompletionHandler<Integer, ByteBuffer> handler) {
if (inCurrReadThread()) { if (inCurrReadThread()) {
startRead(handler); startRead(handler);

View File

@@ -85,6 +85,8 @@ abstract class AsyncNioConnection extends AsyncConnection {
protected SelectionKey writeKey; protected SelectionKey writeKey;
protected CompletionHandler<Integer, Object> writeFastHandler;
public AsyncNioConnection(boolean clientMode, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, public AsyncNioConnection(boolean clientMode, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread,
AsyncIOThread ioWriteThread, final int bufferCapacity, SSLBuilder sslBuilder, SSLContext sslContext) { AsyncIOThread ioWriteThread, final int bufferCapacity, SSLBuilder sslBuilder, SSLContext sslContext) {
super(clientMode, ioGroup, ioReadThread, ioWriteThread, bufferCapacity, sslBuilder, sslContext); super(clientMode, ioGroup, ioReadThread, ioWriteThread, bufferCapacity, sslBuilder, sslContext);
@@ -115,6 +117,13 @@ abstract class AsyncNioConnection extends AsyncConnection {
return this.writeTimeoutSeconds; return this.writeTimeoutSeconds;
} }
@Override
public <A> AsyncConnection fastHandler(CompletionHandler<Integer, ? super A> handler) {
Objects.requireNonNull(handler);
this.writeFastHandler = (CompletionHandler) handler;
return this;
}
@Override @Override
protected void startHandshake(final Consumer<Throwable> callback) { protected void startHandshake(final Consumer<Throwable> callback) {
ioReadThread.register(t -> super.startHandshake(callback)); ioReadThread.register(t -> super.startHandshake(callback));
@@ -291,18 +300,19 @@ abstract class AsyncNioConnection extends AsyncConnection {
} }
@Override @Override
public <A> void fastWrite(byte[] data, A attachment, CompletionHandler<Integer, ? super A> handler) { public <A> void fastWrite(byte[] data) {
CompletionHandler<Integer, ? super A> handler = this.writeFastHandler;
Objects.requireNonNull(data);
Objects.requireNonNull(handler, "fastHandler is null");
if (!this.isConnected()) { if (!this.isConnected()) {
handler.failed(new NotYetConnectedException(), null); handler.failed(new NotYetConnectedException(), null);
return; return;
} }
Objects.requireNonNull(data);
Objects.requireNonNull(handler);
this.writePending = true; this.writePending = true;
this.fastWriteQueue.offer(data); this.fastWriteQueue.offer(data);
this.fastWriteCount.incrementAndGet(); this.fastWriteCount.incrementAndGet();
this.writeCompletionHandler = (CompletionHandler) handler; this.writeCompletionHandler = (CompletionHandler) handler;
this.writeAttachment = attachment; this.writeAttachment = null;
try { try {
if (writeKey == null) { if (writeKey == null) {
ioWriteThread.register(selector -> { ioWriteThread.register(selector -> {

View File

@@ -102,7 +102,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
this.index = index; this.index = index;
this.connEntry = index >= 0 ? null : client.connAddrEntrys.get(channel.getRemoteAddress()); this.connEntry = index >= 0 ? null : client.connAddrEntrys.get(channel.getRemoteAddress());
this.respWaitingCounter = index >= 0 ? client.connRespWaitings[index] : this.connEntry.connRespWaiting; this.respWaitingCounter = index >= 0 ? client.connRespWaitings[index] : this.connEntry.connRespWaiting;
this.channel = channel.beforeCloseListener(this); this.channel = channel.beforeCloseListener(this).fastHandler(writeHandler);
this.writeBuffer = channel.pollWriteBuffer(); this.writeBuffer = channel.pollWriteBuffer();
} }
@@ -180,7 +180,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
pauseWriting.set(true); pauseWriting.set(true);
currHalfWriteFuture = respFuture; currHalfWriteFuture = respFuture;
} }
channel.fastWrite(array.getBytes(), writeHandler); channel.fastWrite(array.getBytes());
} else { //旧方式 } else { //旧方式
//发送请求数据包 //发送请求数据包
writeArray.clear(); writeArray.clear();
@@ -221,7 +221,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
} }
} }
} }
channel.fastWrite(array.getBytes(), writeHandler); channel.fastWrite(array.getBytes());
} }
//发送半包和积压的请求数据包 //发送半包和积压的请求数据包