diff --git a/src/main/java/org/redkale/net/client/Client.java b/src/main/java/org/redkale/net/client/Client.java index 3588ab505..92ce29f99 100644 --- a/src/main/java/org/redkale/net/client/Client.java +++ b/src/main/java/org/redkale/net/client/Client.java @@ -176,7 +176,7 @@ public abstract class Client, R extends ClientR protected abstract C createClientConnection(final int index, AsyncConnection channel); - //创建连接后先从服务器拉取数据构建的虚拟请求,返回null表示连上服务器后不读取数据 + //创建连接后会立马从服务器拉取数据构建的虚拟请求,返回null表示连上服务器后不会立马读取数据 protected R createVirtualRequestAfterConnect() { return null; } @@ -298,14 +298,6 @@ public abstract class Client, R extends ClientR return conn.writeChannel(requests, respTransfer); } - private C createConnection(int index, AsyncConnection channel) { - C conn = createClientConnection(index, channel); - if (!channel.isReadPending()) { - channel.readRegister(conn.getCodec()); //不用readRegisterInIOThread,因executeRead可能会异步 - } - return conn; - } - public final CompletableFuture connect() { final int size = this.connArray.length; WorkThread workThread = WorkThread.currentWorkThread(); @@ -317,10 +309,15 @@ public abstract class Client, R extends ClientR final Queue> waitQueue = this.connAcquireWaitings[connIndex]; if (this.connOpenStates[connIndex].compareAndSet(false, true)) { CompletableFuture future = group.createClient(tcp, this.address.randomAddress(), readTimeoutSeconds, writeTimeoutSeconds) - .thenApply(c -> (C) createConnection(connIndex, c).setMaxPipelines(maxPipelines)); + .thenApply(c -> (C) createClientConnection(connIndex, c).setMaxPipelines(maxPipelines)); R virtualReq = createVirtualRequestAfterConnect(); if (virtualReq != null) { future = future.thenCompose(conn -> conn.writeVirtualRequest(virtualReq).thenApply(v -> conn)); + } else { + future = future.thenApply(conn -> { + conn.channel.readRegister(conn.getCodec()); //不用readRegisterInIOThread,因executeRead可能会异步 + return conn; + }); } if (authenticate != null) { future = future.thenCompose(authenticate); @@ -362,10 +359,15 @@ public abstract class Client, R extends ClientR final Queue> waitQueue = entry.connAcquireWaitings; if (entry.connOpenState.compareAndSet(false, true)) { CompletableFuture future = group.createClient(tcp, addr, readTimeoutSeconds, writeTimeoutSeconds) - .thenApply(c -> (C) createConnection(-1, c).setMaxPipelines(maxPipelines)); + .thenApply(c -> (C) createClientConnection(-1, c).setMaxPipelines(maxPipelines)); R virtualReq = createVirtualRequestAfterConnect(); if (virtualReq != null) { future = future.thenCompose(conn -> conn.writeVirtualRequest(virtualReq).thenApply(v -> conn)); + } else { + future = future.thenApply(conn -> { + conn.channel.readRegister(conn.getCodec()); //不用readRegisterInIOThread,因executeRead可能会异步 + return conn; + }); } if (authenticate != null) { future = future.thenCompose(authenticate); diff --git a/src/main/java/org/redkale/net/client/ClientCodec.java b/src/main/java/org/redkale/net/client/ClientCodec.java index 6cdfaf266..15dbd06ff 100644 --- a/src/main/java/org/redkale/net/client/ClientCodec.java +++ b/src/main/java/org/redkale/net/client/ClientCodec.java @@ -74,6 +74,10 @@ public abstract class ClientCodec implements Complet } else { ClientFuture respFuture = connection.pollRespFuture(cr.getRequestid()); if (respFuture != null) { + if (respFuture.request != cr.request) { + connection.dispose(new RedkaleException("request pipeline error")); + return; + } responseComplete(false, respFuture, cr.message, cr.exc); } respPool.accept(cr); diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index 5a3177f3b..a5755e5ac 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -258,6 +258,7 @@ public abstract class ClientConnection implements Co } finally { writeLock.unlock(); } + channel.readRegister(getCodec()); //不能在创建连接时注册读事件 return respFuture; }