HttpResponse

This commit is contained in:
redkale
2024-11-09 23:46:30 +08:00
parent 7a76256317
commit bfb0848fa2
4 changed files with 16 additions and 17 deletions

View File

@@ -349,7 +349,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
write(buffer, null, handler); write(buffer, null, handler);
} }
<A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) { private <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
if (sslEngine == null) { if (sslEngine == null) {
writeImpl(src, (Consumer) null, attachment, handler); writeImpl(src, (Consumer) null, attachment, handler);
} else { } else {
@@ -368,7 +368,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
} }
} }
<A> void write( private <A> void write(
ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) { ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) {
if (sslEngine == null) { if (sslEngine == null) {
writeImpl(srcs, offset, length, (Consumer) null, attachment, handler); writeImpl(srcs, offset, length, (Consumer) null, attachment, handler);
@@ -388,7 +388,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
} }
} }
void write(byte[] bytes, int offset, int length, Object attachment, CompletionHandler handler) { private void write(byte[] bytes, int offset, int length, Object attachment, CompletionHandler handler) {
final ByteBuffer buffer = sslEngine == null ? pollWriteBuffer() : pollWriteSSLBuffer(); final ByteBuffer buffer = sslEngine == null ? pollWriteBuffer() : pollWriteSSLBuffer();
if (buffer.remaining() >= length) { if (buffer.remaining() >= length) {
buffer.put(bytes, offset, length); buffer.put(bytes, offset, length);

View File

@@ -380,7 +380,7 @@ public final class Transport {
factory.getLogger().log(Level.WARNING, Transport.class.getSimpleName() + " async error", ex); factory.getLogger().log(Level.WARNING, Transport.class.getSimpleName() + " async error", ex);
return; return;
} }
conn.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() { conn.writeInIOThread(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override @Override
public void completed(Integer result, ByteBuffer attachment) { public void completed(Integer result, ByteBuffer attachment) {

View File

@@ -353,8 +353,8 @@ public class TransportFactory {
if (node.disabletime < 1) { if (node.disabletime < 1) {
continue; // 可用 continue; // 可用
} }
CompletableFuture<AsyncConnection> future = CompletableFuture<AsyncConnection> future = Utility.orTimeout(
Utility.orTimeout(asyncGroup.createTCPClientConnection(node.address), null, 2, TimeUnit.SECONDS); asyncGroup.createTCPClientConnection(node.address), null, 2, TimeUnit.SECONDS);
future.whenComplete((r, t) -> { future.whenComplete((r, t) -> {
node.disabletime = t == null ? 0 : System.currentTimeMillis(); node.disabletime = t == null ? 0 : System.currentTimeMillis();
if (r != null) { if (r != null) {
@@ -387,7 +387,7 @@ public class TransportFactory {
ByteBuffer sendBuffer = pingBuffer.duplicate(); ByteBuffer sendBuffer = pingBuffer.duplicate();
final AsyncConnection localconn = conn; final AsyncConnection localconn = conn;
final BlockingQueue<AsyncConnection> localqueue = queue; final BlockingQueue<AsyncConnection> localqueue = queue;
localconn.write(sendBuffer, sendBuffer, new CompletionHandler<Integer, ByteBuffer>() { localconn.writeInIOThread(sendBuffer, sendBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override @Override
public void completed(Integer result, ByteBuffer wbuffer) { public void completed(Integer result, ByteBuffer wbuffer) {
localconn.read(new CompletionHandler<Integer, ByteBuffer>() { localconn.read(new CompletionHandler<Integer, ByteBuffer>() {

View File

@@ -1334,23 +1334,22 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
} }
this.addHeader("ETag", etag); this.addHeader("ETag", etag);
createHeader(); createHeader();
ByteArray data = headerArray; ByteArray headerData = headerArray;
if (fileBody == null) { if (fileBody != null) { // 一般HttpResourceServlet缓存file内容时fileBody不为空
if (start >= 0) {
headerData.put(fileBody, (int) start, (int) ((len > 0) ? len : fileBody.length() - start));
}
super.finish(false, headerData.content(), 0, headerData.length());
} else {
if (this.recycleListener != null) { if (this.recycleListener != null) {
this.output = file; this.output = file;
} }
finishFile(data, file, start, len); sendFile(headerData, file, start, len);
} else { // 一般HttpResourceServlet缓存file内容时fileBody不为空
if (start >= 0) {
data.put(fileBody, (int) start, (int) ((len > 0) ? len : fileBody.length() - start));
}
super.finish(false, data.content(), 0, data.length());
} }
} }
// offset、length 为 -1 表示输出整个文件 // offset、length 为 -1 表示输出整个文件
private void finishFile(ByteArray headerData, File file, long offset, long length) throws IOException { private void sendFile(ByteArray headerData, File file, long offset, long length) throws IOException {
// this.channel.write(headerData, new TransferFileHandler(file, offset, length));
final Logger logger = context.getLogger(); final Logger logger = context.getLogger();
this.channel.writeInIOThread(headerData, new CompletionHandler<Integer, Void>() { this.channel.writeInIOThread(headerData, new CompletionHandler<Integer, Void>() {