diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index adcc6a80a..2cb90ccda 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -53,6 +53,8 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { protected final int bufferCapacity; + private final ReentrantLock writeLock = new ReentrantLock(); + protected AsyncIOThread ioReadThread; protected AsyncIOThread ioWriteThread; @@ -65,8 +67,6 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { private Consumer writeBufferConsumer; - final ReentrantLock pipelineLock = new ReentrantLock(); - private ByteBufferWriter pipelineWriter; private PipelineDataNode pipelineDataNode; @@ -205,6 +205,14 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { return ioWriteThread; } + public final void lockWrite() { + writeLock.lock(); + } + + public final void unlockWrite() { + writeLock.unlock(); + } + /** * 快速发送 * @@ -615,7 +623,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { // 返回pipelineCount个数数据是否全部写入完毕 public boolean appendPipeline(int pipelineIndex, int pipelineCount, byte[] bs, int offset, int length) { - pipelineLock.lock(); + writeLock.lock(); try { ByteBufferWriter writer = this.pipelineWriter; if (writer == null) { @@ -645,7 +653,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { return false; } } finally { - pipelineLock.unlock(); + writeLock.unlock(); } } @@ -672,7 +680,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { byte[] bodyContent, int bodyOffset, int bodyLength) { - pipelineLock.lock(); + writeLock.lock(); try { ByteBufferWriter writer = this.pipelineWriter; if (writer == null) { @@ -703,7 +711,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { return false; } } finally { - pipelineLock.unlock(); + writeLock.unlock(); } } diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index 9db399ba7..37c1e8b8a 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -160,13 +160,13 @@ abstract class AsyncNioConnection extends AsyncConnection { @Override public final void pipelineWrite(PipelinePacket... packets) { if (pipelineWriteQueue == null) { - pipelineLock.lock(); + writeLock.lock(); try { if (pipelineWriteQueue == null) { pipelineWriteQueue = new ConcurrentLinkedDeque<>(); } } finally { - pipelineLock.unlock(); + writeLock.unlock(); } } for (PipelinePacket packet : packets) { @@ -241,7 +241,7 @@ abstract class AsyncNioConnection extends AsyncConnection { handler.failed(exc, attachment); } }; - this.writeCompletionHandler = (CompletionHandler) newHandler; + this.writeCompletionHandler = newHandler; doWrite(); // 如果不是true,则bodyCallback的执行可能会切换线程 } @@ -260,7 +260,7 @@ abstract class AsyncNioConnection extends AsyncConnection { this.writePending = true; this.writeByteBuffer = src; this.writeAttachment = attachment; - this.writeCompletionHandler = (CompletionHandler) handler; + this.writeCompletionHandler = handler; doWrite(); } @@ -282,7 +282,7 @@ abstract class AsyncNioConnection extends AsyncConnection { this.writeBuffersOffset = offset; this.writeBuffersLength = length; this.writeAttachment = attachment; - this.writeCompletionHandler = (CompletionHandler) handler; + this.writeCompletionHandler = handler; doWrite(); } diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index 32a702ed9..631fd2017 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -12,7 +12,6 @@ import java.nio.channels.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.*; import java.util.logging.Level; import org.redkale.annotation.*; @@ -38,8 +37,6 @@ public abstract class ClientConnection respFuture = client.createClientFuture(this, request); - writeLock.lock(); + conn.lockWrite(); try { offerRespFuture(respFuture); } finally { - writeLock.unlock(); + conn.unlockWrite(); } - channel.readRegister(getCodec()); // 不能在创建连接时注册读事件 + conn.readRegister(getCodec()); // 不能在创建连接时注册读事件 return respFuture; }