diff --git a/src/main/java/org/redkale/net/client/Client.java b/src/main/java/org/redkale/net/client/Client.java index 3013f129e..0b79e1752 100644 --- a/src/main/java/org/redkale/net/client/Client.java +++ b/src/main/java/org/redkale/net/client/Client.java @@ -81,11 +81,6 @@ public abstract class Client, R extends ClientR protected int readTimeoutSeconds; protected int writeTimeoutSeconds; - //------------------ LocalThreadMode模式 ------------------ - - final CopyOnWriteArrayList localConnList = new CopyOnWriteArrayList<>(); - - final ThreadLocal localConnection = new ThreadLocal(); //------------------ 可选项 ------------------ //PING心跳的请求数据,为null且pingInterval<1表示不需要定时ping @@ -203,10 +198,6 @@ public abstract class Client, R extends ClientR closeConnection(entry.connection); } this.connAddrEntrys.clear(); - for (ClientConnection conn : this.localConnList) { - closeConnection(conn); - } - this.localConnList.clear(); group.close(); } } @@ -280,37 +271,6 @@ public abstract class Client, R extends ClientR } public final CompletableFuture connect() { - if (isThreadLocalConnMode()) { - C conn = localConnection.get(); - if (conn == null || !conn.isOpen()) { - try { - conn = connect1(); - } catch (Exception e) { - return CompletableFuture.failedFuture(e); - } - localConnection.set(conn); - localConnList.add(conn); - } - return CompletableFuture.completedFuture(conn); - } else { - return connect0(); - } - } - - private C connect1() { - CompletableFuture future = group.createClient(tcp, this.address.randomAddress(), readTimeoutSeconds, writeTimeoutSeconds) - .thenApply(c -> (C) createConnection(-2, c).setMaxPipelines(maxPipelines)); - R virtualReq = createVirtualRequestAfterConnect(); - if (virtualReq != null) { - future = future.thenCompose(conn -> conn.writeVirtualRequest(virtualReq).thenApply(v -> conn)); - } - if (authenticate != null) { - future = future.thenCompose(authenticate); - } - return future.thenApply(c -> (C) c.setAuthenticated(true)).join(); - } - - private CompletableFuture connect0() { final int size = this.connArray.length; WorkThread workThread = WorkThread.currWorkThread(); final int connIndex = (workThread != null && workThread.threads() == size) ? workThread.index() : (int) Math.abs(connIndexSeq.getAndIncrement()) % size; @@ -354,7 +314,7 @@ public abstract class Client, R extends ClientR //指定地址获取连接 public final CompletableFuture connect(final SocketAddress addr) { if (addr == null) { - return connect0(); + return connect(); } final AddressConnEntry entry = connAddrEntrys.computeIfAbsent(addr, a -> new AddressConnEntry()); C ec = entry.connection; diff --git a/src/main/java/org/redkale/net/client/ClientCodec.java b/src/main/java/org/redkale/net/client/ClientCodec.java index e3abd833f..cc412f373 100644 --- a/src/main/java/org/redkale/net/client/ClientCodec.java +++ b/src/main/java/org/redkale/net/client/ClientCodec.java @@ -104,11 +104,11 @@ public abstract class ClientCodec implements Complet try { if (!halfCompleted && !request.isCompleted()) { if (exc == null) { - connection.sendHalfWrite(request, exc); + connection.sendHalfWriteInReadThread(request, exc); //request没有发送完,respFuture需要再次接收 return; } else { - connection.sendHalfWrite(request, exc); + connection.sendHalfWriteInReadThread(request, exc); //异常了需要清掉半包 } } diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index 7e8399283..2ac08f998 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -12,6 +12,7 @@ import java.nio.channels.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.*; import org.redkale.annotation.Nullable; import org.redkale.net.*; @@ -31,7 +32,6 @@ import org.redkale.util.ByteArray; */ public abstract class ClientConnection implements Consumer { - //=-2 表示连接放在ThreadLocal存储 //=-1 表示连接放在connAddrEntrys存储 //>=0 表示connArray的下坐标,从0开始 protected final int index; @@ -45,6 +45,8 @@ public abstract class ClientConnection implements Co protected final LongAdder doneResponseCounter = new LongAdder(); + protected final ReentrantLock writeLock = new ReentrantLock(); + protected final ByteArray writeArray = new ByteArray(); protected final ByteBuffer writeBuffer; @@ -79,8 +81,8 @@ public abstract class ClientConnection implements Co this.client = client; this.codec = createCodec(); this.index = index; - this.connEntry = index == -2 ? null : (index >= 0 ? null : client.connAddrEntrys.get(channel.getRemoteAddress())); - this.respWaitingCounter = index == -2 ? new LongAdder() : (index >= 0 ? client.connRespWaitings[index] : this.connEntry.connRespWaiting); + this.connEntry = index >= 0 ? null : client.connAddrEntrys.get(channel.getRemoteAddress()); + this.respWaitingCounter = index >= 0 ? client.connRespWaitings[index] : this.connEntry.connRespWaiting; this.channel = channel.beforeCloseListener(this); this.writeBuffer = channel.pollWriteBuffer(); } @@ -100,41 +102,22 @@ public abstract class ClientConnection implements Co respFuture.setTimeout(client.timeoutScheduler.schedule(respFuture, rts, TimeUnit.SECONDS)); } respWaitingCounter.increment(); //放在writeChannelInWriteThread计数会延迟,导致不准确 - if (client.isThreadLocalConnMode()) { + + writeLock.lock(); + try { offerRespFuture(respFuture); - writeArray.clear(); - request.writeTo(this, writeArray); - doneRequestCounter.increment(); - if (writeArray.length() > 0) { - if (writeBuffer.capacity() >= writeArray.length()) { - writeBuffer.clear(); - writeBuffer.put(writeArray.content(), 0, writeArray.length()); - writeBuffer.flip(); - channel.write(writeBuffer, this, writeHandler); - } else { - channel.write(writeArray, this, writeHandler); - } - } - } else { - if (channel.inCurrWriteThread()) { - writeChannelInThread(request, respFuture); + if (pauseWriting.get()) { + pauseRequests.add(respFuture); } else { - channel.executeWrite(() -> writeChannelInThread(request, respFuture)); + sendRequestInLocking(request, respFuture); } + } finally { + writeLock.unlock(); } return respFuture; } - private void writeChannelInThread(R request, ClientFuture respFuture) { - offerRespFuture(respFuture); - if (pauseWriting.get()) { - pauseRequests.add(respFuture); - } else { - sendRequestInThread(request, respFuture); - } - } - - private void sendRequestInThread(R request, ClientFuture respFuture) { + private void sendRequestInLocking(R request, ClientFuture respFuture) { //发送请求数据包 writeArray.clear(); request.writeTo(this, writeArray); @@ -157,28 +140,25 @@ public abstract class ClientConnection implements Co } //发送半包和积压的请求数据包 - private void sendHalfWriteInThread(R request, Throwable halfRequestExc) { - pauseWriting.set(false); - ClientFuture respFuture = this.currHalfWriteFuture; - if (respFuture != null) { - this.currHalfWriteFuture = null; - if (halfRequestExc == null) { - offerFirstRespFuture(respFuture); - sendRequestInThread(request, respFuture); - } else { - codec.responseComplete(true, respFuture, null, halfRequestExc); + void sendHalfWriteInReadThread(R request, Throwable halfRequestExc) { + writeLock.lock(); + try { + pauseWriting.set(false); + ClientFuture respFuture = this.currHalfWriteFuture; + if (respFuture != null) { + this.currHalfWriteFuture = null; + if (halfRequestExc == null) { + offerFirstRespFuture(respFuture); + sendRequestInLocking(request, respFuture); + } else { + codec.responseComplete(true, respFuture, null, halfRequestExc); + } } - } - while (!pauseWriting.get() && (respFuture = pauseRequests.poll()) != null) { - sendRequestInThread((R) respFuture.getRequest(), respFuture); - } - } - - void sendHalfWrite(R request, Throwable halfRequestExc) { - if (channel.inCurrWriteThread()) { - sendHalfWriteInThread(request, halfRequestExc); - } else { - channel.executeWrite(() -> sendHalfWriteInThread(request, halfRequestExc)); + while (!pauseWriting.get() && (respFuture = pauseRequests.poll()) != null) { + sendRequestInLocking((R) respFuture.getRequest(), respFuture); + } + } finally { + writeLock.unlock(); } } @@ -206,8 +186,6 @@ public abstract class ClientConnection implements Co client.connArray[index] = null; //必须connOpenStates之后 } else if (connEntry != null) { //index=-1 connEntry.connOpenState.set(false); - } else {//index=-2 - client.localConnList.remove(this); } }