AsyncConnection增加readRegister方法

This commit is contained in:
redkale
2023-03-26 21:15:00 +08:00
parent 71d3c015d9
commit ffbad698b4
4 changed files with 72 additions and 7 deletions

View File

@@ -235,6 +235,8 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
public abstract void setWriteTimeoutSeconds(int writeTimeoutSeconds);
protected abstract void readRegisterImpl(CompletionHandler<Integer, ByteBuffer> handler);
protected abstract void readImpl(CompletionHandler<Integer, ByteBuffer> handler);
//src写完才会回调
@@ -255,6 +257,22 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
}
}
public final void readRegister(CompletionHandler<Integer, ByteBuffer> handler) {
if (sslEngine == null) {
readRegisterImpl(handler);
} else {
sslReadRegisterImpl(false, handler);
}
}
public final void readRegisterInIOThread(CompletionHandler<Integer, ByteBuffer> handler) {
if (inCurrReadThread()) {
readRegister(handler);
} else {
executeRead(() -> readRegister(handler));
}
}
public final void read(CompletionHandler<Integer, ByteBuffer> handler) {
if (sslEngine == null) {
readImpl(handler);
@@ -908,7 +926,15 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
}
protected void sslReadImpl(final boolean handshake, final CompletionHandler<Integer, ByteBuffer> handler) {
readImpl(new CompletionHandler<Integer, ByteBuffer>() {
readImpl(createSslCompletionHandler(handshake, handler));
}
protected void sslReadRegisterImpl(final boolean handshake, final CompletionHandler<Integer, ByteBuffer> handler) {
readRegisterImpl(createSslCompletionHandler(handshake, handler));
}
private CompletionHandler<Integer, ByteBuffer> createSslCompletionHandler(final boolean handshake, final CompletionHandler<Integer, ByteBuffer> handler) {
return new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer count, ByteBuffer attachment) {
@@ -951,7 +977,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
public void failed(Throwable t, ByteBuffer attachment) {
handler.failed(t, attachment);
}
});
};
}
//加密ssl内容数据

View File

@@ -127,6 +127,45 @@ abstract class AsyncNioConnection extends AsyncConnection {
read(handler);
}
@Override
protected final void readRegisterImpl(CompletionHandler<Integer, ByteBuffer> handler) {
Objects.requireNonNull(handler);
if (!this.isConnected()) {
handler.failed(new NotYetConnectedException(), null);
return;
}
if (this.readPending) {
handler.failed(new ReadPendingException(), null);
return;
}
this.readPending = true;
if (this.readTimeoutSeconds > 0) {
AsyncNioCompletionHandler newHandler = this.readTimeoutCompletionHandler;
newHandler.handler(handler, this.readByteBuffer); // new AsyncNioCompletionHandler(handler, this.readByteBuffer);
this.readCompletionHandler = newHandler;
newHandler.timeoutFuture = ioGroup.scheduleTimeout(newHandler, this.readTimeoutSeconds, TimeUnit.SECONDS);
} else {
this.readCompletionHandler = handler;
}
try {
if (readKey == null) {
ioReadThread.register(selector -> {
try {
readKey = implRegister(selector, SelectionKey.OP_READ);
readKey.attach(this);
} catch (ClosedChannelException e) {
handleRead(0, e);
}
});
} else {
ioReadThread.interestOpsOr(readKey, SelectionKey.OP_READ);
}
} catch (Exception e) {
handleRead(0, e);
}
}
@Override
public void readImpl(CompletionHandler<Integer, ByteBuffer> handler) {
Objects.requireNonNull(handler);

View File

@@ -278,7 +278,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
if (virtualReq != null) {
future = future.thenCompose(conn -> conn.writeVirtualRequest(virtualReq).thenApply(v -> conn));
} else {
future = future.thenApply(conn -> (C) conn.readChannel());
future = future.thenApply(conn -> (C) conn.readRegisterChannel());
}
if (authenticate != null) {
future = future.thenCompose(authenticate);
@@ -323,7 +323,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
if (virtualReq != null) {
future = future.thenCompose(conn -> conn.writeVirtualRequest(virtualReq).thenApply(v -> conn));
} else {
future = future.thenApply(conn -> (C) conn.readChannel());
future = future.thenApply(conn -> (C) conn.readRegisterChannel());
}
if (authenticate != null) {
future = future.thenCompose(authenticate);

View File

@@ -98,7 +98,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
}
ClientFuture<R, P> respFuture = createClientFuture(request);
respFutureQueue.offer(respFuture);
readChannel();
readRegisterChannel();
return respFuture;
}
@@ -109,8 +109,8 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
return new ClientFuture(this, request);
}
protected ClientConnection readChannel() {
channel.readInIOThread(codec);
protected ClientConnection readRegisterChannel() {
channel.readRegisterInIOThread(codec);
return this;
}