diff --git a/src/main/java/org/redkale/net/http/HttpRequest.java b/src/main/java/org/redkale/net/http/HttpRequest.java index e16c9a6a2..052f742a2 100644 --- a/src/main/java/org/redkale/net/http/HttpRequest.java +++ b/src/main/java/org/redkale/net/http/HttpRequest.java @@ -438,7 +438,6 @@ public class HttpRequest extends Request { return 0; } - // TODO 待实现 private int readChunkedBody(final ByteBuffer buf) { final ByteBuffer buffer = buf; int remain = buffer.remaining(); @@ -537,7 +536,7 @@ public class HttpRequest extends Request { } } - private static int parseHexLength(ByteArray input) { + static int parseHexLength(ByteArray input) { int count = input.length(); int len = 0; for (int i = 0; i < count; i++) { diff --git a/src/main/java/org/redkale/net/http/WebClient.java b/src/main/java/org/redkale/net/http/WebClient.java index 20fc30ce4..32b011ccf 100644 --- a/src/main/java/org/redkale/net/http/WebClient.java +++ b/src/main/java/org/redkale/net/http/WebClient.java @@ -7,8 +7,6 @@ package org.redkale.net.http; import java.lang.reflect.Type; import java.net.*; -import java.nio.*; -import java.nio.channels.*; import java.nio.charset.*; import java.util.concurrent.*; import org.redkale.convert.Convert; @@ -17,7 +15,6 @@ import org.redkale.net.*; import org.redkale.net.client.Client; import org.redkale.net.client.ClientAddress; import org.redkale.net.client.ClientConnection; -import static org.redkale.net.http.HttpRequest.parseHeaderName; import org.redkale.util.*; /** @@ -237,8 +234,8 @@ public class WebClient extends Client { public CompletableFuture> sendAsync( String method, String url, HttpHeaders headers, byte[] body, Convert convert, Type valueType) { - final String traceid = Traces.computeIfAbsent(Traces.currentTraceid()); - final WorkThread workThread = WorkThread.currentWorkThread(); + // final String traceid = Traces.computeIfAbsent(Traces.currentTraceid()); + // final WorkThread workThread = WorkThread.currentWorkThread(); if (method.indexOf(' ') >= 0 || method.indexOf('\r') >= 0 || method.indexOf('\n') >= 0) { throw new RedkaleException("http-method(" + method + ") is illegal"); } @@ -262,408 +259,6 @@ public class WebClient extends Client { } }); } - - // 以下代码暂废弃 - final ByteArray array = new ByteArray(); - array.put((method.toUpperCase() + " " + path + " HTTP/1.1\r\n").getBytes(StandardCharsets.UTF_8)); - array.put(("Host: " + uri.getHost() + "\r\n").getBytes(StandardCharsets.UTF_8)); - - array.put(WebRequest.contentLengthBytes(body)); - if (headers == null || !headers.contains("User-Agent")) { - array.put(header_bytes_useragent); - } - array.put(header_bytes_connclose); - if (headers == null || !headers.contains(Rest.REST_HEADER_TRACEID)) { - array.put((Rest.REST_HEADER_TRACEID + ": " + traceid + "\r\n").getBytes(StandardCharsets.UTF_8)); - } - if (headers != null) { - headers.forEach( - k -> !k.equalsIgnoreCase("Connection") && !k.equalsIgnoreCase("Content-Length"), - (k, v) -> array.put((k + ": " + v + "\r\n").getBytes(StandardCharsets.UTF_8))); - } - array.put((byte) '\r', (byte) '\n'); - if (body != null) { - array.put(body); - } - - return createConnection(host, port).thenCompose(conn -> { - Traces.currentTraceid(traceid); - final CompletableFuture> future = new CompletableFuture(); - conn.write(array, new CompletionHandler() { - @Override - public void completed(Integer result, Void attachment) { - conn.readInIOThread(new ClientReadCompletionHandler( - conn, workThread, traceid, array.clear(), convert, valueType, future)); - } - - @Override - public void failed(Throwable exc, Void attachment) { - Traces.currentTraceid(traceid); - conn.dispose(); - if (workThread == null) { - Utility.execute(() -> { - Traces.currentTraceid(traceid); - future.completeExceptionally(exc); - }); - } else { - workThread.runWork(() -> { - Traces.currentTraceid(traceid); - future.completeExceptionally(exc); - }); - } - } - }); - return future; - }); - } - - protected CompletableFuture createConnection(String host, int port) { - return asyncGroup - .createTCPClient(new InetSocketAddress(host, port), connectTimeoutSeconds) - .thenApply(conn -> new HttpConnection(conn)); - } - - // - // public static void main(String[] args) throws Throwable { - // final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16); - // asyncGroup.start(); - // String url = "http://redkale.org"; - // WebClient client = WebClient.createPostPath(asyncGroup); - // (System.out).println(client.getAsync(url).join()); - // } - // - protected static class HttpConnection { - - protected final AsyncConnection channel; - - public HttpConnection(AsyncConnection channel) { - this.channel = channel; - } - - public void dispose() { - this.channel.dispose(); - } - - public void setReadBuffer(ByteBuffer buffer) { - this.channel.setReadBuffer(buffer); - } - - public void offerReadBuffer(ByteBuffer buffer) { - this.channel.offerReadBuffer(buffer); - } - - public void read(CompletionHandler handler) { - this.channel.read(handler); - } - - public void readInIOThread(CompletionHandler handler) { - this.channel.readInIOThread(handler); - } - - public void write(ByteTuple array, CompletionHandler handler) { - this.channel.write(array, handler); - } - } - - protected class ClientReadCompletionHandler implements CompletionHandler { - - protected static final int READ_STATE_ROUTE = 1; - - protected static final int READ_STATE_HEADER = 2; - - protected static final int READ_STATE_BODY = 3; - - protected static final int READ_STATE_END = 4; - - protected final HttpConnection conn; - - protected final ByteArray array; - - protected final String traceid; - - protected final WorkThread workThread; - - protected final CompletableFuture> future; - - protected Convert convert; - - protected Type valueType; - - protected HttpResult responseResult; - - protected int readState = READ_STATE_ROUTE; - - protected int contentLength = -1; - - public ClientReadCompletionHandler( - HttpConnection conn, - WorkThread workThread, - String traceid, - ByteArray array, - Convert convert, - Type valueType, - CompletableFuture> future) { - this.conn = conn; - this.workThread = workThread; - this.traceid = traceid; - this.array = array; - this.convert = convert; - this.valueType = valueType; - this.future = future; - } - - @Override - public void completed(Integer count, ByteBuffer buffer) { - Traces.currentTraceid(traceid); - buffer.flip(); - if (this.readState == READ_STATE_ROUTE) { - if (this.responseResult == null) { - this.responseResult = new HttpResult<>(); - } - int rs = readStatusLine(buffer); - if (rs != 0) { - buffer.clear(); - conn.setReadBuffer(buffer); - conn.read(this); - return; - } - this.readState = READ_STATE_HEADER; - } - if (this.readState == READ_STATE_HEADER) { - int rs = readHeaderLines(buffer); - if (rs != 0) { - buffer.clear(); - conn.setReadBuffer(buffer); - conn.read(this); - return; - } - this.readState = READ_STATE_BODY; - } - if (this.readState == READ_STATE_BODY) { - if (this.contentLength > 0) { - array.put(buffer, Math.min(this.contentLength, buffer.remaining())); - int lr = (int) this.contentLength - array.length(); - if (lr == 0) { - this.readState = READ_STATE_END; - } else { - buffer.clear(); - conn.setReadBuffer(buffer); - conn.read(this); - return; - } - } - if (buffer.hasRemaining()) { - array.put(buffer, buffer.remaining()); - } - this.readState = READ_STATE_END; - } - if (responseResult.getStatus() <= 200) { - this.responseResult.setResult(array.getBytes()); - } - conn.offerReadBuffer(buffer); - conn.dispose(); - - if (this.responseResult != null && valueType != null) { - Convert c = convert == null ? JsonConvert.root() : convert; - HttpResult result = this.responseResult; - try { - result.result(c.convertFrom(valueType, this.responseResult.getResult())); - if (workThread != null && workThread.getState() == Thread.State.RUNNABLE) { - workThread.runWork(() -> { - Traces.currentTraceid(traceid); - future.complete((HttpResult) this.responseResult); - }); - } else if (workExecutor != null) { - workExecutor.execute(() -> { - Traces.currentTraceid(traceid); - future.complete((HttpResult) this.responseResult); - }); - } else { - Utility.execute(() -> { - Traces.currentTraceid(traceid); - future.complete((HttpResult) this.responseResult); - }); - } - } catch (Exception e) { - if (workThread != null && workThread.getState() == Thread.State.RUNNABLE) { - workThread.execute(() -> { - Traces.currentTraceid(traceid); - future.completeExceptionally(e); - }); - } else if (workExecutor != null) { - workExecutor.execute(() -> { - Traces.currentTraceid(traceid); - future.completeExceptionally(e); - }); - } else { - Utility.execute(() -> { - Traces.currentTraceid(traceid); - future.completeExceptionally(e); - }); - } - } - } else { - if (workThread != null && workThread.getState() == Thread.State.RUNNABLE) { - workThread.runWork(() -> { - Traces.currentTraceid(traceid); - future.complete((HttpResult) this.responseResult); - }); - } else if (workExecutor != null) { - workExecutor.execute(() -> { - Traces.currentTraceid(traceid); - future.complete((HttpResult) this.responseResult); - }); - } else { - Utility.execute(() -> { - Traces.currentTraceid(traceid); - future.complete((HttpResult) this.responseResult); - }); - } - } - } - - // 解析 HTTP/1.1 200 OK - private int readStatusLine(final ByteBuffer buffer) { - int remain = buffer.remaining(); - ByteArray bytes = array; - for (; ; ) { - if (remain-- < 1) { - buffer.clear(); - return 1; - } - byte b = buffer.get(); - if (b == '\r') { - if (remain-- < 1) { - buffer.clear(); - buffer.put((byte) '\r'); - return 1; - } - if (buffer.get() != '\n') { - return -1; - } - break; - } - bytes.put(b); - } - String value = bytes.toString(null); - int pos = value.indexOf(' '); - this.responseResult.setStatus(Integer.decode(value.substring(pos + 1, value.indexOf(" ", pos + 2)))); - bytes.clear(); - return 0; - } - - // 解析Header Connection: keep-alive - // 返回0表示解析完整,非0表示还需继续读数据 - private int readHeaderLines(final ByteBuffer buffer) { - int remain = buffer.remaining(); - ByteArray bytes = array; - HttpResult result = responseResult; - for (; ; ) { - bytes.clear(); - if (remain-- < 2) { - if (remain == 1) { - byte one = buffer.get(); - buffer.clear(); - buffer.put(one); - return 1; - } - buffer.clear(); - return 1; - } - remain--; - byte b1 = buffer.get(); - byte b2 = buffer.get(); - if (b1 == '\r' && b2 == '\n') { - return 0; - } - boolean latin1 = true; - if (latin1 && (b1 < 0x20 || b1 >= 0x80)) { - latin1 = false; - } - if (latin1 && (b2 < 0x20 || b2 >= 0x80)) { - latin1 = false; - } - bytes.put(b1, b2); - for (; ; ) { // name - if (remain-- < 1) { - buffer.clear(); - buffer.put(bytes.content(), 0, bytes.length()); - return 1; - } - byte b = buffer.get(); - if (b == ':') { - break; - } else if (latin1 && (b < 0x20 || b >= 0x80)) { - latin1 = false; - } - bytes.put(b); - } - String name = parseHeaderName(latin1, bytes, null); - bytes.clear(); - boolean first = true; - int space = 0; - for (; ; ) { // value - if (remain-- < 1) { - buffer.clear(); - buffer.put(name.getBytes()); - buffer.put((byte) ':'); - if (space == 1) { - buffer.put((byte) ' '); - } else if (space > 0) { - for (int i = 0; i < space; i++) buffer.put((byte) ' '); - } - buffer.put(bytes.content(), 0, bytes.length()); - return 1; - } - byte b = buffer.get(); - if (b == '\r') { - if (remain-- < 1) { - buffer.clear(); - buffer.put(name.getBytes()); - buffer.put((byte) ':'); - if (space == 1) { - buffer.put((byte) ' '); - } else if (space > 0) { - for (int i = 0; i < space; i++) buffer.put((byte) ' '); - } - buffer.put(bytes.content(), 0, bytes.length()); - buffer.put((byte) '\r'); - return 1; - } - if (buffer.get() != '\n') { - return -1; - } - break; - } - if (first) { - if (b <= ' ') { - space++; - continue; - } - first = false; - } - bytes.put(b); - } - String value; - switch (name) { - case "Content-Length": - case "content-length": - value = bytes.toString(true, null); - this.contentLength = Integer.decode(value); - result.header(name, value); - break; - default: - value = bytes.toString(null); - result.header(name, value); - } - } - } - - @Override - public void failed(Throwable exc, ByteBuffer attachment) { - conn.offerReadBuffer(attachment); - conn.dispose(); - future.completeExceptionally(exc); - } + return CompletableFuture.failedFuture(new RedkaleException("Not supported https")); } } diff --git a/src/main/java/org/redkale/net/http/WebCodec.java b/src/main/java/org/redkale/net/http/WebCodec.java index 24ab5dab5..4e8cc722e 100644 --- a/src/main/java/org/redkale/net/http/WebCodec.java +++ b/src/main/java/org/redkale/net/http/WebCodec.java @@ -3,10 +3,14 @@ */ package org.redkale.net.http; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.logging.Logger; +import java.util.zip.GZIPInputStream; +import java.util.zip.Inflater; import org.redkale.net.client.ClientCodec; import static org.redkale.net.http.HttpRequest.*; import org.redkale.util.ByteArray; @@ -158,17 +162,71 @@ class WebCodec extends ClientCodec { } private int readBody(final WebResult result, final ByteBuffer buffer, final ByteArray array) { - if (result.contentLength >= 0) { + if (result.chunked) { + if (result.chunkBody == null) { + result.chunkBody = new ByteArray(); + } + int rs = result.readChunkedBody(buffer); + if (rs == 0) { + rs = unzipEncoding(result, result.chunkBody); + result.result(result.chunkBody.getBytes()); + result.array = null; + result.chunkBody = null; + } + return rs; + } else if (result.contentLength >= 0) { array.put(buffer, Math.min((int) result.contentLength, buffer.remaining())); int lr = (int) result.contentLength - array.length(); if (lr == 0) { + lr = unzipEncoding(result, array); result.result(array.getBytes()); + if (lr < 0) { + return lr; + } } return lr > 0 ? lr : 0; } return -1; } + private int unzipEncoding(final WebResult result, ByteArray body) { + if (result.contentEncoding != null) { + try { + if ("gzip".equalsIgnoreCase(result.contentEncoding)) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + ByteArrayInputStream in = new ByteArrayInputStream(body.content(), 0, body.length()); + GZIPInputStream ungzip = new GZIPInputStream(in); + int n; + byte[] buffer = result.array().content(); + while ((n = ungzip.read(buffer)) > 0) { + out.write(buffer, 0, n); + } + body.clear(); + body.put(out.toByteArray()); + } else if ("deflate".equalsIgnoreCase(result.contentEncoding)) { + Inflater infl = new Inflater(); + infl.setInput(body.content(), 0, body.length()); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + int n; + byte[] buffer = result.array().content(); + while (!infl.finished()) { + n = infl.inflate(buffer); + if (n == 0) { + break; + } + out.write(buffer, 0, n); + } + infl.end(); + body.clear(); + body.put(out.toByteArray()); + } + } catch (Exception e) { + return -1; + } + } + return 0; + } + private void readHeaderLines(final WebResult result, ByteArray bytes) { int start = 0; int posC, posR; @@ -179,8 +237,12 @@ class WebCodec extends ClientCodec { posR = bytes.indexOf(posC + 1, '\r'); String value = bytes.toString(posC + 1, posR - posC - 1, charset).trim(); result.header(name, value); - if ("Content-Length".equalsIgnoreCase(name)) { + if (HEAD_CONTENT_LENGTH.equalsIgnoreCase(name)) { result.contentLength = Integer.parseInt(value); + } else if (HEAD_CONTENT_ENCODING.equalsIgnoreCase(name)) { + result.contentEncoding = value; + } else if (HEAD_TRANSFER_ENCODING.equalsIgnoreCase(name)) { + result.chunked = "chunked".equalsIgnoreCase(value); } start = posR + 2; // 跳过\r\n } diff --git a/src/main/java/org/redkale/net/http/WebResult.java b/src/main/java/org/redkale/net/http/WebResult.java index 56e5079ba..db8d326d4 100644 --- a/src/main/java/org/redkale/net/http/WebResult.java +++ b/src/main/java/org/redkale/net/http/WebResult.java @@ -3,9 +3,13 @@ */ package org.redkale.net.http; +import java.nio.ByteBuffer; import org.redkale.convert.ConvertDisabled; import org.redkale.convert.json.JsonConvert; import org.redkale.net.client.ClientResult; +import static org.redkale.net.http.HttpRequest.READ_STATE_END; +import org.redkale.util.ByteArray; +import org.redkale.util.RedkaleException; /** * 详情见: https://redkale.org @@ -24,6 +28,23 @@ public class WebResult extends HttpResult implements ClientResult { int contentLength = -1; + String contentEncoding; + + ByteArray array; + + ByteArray chunkBody; + + boolean chunked; + + // 是否已读\r + boolean chunkedCR = false; + + int chunkedLength = -1; + + int chunkedCurrOffset = -1; + + byte[] chunkedHalfLenBytes; + @Override @ConvertDisabled public boolean isKeepAlive() { @@ -34,4 +55,106 @@ public class WebResult extends HttpResult implements ClientResult { public String toString() { return JsonConvert.root().convertTo(HttpResult.class, this); } + + ByteArray array() { + if (array == null) { + array = new ByteArray(); + } + return array; + } + + int readChunkedBody(final ByteBuffer buf) { + final ByteBuffer buffer = buf; + int remain = buffer.remaining(); + if (this.chunkedLength < 0) { // 需要读取length + ByteArray input = array(); + input.clear(); + if (this.chunkedHalfLenBytes != null) { + input.put(this.chunkedHalfLenBytes); + this.chunkedHalfLenBytes = null; + } + for (; ; ) { + if (remain-- < 1) { + buffer.clear(); + if (input.length() > 0) { + this.chunkedHalfLenBytes = input.getBytes(); + } + return 1; + } + byte b = buffer.get(); + if (b == '\n') { + break; + } + input.put(b); + } + this.chunkedLength = HttpRequest.parseHexLength(input); + this.chunkedCurrOffset = 0; + this.chunkedCR = false; + } + if (this.chunkedLength == 0) { + if (remain < 1) { + buffer.clear(); + return 1; + } + if (!this.chunkedCR) { // 读\r + remain--; + if (buffer.get() != '\r') { + throw new RedkaleException("invalid chunk end"); + } + this.chunkedCR = true; + if (remain < 1) { + buffer.clear(); + return 1; + } + } + // 读\n + remain--; + if (buffer.get() != '\n') { + throw new RedkaleException("invalid chunk end"); + } + this.readState = READ_STATE_END; + return 0; + } else { + ByteArray bodyBytes = this.chunkBody; + if (this.chunkedCurrOffset < this.chunkedLength) { + for (; ; ) { + if (remain-- < 1) { + buffer.clear(); + return 1; + } + byte b = buffer.get(); + bodyBytes.put(b); + this.chunkedCurrOffset++; + if (this.chunkedCurrOffset == this.chunkedLength) { + this.chunkedCR = false; + break; + } + } + } + if (remain < 1) { + buffer.clear(); + return 1; + } + // 读\r + if (!this.chunkedCR) { + remain--; + if (buffer.get() != '\r') { + throw new RedkaleException("invalid chunk end"); + } + this.chunkedCR = true; + if (remain < 1) { + buffer.clear(); + return 1; + } + } + // 读\n + remain--; + if (buffer.get() != '\n') { + throw new RedkaleException("invalid chunk end"); + } + this.chunkedLength = -1; + // 继续读下一个chunk + return readChunkedBody(buffer); + } + } }