AsyncConnection

This commit is contained in:
redkale
2024-10-31 21:33:42 +08:00
parent 0895c82525
commit 9d691e50f7
4 changed files with 66 additions and 27 deletions

View File

@@ -269,6 +269,9 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
public abstract <A> void writeInLock(
ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler);
public abstract <A> void writeInLock(
Supplier<ByteBuffer> supplier, A attachment, CompletionHandler<Integer, ? super A> handler);
protected void startRead(CompletionHandler<Integer, ByteBuffer> handler) {
read(handler);
}
@@ -305,8 +308,16 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
}
}
public final void write(ByteTuple array, CompletionHandler<Integer, Void> handler) {
write(array.content(), array.offset(), array.length(), (byte[]) null, 0, 0, handler);
}
public final void write(ByteBuffer buffer, CompletionHandler<Integer, Void> handler) {
write(buffer, null, handler);
}
// src写完才会回调
public final <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
final <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
if (sslEngine == null) {
writeImpl(src, attachment, handler);
} else {
@@ -325,7 +336,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
}
}
public final <A> void write(
final <A> void write(
ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) {
if (sslEngine == null) {
writeImpl(srcs, offset, length, attachment, handler);
@@ -345,31 +356,19 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
}
}
public final <A> void write(ByteBuffer[] srcs, A attachment, CompletionHandler<Integer, ? super A> handler) {
final <A> void write(ByteBuffer[] srcs, A attachment, CompletionHandler<Integer, ? super A> handler) {
write(srcs, 0, srcs.length, attachment, handler);
}
public final void write(byte[] bytes, CompletionHandler<Integer, Void> handler) {
final void write(byte[] bytes, CompletionHandler<Integer, Void> handler) {
write(bytes, 0, bytes.length, (byte[]) null, 0, 0, handler);
}
public final <A> void write(byte[] bytes, A attachment, CompletionHandler<Integer, ? super A> handler) {
write(bytes, 0, bytes.length, (byte[]) null, 0, 0, attachment, handler);
}
public final void write(byte[] bytes, int offset, int length, CompletionHandler<Integer, Void> handler) {
final void write(byte[] bytes, int offset, int length, CompletionHandler<Integer, Void> handler) {
write(bytes, offset, length, (byte[]) null, 0, 0, handler);
}
public final void write(ByteTuple array, CompletionHandler<Integer, Void> handler) {
write(array.content(), array.offset(), array.length(), (byte[]) null, 0, 0, handler);
}
public final <A> void write(ByteTuple array, A attachment, CompletionHandler<Integer, ? super A> handler) {
write(array.content(), array.offset(), array.length(), (byte[]) null, 0, 0, attachment, handler);
}
public final void write(ByteTuple header, ByteTuple body, CompletionHandler<Integer, Void> handler) {
final void write(ByteTuple header, ByteTuple body, CompletionHandler<Integer, Void> handler) {
write(
header.content(),
header.offset(),
@@ -380,7 +379,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
handler);
}
public void write(
void write(
byte[] headerContent,
int headerOffset,
int headerLength,
@@ -391,7 +390,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
write(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength, null, handler);
}
public void write(
void write(
byte[] headerContent,
int headerOffset,
int headerLength,

View File

@@ -12,6 +12,7 @@ import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.net.ssl.SSLContext;
import org.redkale.util.ByteBufferWriter;
@@ -404,6 +405,45 @@ abstract class AsyncNioConnection extends AsyncConnection {
}
}
@Override
public <A> void writeInLock(
Supplier<ByteBuffer> supplier, A attachment, CompletionHandler<Integer, ? super A> handler) {
int total = 0;
Exception t = null;
lockWrite();
try {
ByteBuffer buffer = supplier.get();
if (buffer == null || !buffer.hasRemaining()) {
handler.completed(total, attachment);
return;
}
if (this.writePending) {
handler.failed(new WritePendingException(), attachment);
return;
}
this.writePending = true;
while (buffer.hasRemaining()) { // 必须要将buffer写完为止
int c = implWrite(buffer);
if (c < 0) {
t = new ClosedChannelException();
total = c;
break;
}
total += c;
}
} catch (Exception e) {
t = e;
} finally {
this.writePending = false;
unlockWrite();
}
if (t != null) {
handler.failed(t, attachment);
} else {
handler.completed(total, attachment);
}
}
public void doWrite() {
try {
this.writeTime = System.currentTimeMillis();

View File

@@ -43,15 +43,15 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
protected final AsyncConnection channel;
protected final CompletionHandler<Integer, Object> writeHandler = new CompletionHandler<Integer, Object>() {
protected final CompletionHandler<Integer, Void> writeHandler = new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Object attachment) {
public void completed(Integer result, Void attachment) {
// do nothing
}
@Override
public void failed(Throwable exc, Object attachment) {
public void failed(Throwable exc, Void attachment) {
dispose(exc);
}
};
@@ -181,9 +181,9 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
writeBuffer.clear();
writeBuffer.put(writeArray.content(), 0, writeArray.length());
writeBuffer.flip();
channel.write(writeBuffer, this, writeHandler);
channel.write(writeBuffer, writeHandler);
} else {
channel.write(writeArray, this, writeHandler);
channel.write(writeArray, writeHandler);
}
}
}

View File

@@ -1366,7 +1366,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
private void finishFile(ByteArray headerData, File file, long offset, long length) throws IOException {
// this.channel.write(headerData, new TransferFileHandler(file, offset, length));
final Logger logger = context.getLogger();
this.channel.write(headerData, new CompletionHandler<Integer, Void>() {
this.channel.writeInIOThread(headerData, new CompletionHandler<Integer, Void>() {
FileChannel fileChannel;
@@ -1413,7 +1413,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
} else {
sends += len;
}
channel.write(buffer, attachment, this);
channel.writeInIOThread(buffer, attachment, this);
} catch (Exception e) {
if (fileChannel != null) {
try {