writeLock

This commit is contained in:
redkale
2024-10-28 09:22:18 +08:00
parent c43d151d4d
commit edfc7b9190
3 changed files with 29 additions and 23 deletions

View File

@@ -53,6 +53,8 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
protected final int bufferCapacity;
private final ReentrantLock writeLock = new ReentrantLock();
protected AsyncIOThread ioReadThread;
protected AsyncIOThread ioWriteThread;
@@ -65,8 +67,6 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
private Consumer<ByteBuffer> writeBufferConsumer;
final ReentrantLock pipelineLock = new ReentrantLock();
private ByteBufferWriter pipelineWriter;
private PipelineDataNode pipelineDataNode;
@@ -205,6 +205,14 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
return ioWriteThread;
}
public final void lockWrite() {
writeLock.lock();
}
public final void unlockWrite() {
writeLock.unlock();
}
/**
* 快速发送
*
@@ -615,7 +623,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
// 返回pipelineCount个数数据是否全部写入完毕
public boolean appendPipeline(int pipelineIndex, int pipelineCount, byte[] bs, int offset, int length) {
pipelineLock.lock();
writeLock.lock();
try {
ByteBufferWriter writer = this.pipelineWriter;
if (writer == null) {
@@ -645,7 +653,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
return false;
}
} finally {
pipelineLock.unlock();
writeLock.unlock();
}
}
@@ -672,7 +680,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
byte[] bodyContent,
int bodyOffset,
int bodyLength) {
pipelineLock.lock();
writeLock.lock();
try {
ByteBufferWriter writer = this.pipelineWriter;
if (writer == null) {
@@ -703,7 +711,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
return false;
}
} finally {
pipelineLock.unlock();
writeLock.unlock();
}
}

View File

@@ -160,13 +160,13 @@ abstract class AsyncNioConnection extends AsyncConnection {
@Override
public final void pipelineWrite(PipelinePacket... packets) {
if (pipelineWriteQueue == null) {
pipelineLock.lock();
writeLock.lock();
try {
if (pipelineWriteQueue == null) {
pipelineWriteQueue = new ConcurrentLinkedDeque<>();
}
} finally {
pipelineLock.unlock();
writeLock.unlock();
}
}
for (PipelinePacket packet : packets) {
@@ -241,7 +241,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
handler.failed(exc, attachment);
}
};
this.writeCompletionHandler = (CompletionHandler) newHandler;
this.writeCompletionHandler = newHandler;
doWrite(); // 如果不是true则bodyCallback的执行可能会切换线程
}
@@ -260,7 +260,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
this.writePending = true;
this.writeByteBuffer = src;
this.writeAttachment = attachment;
this.writeCompletionHandler = (CompletionHandler) handler;
this.writeCompletionHandler = handler;
doWrite();
}
@@ -282,7 +282,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
this.writeBuffersOffset = offset;
this.writeBuffersLength = length;
this.writeAttachment = attachment;
this.writeCompletionHandler = (CompletionHandler) handler;
this.writeCompletionHandler = handler;
doWrite();
}

View File

@@ -12,7 +12,6 @@ import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.*;
import java.util.logging.Level;
import org.redkale.annotation.*;
@@ -38,8 +37,6 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
protected final LongAdder doneResponseCounter = new LongAdder();
protected final ReentrantLock writeLock = new ReentrantLock();
protected final ByteArray writeArray = new ByteArray();
protected final ByteBuffer writeBuffer;
@@ -139,7 +136,7 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
}
respWaitingCounter.add(respFutures.length); // 放在writeChannelInWriteThread计数会延迟导致不准确
writeLock.lock();
channel.lockWrite();
try {
if (pauseWriting.get()) {
for (ClientFuture respFuture : respFutures) {
@@ -150,15 +147,15 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
for (ClientFuture respFuture : respFutures) {
offerRespFuture(respFuture);
}
sendRequestInLocking(respFutures);
sendRequestInLock(respFutures);
}
} finally {
writeLock.unlock();
channel.unlockWrite();
}
return respFutures;
}
protected void sendRequestInLocking(ClientFuture... respFutures) {
protected void sendRequestInLock(ClientFuture... respFutures) {
sendRequestToChannel(respFutures);
}
@@ -193,7 +190,7 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
// 发送半包和积压的请求数据包
void sendHalfWriteInReadThread(R halfRequest, Throwable halfException) {
writeLock.lock();
channel.lockWrite();
try {
pauseWriting.set(false);
ClientFuture respFuture = this.currHalfWriteFuture;
@@ -210,7 +207,7 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
sendRequestToChannel(respFuture);
}
} finally {
writeLock.unlock();
channel.unlockWrite();
}
}
@@ -219,14 +216,15 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
return CompletableFuture.failedFuture(
new RuntimeException("ClientVirtualRequest must be virtualType = true"));
}
AsyncConnection conn = channel;
ClientFuture<R, P> respFuture = client.createClientFuture(this, request);
writeLock.lock();
conn.lockWrite();
try {
offerRespFuture(respFuture);
} finally {
writeLock.unlock();
conn.unlockWrite();
}
channel.readRegister(getCodec()); // 不能在创建连接时注册读事件
conn.readRegister(getCodec()); // 不能在创建连接时注册读事件
return respFuture;
}