WebClient优化

This commit is contained in:
redkale
2024-08-17 16:13:14 +08:00
parent 6f9d851965
commit 9137aed8db
4 changed files with 191 additions and 412 deletions

View File

@@ -438,7 +438,6 @@ public class HttpRequest extends Request<HttpContext> {
return 0;
}
// TODO 待实现
private int readChunkedBody(final ByteBuffer buf) {
final ByteBuffer buffer = buf;
int remain = buffer.remaining();
@@ -537,7 +536,7 @@ public class HttpRequest extends Request<HttpContext> {
}
}
private static int parseHexLength(ByteArray input) {
static int parseHexLength(ByteArray input) {
int count = input.length();
int len = 0;
for (int i = 0; i < count; i++) {

View File

@@ -7,8 +7,6 @@ package org.redkale.net.http;
import java.lang.reflect.Type;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.util.concurrent.*;
import org.redkale.convert.Convert;
@@ -17,7 +15,6 @@ import org.redkale.net.*;
import org.redkale.net.client.Client;
import org.redkale.net.client.ClientAddress;
import org.redkale.net.client.ClientConnection;
import static org.redkale.net.http.HttpRequest.parseHeaderName;
import org.redkale.util.*;
/**
@@ -237,8 +234,8 @@ public class WebClient extends Client<WebConnection, WebRequest, WebResult> {
public <T> CompletableFuture<HttpResult<T>> sendAsync(
String method, String url, HttpHeaders headers, byte[] body, Convert convert, Type valueType) {
final String traceid = Traces.computeIfAbsent(Traces.currentTraceid());
final WorkThread workThread = WorkThread.currentWorkThread();
// final String traceid = Traces.computeIfAbsent(Traces.currentTraceid());
// final WorkThread workThread = WorkThread.currentWorkThread();
if (method.indexOf(' ') >= 0 || method.indexOf('\r') >= 0 || method.indexOf('\n') >= 0) {
throw new RedkaleException("http-method(" + method + ") is illegal");
}
@@ -262,408 +259,6 @@ public class WebClient extends Client<WebConnection, WebRequest, WebResult> {
}
});
}
// 以下代码暂废弃
final ByteArray array = new ByteArray();
array.put((method.toUpperCase() + " " + path + " HTTP/1.1\r\n").getBytes(StandardCharsets.UTF_8));
array.put(("Host: " + uri.getHost() + "\r\n").getBytes(StandardCharsets.UTF_8));
array.put(WebRequest.contentLengthBytes(body));
if (headers == null || !headers.contains("User-Agent")) {
array.put(header_bytes_useragent);
}
array.put(header_bytes_connclose);
if (headers == null || !headers.contains(Rest.REST_HEADER_TRACEID)) {
array.put((Rest.REST_HEADER_TRACEID + ": " + traceid + "\r\n").getBytes(StandardCharsets.UTF_8));
}
if (headers != null) {
headers.forEach(
k -> !k.equalsIgnoreCase("Connection") && !k.equalsIgnoreCase("Content-Length"),
(k, v) -> array.put((k + ": " + v + "\r\n").getBytes(StandardCharsets.UTF_8)));
}
array.put((byte) '\r', (byte) '\n');
if (body != null) {
array.put(body);
}
return createConnection(host, port).thenCompose(conn -> {
Traces.currentTraceid(traceid);
final CompletableFuture<HttpResult<T>> future = new CompletableFuture();
conn.write(array, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
conn.readInIOThread(new ClientReadCompletionHandler(
conn, workThread, traceid, array.clear(), convert, valueType, future));
}
@Override
public void failed(Throwable exc, Void attachment) {
Traces.currentTraceid(traceid);
conn.dispose();
if (workThread == null) {
Utility.execute(() -> {
Traces.currentTraceid(traceid);
future.completeExceptionally(exc);
});
} else {
workThread.runWork(() -> {
Traces.currentTraceid(traceid);
future.completeExceptionally(exc);
});
}
}
});
return future;
});
}
protected CompletableFuture<HttpConnection> createConnection(String host, int port) {
return asyncGroup
.createTCPClient(new InetSocketAddress(host, port), connectTimeoutSeconds)
.thenApply(conn -> new HttpConnection(conn));
}
//
// public static void main(String[] args) throws Throwable {
// final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16);
// asyncGroup.start();
// String url = "http://redkale.org";
// WebClient client = WebClient.createPostPath(asyncGroup);
// (System.out).println(client.getAsync(url).join());
// }
//
protected static class HttpConnection {
protected final AsyncConnection channel;
public HttpConnection(AsyncConnection channel) {
this.channel = channel;
}
public void dispose() {
this.channel.dispose();
}
public void setReadBuffer(ByteBuffer buffer) {
this.channel.setReadBuffer(buffer);
}
public void offerReadBuffer(ByteBuffer buffer) {
this.channel.offerReadBuffer(buffer);
}
public void read(CompletionHandler<Integer, ByteBuffer> handler) {
this.channel.read(handler);
}
public void readInIOThread(CompletionHandler<Integer, ByteBuffer> handler) {
this.channel.readInIOThread(handler);
}
public void write(ByteTuple array, CompletionHandler<Integer, Void> handler) {
this.channel.write(array, handler);
}
}
protected class ClientReadCompletionHandler<T> implements CompletionHandler<Integer, ByteBuffer> {
protected static final int READ_STATE_ROUTE = 1;
protected static final int READ_STATE_HEADER = 2;
protected static final int READ_STATE_BODY = 3;
protected static final int READ_STATE_END = 4;
protected final HttpConnection conn;
protected final ByteArray array;
protected final String traceid;
protected final WorkThread workThread;
protected final CompletableFuture<HttpResult<T>> future;
protected Convert convert;
protected Type valueType;
protected HttpResult<byte[]> responseResult;
protected int readState = READ_STATE_ROUTE;
protected int contentLength = -1;
public ClientReadCompletionHandler(
HttpConnection conn,
WorkThread workThread,
String traceid,
ByteArray array,
Convert convert,
Type valueType,
CompletableFuture<HttpResult<T>> future) {
this.conn = conn;
this.workThread = workThread;
this.traceid = traceid;
this.array = array;
this.convert = convert;
this.valueType = valueType;
this.future = future;
}
@Override
public void completed(Integer count, ByteBuffer buffer) {
Traces.currentTraceid(traceid);
buffer.flip();
if (this.readState == READ_STATE_ROUTE) {
if (this.responseResult == null) {
this.responseResult = new HttpResult<>();
}
int rs = readStatusLine(buffer);
if (rs != 0) {
buffer.clear();
conn.setReadBuffer(buffer);
conn.read(this);
return;
}
this.readState = READ_STATE_HEADER;
}
if (this.readState == READ_STATE_HEADER) {
int rs = readHeaderLines(buffer);
if (rs != 0) {
buffer.clear();
conn.setReadBuffer(buffer);
conn.read(this);
return;
}
this.readState = READ_STATE_BODY;
}
if (this.readState == READ_STATE_BODY) {
if (this.contentLength > 0) {
array.put(buffer, Math.min(this.contentLength, buffer.remaining()));
int lr = (int) this.contentLength - array.length();
if (lr == 0) {
this.readState = READ_STATE_END;
} else {
buffer.clear();
conn.setReadBuffer(buffer);
conn.read(this);
return;
}
}
if (buffer.hasRemaining()) {
array.put(buffer, buffer.remaining());
}
this.readState = READ_STATE_END;
}
if (responseResult.getStatus() <= 200) {
this.responseResult.setResult(array.getBytes());
}
conn.offerReadBuffer(buffer);
conn.dispose();
if (this.responseResult != null && valueType != null) {
Convert c = convert == null ? JsonConvert.root() : convert;
HttpResult result = this.responseResult;
try {
result.result(c.convertFrom(valueType, this.responseResult.getResult()));
if (workThread != null && workThread.getState() == Thread.State.RUNNABLE) {
workThread.runWork(() -> {
Traces.currentTraceid(traceid);
future.complete((HttpResult<T>) this.responseResult);
});
} else if (workExecutor != null) {
workExecutor.execute(() -> {
Traces.currentTraceid(traceid);
future.complete((HttpResult<T>) this.responseResult);
});
} else {
Utility.execute(() -> {
Traces.currentTraceid(traceid);
future.complete((HttpResult<T>) this.responseResult);
});
}
} catch (Exception e) {
if (workThread != null && workThread.getState() == Thread.State.RUNNABLE) {
workThread.execute(() -> {
Traces.currentTraceid(traceid);
future.completeExceptionally(e);
});
} else if (workExecutor != null) {
workExecutor.execute(() -> {
Traces.currentTraceid(traceid);
future.completeExceptionally(e);
});
} else {
Utility.execute(() -> {
Traces.currentTraceid(traceid);
future.completeExceptionally(e);
});
}
}
} else {
if (workThread != null && workThread.getState() == Thread.State.RUNNABLE) {
workThread.runWork(() -> {
Traces.currentTraceid(traceid);
future.complete((HttpResult<T>) this.responseResult);
});
} else if (workExecutor != null) {
workExecutor.execute(() -> {
Traces.currentTraceid(traceid);
future.complete((HttpResult<T>) this.responseResult);
});
} else {
Utility.execute(() -> {
Traces.currentTraceid(traceid);
future.complete((HttpResult<T>) this.responseResult);
});
}
}
}
// 解析 HTTP/1.1 200 OK
private int readStatusLine(final ByteBuffer buffer) {
int remain = buffer.remaining();
ByteArray bytes = array;
for (; ; ) {
if (remain-- < 1) {
buffer.clear();
return 1;
}
byte b = buffer.get();
if (b == '\r') {
if (remain-- < 1) {
buffer.clear();
buffer.put((byte) '\r');
return 1;
}
if (buffer.get() != '\n') {
return -1;
}
break;
}
bytes.put(b);
}
String value = bytes.toString(null);
int pos = value.indexOf(' ');
this.responseResult.setStatus(Integer.decode(value.substring(pos + 1, value.indexOf(" ", pos + 2))));
bytes.clear();
return 0;
}
// 解析Header Connection: keep-alive
// 返回0表示解析完整非0表示还需继续读数据
private int readHeaderLines(final ByteBuffer buffer) {
int remain = buffer.remaining();
ByteArray bytes = array;
HttpResult<byte[]> result = responseResult;
for (; ; ) {
bytes.clear();
if (remain-- < 2) {
if (remain == 1) {
byte one = buffer.get();
buffer.clear();
buffer.put(one);
return 1;
}
buffer.clear();
return 1;
}
remain--;
byte b1 = buffer.get();
byte b2 = buffer.get();
if (b1 == '\r' && b2 == '\n') {
return 0;
}
boolean latin1 = true;
if (latin1 && (b1 < 0x20 || b1 >= 0x80)) {
latin1 = false;
}
if (latin1 && (b2 < 0x20 || b2 >= 0x80)) {
latin1 = false;
}
bytes.put(b1, b2);
for (; ; ) { // name
if (remain-- < 1) {
buffer.clear();
buffer.put(bytes.content(), 0, bytes.length());
return 1;
}
byte b = buffer.get();
if (b == ':') {
break;
} else if (latin1 && (b < 0x20 || b >= 0x80)) {
latin1 = false;
}
bytes.put(b);
}
String name = parseHeaderName(latin1, bytes, null);
bytes.clear();
boolean first = true;
int space = 0;
for (; ; ) { // value
if (remain-- < 1) {
buffer.clear();
buffer.put(name.getBytes());
buffer.put((byte) ':');
if (space == 1) {
buffer.put((byte) ' ');
} else if (space > 0) {
for (int i = 0; i < space; i++) buffer.put((byte) ' ');
}
buffer.put(bytes.content(), 0, bytes.length());
return 1;
}
byte b = buffer.get();
if (b == '\r') {
if (remain-- < 1) {
buffer.clear();
buffer.put(name.getBytes());
buffer.put((byte) ':');
if (space == 1) {
buffer.put((byte) ' ');
} else if (space > 0) {
for (int i = 0; i < space; i++) buffer.put((byte) ' ');
}
buffer.put(bytes.content(), 0, bytes.length());
buffer.put((byte) '\r');
return 1;
}
if (buffer.get() != '\n') {
return -1;
}
break;
}
if (first) {
if (b <= ' ') {
space++;
continue;
}
first = false;
}
bytes.put(b);
}
String value;
switch (name) {
case "Content-Length":
case "content-length":
value = bytes.toString(true, null);
this.contentLength = Integer.decode(value);
result.header(name, value);
break;
default:
value = bytes.toString(null);
result.header(name, value);
}
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
conn.offerReadBuffer(attachment);
conn.dispose();
future.completeExceptionally(exc);
}
return CompletableFuture.failedFuture(new RedkaleException("Not supported https"));
}
}

View File

@@ -3,10 +3,14 @@
*/
package org.redkale.net.http;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.logging.Logger;
import java.util.zip.GZIPInputStream;
import java.util.zip.Inflater;
import org.redkale.net.client.ClientCodec;
import static org.redkale.net.http.HttpRequest.*;
import org.redkale.util.ByteArray;
@@ -158,17 +162,71 @@ class WebCodec extends ClientCodec<WebRequest, WebResult> {
}
private int readBody(final WebResult result, final ByteBuffer buffer, final ByteArray array) {
if (result.contentLength >= 0) {
if (result.chunked) {
if (result.chunkBody == null) {
result.chunkBody = new ByteArray();
}
int rs = result.readChunkedBody(buffer);
if (rs == 0) {
rs = unzipEncoding(result, result.chunkBody);
result.result(result.chunkBody.getBytes());
result.array = null;
result.chunkBody = null;
}
return rs;
} else if (result.contentLength >= 0) {
array.put(buffer, Math.min((int) result.contentLength, buffer.remaining()));
int lr = (int) result.contentLength - array.length();
if (lr == 0) {
lr = unzipEncoding(result, array);
result.result(array.getBytes());
if (lr < 0) {
return lr;
}
}
return lr > 0 ? lr : 0;
}
return -1;
}
private int unzipEncoding(final WebResult result, ByteArray body) {
if (result.contentEncoding != null) {
try {
if ("gzip".equalsIgnoreCase(result.contentEncoding)) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
ByteArrayInputStream in = new ByteArrayInputStream(body.content(), 0, body.length());
GZIPInputStream ungzip = new GZIPInputStream(in);
int n;
byte[] buffer = result.array().content();
while ((n = ungzip.read(buffer)) > 0) {
out.write(buffer, 0, n);
}
body.clear();
body.put(out.toByteArray());
} else if ("deflate".equalsIgnoreCase(result.contentEncoding)) {
Inflater infl = new Inflater();
infl.setInput(body.content(), 0, body.length());
ByteArrayOutputStream out = new ByteArrayOutputStream();
int n;
byte[] buffer = result.array().content();
while (!infl.finished()) {
n = infl.inflate(buffer);
if (n == 0) {
break;
}
out.write(buffer, 0, n);
}
infl.end();
body.clear();
body.put(out.toByteArray());
}
} catch (Exception e) {
return -1;
}
}
return 0;
}
private void readHeaderLines(final WebResult result, ByteArray bytes) {
int start = 0;
int posC, posR;
@@ -179,8 +237,12 @@ class WebCodec extends ClientCodec<WebRequest, WebResult> {
posR = bytes.indexOf(posC + 1, '\r');
String value = bytes.toString(posC + 1, posR - posC - 1, charset).trim();
result.header(name, value);
if ("Content-Length".equalsIgnoreCase(name)) {
if (HEAD_CONTENT_LENGTH.equalsIgnoreCase(name)) {
result.contentLength = Integer.parseInt(value);
} else if (HEAD_CONTENT_ENCODING.equalsIgnoreCase(name)) {
result.contentEncoding = value;
} else if (HEAD_TRANSFER_ENCODING.equalsIgnoreCase(name)) {
result.chunked = "chunked".equalsIgnoreCase(value);
}
start = posR + 2; // 跳过\r\n
}

View File

@@ -3,9 +3,13 @@
*/
package org.redkale.net.http;
import java.nio.ByteBuffer;
import org.redkale.convert.ConvertDisabled;
import org.redkale.convert.json.JsonConvert;
import org.redkale.net.client.ClientResult;
import static org.redkale.net.http.HttpRequest.READ_STATE_END;
import org.redkale.util.ByteArray;
import org.redkale.util.RedkaleException;
/**
* 详情见: https://redkale.org
@@ -24,6 +28,23 @@ public class WebResult<T> extends HttpResult<T> implements ClientResult {
int contentLength = -1;
String contentEncoding;
ByteArray array;
ByteArray chunkBody;
boolean chunked;
// 是否已读\r
boolean chunkedCR = false;
int chunkedLength = -1;
int chunkedCurrOffset = -1;
byte[] chunkedHalfLenBytes;
@Override
@ConvertDisabled
public boolean isKeepAlive() {
@@ -34,4 +55,106 @@ public class WebResult<T> extends HttpResult<T> implements ClientResult {
public String toString() {
return JsonConvert.root().convertTo(HttpResult.class, this);
}
ByteArray array() {
if (array == null) {
array = new ByteArray();
}
return array;
}
int readChunkedBody(final ByteBuffer buf) {
final ByteBuffer buffer = buf;
int remain = buffer.remaining();
if (this.chunkedLength < 0) { // 需要读取length
ByteArray input = array();
input.clear();
if (this.chunkedHalfLenBytes != null) {
input.put(this.chunkedHalfLenBytes);
this.chunkedHalfLenBytes = null;
}
for (; ; ) {
if (remain-- < 1) {
buffer.clear();
if (input.length() > 0) {
this.chunkedHalfLenBytes = input.getBytes();
}
return 1;
}
byte b = buffer.get();
if (b == '\n') {
break;
}
input.put(b);
}
this.chunkedLength = HttpRequest.parseHexLength(input);
this.chunkedCurrOffset = 0;
this.chunkedCR = false;
}
if (this.chunkedLength == 0) {
if (remain < 1) {
buffer.clear();
return 1;
}
if (!this.chunkedCR) { // 读\r
remain--;
if (buffer.get() != '\r') {
throw new RedkaleException("invalid chunk end");
}
this.chunkedCR = true;
if (remain < 1) {
buffer.clear();
return 1;
}
}
// 读\n
remain--;
if (buffer.get() != '\n') {
throw new RedkaleException("invalid chunk end");
}
this.readState = READ_STATE_END;
return 0;
} else {
ByteArray bodyBytes = this.chunkBody;
if (this.chunkedCurrOffset < this.chunkedLength) {
for (; ; ) {
if (remain-- < 1) {
buffer.clear();
return 1;
}
byte b = buffer.get();
bodyBytes.put(b);
this.chunkedCurrOffset++;
if (this.chunkedCurrOffset == this.chunkedLength) {
this.chunkedCR = false;
break;
}
}
}
if (remain < 1) {
buffer.clear();
return 1;
}
// 读\r
if (!this.chunkedCR) {
remain--;
if (buffer.get() != '\r') {
throw new RedkaleException("invalid chunk end");
}
this.chunkedCR = true;
if (remain < 1) {
buffer.clear();
return 1;
}
}
// 读\n
remain--;
if (buffer.get() != '\n') {
throw new RedkaleException("invalid chunk end");
}
this.chunkedLength = -1;
// 继续读下一个chunk
return readChunkedBody(buffer);
}
}
}