writeInLock

This commit is contained in:
redkale
2024-10-28 12:18:08 +08:00
parent f33ac5f88a
commit 39f743b47d
2 changed files with 91 additions and 0 deletions

View File

@@ -264,6 +264,11 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
protected abstract <A> void writeImpl(
ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler);
public abstract <A> void writeInLock(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler);
public abstract <A> void writeInLock(
ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler);
protected void startRead(CompletionHandler<Integer, ByteBuffer> handler) {
read(handler);
}
@@ -508,6 +513,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
}
}
// srcs写完才会回调
public final void writeInIOThread(byte[] bytes, CompletionHandler<Integer, Void> handler) {
if (inCurrWriteThread()) {
write(bytes, handler);
@@ -516,6 +522,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
}
}
// srcs写完才会回调
public final void writeInIOThread(ByteTuple array, CompletionHandler<Integer, Void> handler) {
if (inCurrWriteThread()) {
write(array, handler);
@@ -524,6 +531,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
}
}
// srcs写完才会回调
public final void writeInIOThread(byte[] bytes, int offset, int length, CompletionHandler<Integer, Void> handler) {
if (inCurrWriteThread()) {
write(bytes, offset, length, handler);
@@ -532,6 +540,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
}
}
// srcs写完才会回调
public final void writeInIOThread(ByteTuple header, ByteTuple body, CompletionHandler<Integer, Void> handler) {
if (inCurrWriteThread()) {
write(header, body, handler);
@@ -540,6 +549,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
}
}
// srcs写完才会回调
public final void writeInIOThread(
byte[] headerContent,
int headerOffset,

View File

@@ -323,6 +323,87 @@ abstract class AsyncNioConnection extends AsyncConnection {
}
}
@Override
public <A> void writeInLock(ByteBuffer buffer, A attachment, CompletionHandler<Integer, ? super A> handler) {
int total = 0;
Exception t = null;
lockWrite();
try {
if (this.writePending) {
handler.failed(new WritePendingException(), attachment);
return;
}
this.writePending = true;
while (buffer.hasRemaining()) { // 必须要将buffer写完为止
int c = implWrite(buffer);
if (c < 0) {
t = new ClosedChannelException();
total = c;
break;
}
total += c;
}
} catch (Exception e) {
t = e;
} finally {
this.writePending = false;
unlockWrite();
}
if (t != null) {
handler.failed(t, attachment);
} else {
handler.completed(total, attachment);
}
}
@Override
public <A> void writeInLock(
ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) {
int total = 0;
Exception t = null;
int batchOffset = offset;
int batchLength = length;
ByteBuffer[] batchBuffers = srcs;
lockWrite();
try {
if (this.writePending) {
handler.failed(new WritePendingException(), attachment);
return;
}
this.writePending = true;
boolean hasRemain = true;
while (hasRemain) { // 必须要将buffer写完为止
int c = implWrite(batchBuffers, batchOffset, batchLength);
if (c < 0) {
t = new ClosedChannelException();
total = c;
break;
}
boolean remain = false;
for (int i = 0; i < batchLength; i++) {
if (batchBuffers[batchOffset + i].hasRemaining()) {
remain = true;
batchOffset += i;
batchLength -= i;
break;
}
}
hasRemain = remain;
total += c;
}
} catch (Exception e) {
t = e;
} finally {
this.writePending = false;
unlockWrite();
}
if (t != null) {
handler.failed(t, attachment);
} else {
handler.completed(total, attachment);
}
}
public void doWrite() {
try {
this.writeTime = System.currentTimeMillis();