From 252a45fbb429c9c61438ca724936b84bd36f1342 Mon Sep 17 00:00:00 2001 From: redkale Date: Mon, 27 Nov 2023 19:21:58 +0800 Subject: [PATCH] =?UTF-8?q?HttpSimpleClient=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../redkale/cluster/HttpClusterRpcClient.java | 2 +- .../redkale/net/http/HttpResourceServlet.java | 4 +- .../java/org/redkale/net/http/HttpServer.java | 2 - .../redkale/net/http/HttpSimpleClient.java | 8 +- .../org/redkale/net/http/HttpSimpleCodec.java | 155 ++++++------------ .../redkale/net/http/HttpSimpleRequest.java | 38 ++++- .../redkale/net/http/HttpSimpleResult.java | 16 +- .../org/redkale/net/http/MultiContext.java | 16 +- .../org/redkale/net/http/WebSocketNode.java | 2 +- src/main/java/org/redkale/util/Utility.java | 4 +- .../test/http/HttpSimpleClientTest.java | 117 +++++++++++++ .../redkale/test/http/HttpSimpleServlet.java | 23 +++ .../java/org/redkale/test/sncp/SncpTest.java | 4 +- 13 files changed, 249 insertions(+), 142 deletions(-) create mode 100644 src/test/java/org/redkale/test/http/HttpSimpleClientTest.java create mode 100644 src/test/java/org/redkale/test/http/HttpSimpleServlet.java diff --git a/src/main/java/org/redkale/cluster/HttpClusterRpcClient.java b/src/main/java/org/redkale/cluster/HttpClusterRpcClient.java index ceba4b439..3a11043f8 100644 --- a/src/main/java/org/redkale/cluster/HttpClusterRpcClient.java +++ b/src/main/java/org/redkale/cluster/HttpClusterRpcClient.java @@ -153,7 +153,7 @@ public class HttpClusterRpcClient extends HttpRpcClient { return new HttpResult().status(404).toFuture(); } InetSocketAddress addr = it.next(); - String host = addr.getPort() != 80 ? addr.getHostString() : (addr.getHostString() + ":" + addr.getPort()); + String host = addr.getPort() > 0 && addr.getPort() != 80 ? (addr.getHostString() + ":" + addr.getPort()) : addr.getHostString(); String url = "http://" + host + requestPath; if (logger.isLoggable(Level.FINER)) { logger.log(Level.FINER, "sendEachAddressAsync: url: " + url diff --git a/src/main/java/org/redkale/net/http/HttpResourceServlet.java b/src/main/java/org/redkale/net/http/HttpResourceServlet.java index 168d24fd9..87840e446 100644 --- a/src/main/java/org/redkale/net/http/HttpResourceServlet.java +++ b/src/main/java/org/redkale/net/http/HttpResourceServlet.java @@ -344,15 +344,13 @@ public class HttpResourceServlet extends HttpServlet { if (this.servlet.cachedLength.longValue() + length > this.servlet.cachelimit) { return; //超过缓存总容量 } - try { - FileInputStream in = new FileInputStream(file); + try (FileInputStream in = new FileInputStream(file)) { ByteArray out = new ByteArray((int) file.length()); byte[] bytes = new byte[10240]; int pos; while ((pos = in.read(bytes)) != -1) { out.put(bytes, 0, pos); } - in.close(); this.content = out; this.servlet.cachedLength.add(this.content.length()); } catch (Exception e) { diff --git a/src/main/java/org/redkale/net/http/HttpServer.java b/src/main/java/org/redkale/net/http/HttpServer.java index 642b0df68..ea1052c91 100644 --- a/src/main/java/org/redkale/net/http/HttpServer.java +++ b/src/main/java/org/redkale/net/http/HttpServer.java @@ -41,8 +41,6 @@ public class HttpServer extends Server APP_EXECUTOR资源为null diff --git a/src/main/java/org/redkale/net/http/HttpSimpleClient.java b/src/main/java/org/redkale/net/http/HttpSimpleClient.java index 9a0a316e8..d79dfd42f 100644 --- a/src/main/java/org/redkale/net/http/HttpSimpleClient.java +++ b/src/main/java/org/redkale/net/http/HttpSimpleClient.java @@ -271,7 +271,7 @@ public class HttpSimpleClient extends Client() { @Override public void completed(Integer result, Void attachment) { - conn.read(new ClientReadCompletionHandler(conn, workThread, traceid, array.clear(), convert, valueType, future)); + conn.readInIOThread(new ClientReadCompletionHandler(conn, workThread, traceid, array.clear(), convert, valueType, future)); } @Override @@ -306,7 +306,7 @@ public class HttpSimpleClient extends Client handler) { + this.channel.readInIOThread(handler); + } + public void write(ByteTuple array, CompletionHandler handler) { this.channel.write(array, handler); } diff --git a/src/main/java/org/redkale/net/http/HttpSimpleCodec.java b/src/main/java/org/redkale/net/http/HttpSimpleCodec.java index e06ae0978..96d829248 100644 --- a/src/main/java/org/redkale/net/http/HttpSimpleCodec.java +++ b/src/main/java/org/redkale/net/http/HttpSimpleCodec.java @@ -4,6 +4,8 @@ package org.redkale.net.http; import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.logging.Logger; import org.redkale.net.client.ClientCodec; import static org.redkale.net.http.HttpRequest.*; @@ -27,14 +29,6 @@ class HttpSimpleCodec extends ClientCodec { super(connection); } - protected HttpSimpleResult pollResult(HttpSimpleRequest request) { - return new HttpSimpleResult(); - } - - protected void offerResult(HttpSimpleResult rs) { - //do nothing - } - private ByteArray pollArray(ByteArray array) { if (recyclableArray == null) { recyclableArray = new ByteArray(); @@ -49,11 +43,12 @@ class HttpSimpleCodec extends ClientCodec { @Override public void decodeMessages(final ByteBuffer realBuf, final ByteArray array) { int rs; - HttpSimpleResult result = this.lastResult; final ByteBuffer buffer = realBuf; while (buffer.hasRemaining()) { + HttpSimpleResult result = this.lastResult; if (result == null) { result = new HttpSimpleResult(); + result.readState = READ_STATE_ROUTE; this.lastResult = result; } array.clear(); @@ -67,18 +62,18 @@ class HttpSimpleCodec extends ClientCodec { this.halfBytes = pollArray(array); return; } else if (rs < 0) { //数据异常 - occurError(null, new HttpException("http data not valid")); + occurError(null, new HttpException("http status not valid")); return; } result.readState = READ_STATE_HEADER; } if (result.readState == READ_STATE_HEADER) { - rs = readHeaderLines(result, buffer, array); + rs = readHeaderBytes(result, buffer, array); if (rs > 0) { //数据不全 this.halfBytes = pollArray(array); return; } else if (rs < 0) { //数据异常 - occurError(null, new HttpException("http data not valid")); + occurError(null, new HttpException("http header not valid")); return; } result.readState = READ_STATE_BODY; @@ -134,107 +129,51 @@ class HttpSimpleCodec extends ClientCodec { //解析Header Connection: keep-alive //返回0表示解析完整,非0表示还需继续读数据 - private int readHeaderLines(final HttpSimpleResult result, final ByteBuffer buffer, final ByteArray array) { - int remain = buffer.remaining(); - for (;;) { - array.clear(); - if (remain-- < 2) { - if (remain == 1) { - array.put(buffer.get()); - return 1; + private int readHeaderBytes(final HttpSimpleResult result, final ByteBuffer buffer, final ByteArray array) { + byte b; + while (buffer.hasRemaining()) { + b = buffer.get(); + if (b == '\n') { + int len = array.length(); + if (len >= 3 && array.get(len - 1) == '\r' && array.get(len - 2) == '\n' && array.get(len - 3) == '\r') { + //最后一个\r\n不写入 + readHeaderLines(result, array.removeLastByte()); //移除最后一个\r + array.clear(); + return 0; } - return 1; - } - remain--; - byte b1 = buffer.get(); - byte b2 = buffer.get(); - if (b1 == '\r' && b2 == '\n') { - return 0; - } - boolean latin1 = true; - if (latin1 && (b1 < 0x20 || b1 >= 0x80)) { - latin1 = false; - } - if (latin1 && (b2 < 0x20 || b2 >= 0x80)) { - latin1 = false; - } - array.put(b1, b2); - for (;;) { // name - if (remain-- < 1) { - buffer.clear(); - buffer.put(array.content(), 0, array.length()); - return 1; - } - byte b = buffer.get(); - if (b == ':') { - break; - } else if (latin1 && (b < 0x20 || b >= 0x80)) { - latin1 = false; - } - array.put(b); - } - String name = parseHeaderName(latin1, array, null); - array.clear(); - boolean first = true; - int space = 0; - for (;;) { // value - if (remain-- < 1) { - buffer.clear(); - buffer.put(name.getBytes()); - buffer.put((byte) ':'); - if (space == 1) { - buffer.put((byte) ' '); - } else if (space > 0) { - for (int i = 0; i < space; i++) buffer.put((byte) ' '); - } - buffer.put(array.content(), 0, array.length()); - return 1; - } - byte b = buffer.get(); - if (b == '\r') { - if (remain-- < 1) { - buffer.clear(); - buffer.put(name.getBytes()); - buffer.put((byte) ':'); - if (space == 1) { - buffer.put((byte) ' '); - } else if (space > 0) { - for (int i = 0; i < space; i++) buffer.put((byte) ' '); - } - buffer.put(array.content(), 0, array.length()); - buffer.put((byte) '\r'); - return 1; - } - if (buffer.get() != '\n') { - return -1; - } - break; - } - if (first) { - if (b <= ' ') { - space++; - continue; - } - first = false; - } - array.put(b); - } - String value; - switch (name) { - case "Content-Length": - case "content-length": - value = array.toString(true, null); - result.contentLength = Integer.decode(value); - result.header(name, value); - break; - default: - value = array.toString(null); - result.header(name, value); } + array.put(b); } + return 1; } private int readBody(final HttpSimpleResult result, final ByteBuffer buffer, final ByteArray array) { - return 0; + if (result.contentLength >= 0) { + array.put(buffer, Math.min((int) result.contentLength, buffer.remaining())); + int lr = (int) result.contentLength - array.length(); + if (lr == 0) { + result.result(array.getBytes()); + } + return lr > 0 ? lr : 0; + } + return -1; } + + private void readHeaderLines(final HttpSimpleResult result, ByteArray bytes) { + int start = 0; + int posC, posR; + Charset charset = StandardCharsets.UTF_8; + while (start < bytes.length()) { + posC = bytes.indexOf(start, ':'); + String name = bytes.toString(start, posC - start, charset).trim(); + posR = bytes.indexOf(posC + 1, '\r'); + String value = bytes.toString(posC + 1, posR - posC - 1, charset).trim(); + result.header(name, value); + if ("Content-Length".equalsIgnoreCase(name)) { + result.contentLength = Integer.parseInt(value); + } + start = posR + 2; //跳过\r\n + } + } + } diff --git a/src/main/java/org/redkale/net/http/HttpSimpleRequest.java b/src/main/java/org/redkale/net/http/HttpSimpleRequest.java index e544f5a4f..8ad98873e 100644 --- a/src/main/java/org/redkale/net/http/HttpSimpleRequest.java +++ b/src/main/java/org/redkale/net/http/HttpSimpleRequest.java @@ -19,6 +19,7 @@ import static org.redkale.net.http.HttpSimpleClient.*; import org.redkale.util.ByteArray; import org.redkale.util.RedkaleException; import org.redkale.util.Traces; +import static org.redkale.util.Utility.isNotEmpty; /** * HttpRequest的缩减版, 只提供部分字段 @@ -110,7 +111,30 @@ public class HttpSimpleRequest extends ClientRequest implements java.io.Serializ @Override public void writeTo(ClientConnection conn, ByteArray array) { - array.put((method.toUpperCase() + " " + requestPath() + " HTTP/1.1\r\n").getBytes(StandardCharsets.UTF_8)); + //组装path和body + String requestPath = requestPath(); + String contentType0 = this.contentType; + byte[] clientBody = null; + if (isNotEmpty(body)) { + String paramstr = getParametersToString(); + if (paramstr != null) { + if (getPath().indexOf('?') > 0) { + requestPath += "&" + paramstr; + } else { + requestPath += "?" + paramstr; + } + } + clientBody = getBody(); + } else { + String paramstr = getParametersToString(); + if (paramstr != null) { + clientBody = paramstr.getBytes(StandardCharsets.UTF_8); + } + contentType0 = "x-www-form-urlencoded"; + } + //写status + array.put((method.toUpperCase() + " " + requestPath + " HTTP/1.1\r\n").getBytes(StandardCharsets.UTF_8)); + //写header if (traceid != null && !containsHeaderIgnoreCase(Rest.REST_HEADER_TRACEID)) { array.put((Rest.REST_HEADER_TRACEID + ": " + traceid + "\r\n").getBytes(StandardCharsets.UTF_8)); } @@ -120,13 +144,15 @@ public class HttpSimpleRequest extends ClientRequest implements java.io.Serializ if (!containsHeaderIgnoreCase("Connection")) { array.put(header_bytes_connalive); } - array.put(contentLengthBytes()); + array.put(("Content-Type: " + contentType0 + "\r\n").getBytes(StandardCharsets.UTF_8)); + array.put(contentLengthBytes(clientBody)); if (headers != null) { headers.forEach((k, v) -> array.put((k + ": " + v + "\r\n").getBytes(StandardCharsets.UTF_8))); } array.put((byte) '\r', (byte) '\n'); - if (body != null) { - array.put(body); + //写body + if (clientBody != null) { + array.put(clientBody); } } @@ -134,8 +160,8 @@ public class HttpSimpleRequest extends ClientRequest implements java.io.Serializ return headers != null && headers.containsIgnoreCase(name); } - protected byte[] contentLengthBytes() { - int len = body == null ? 0 : body.length; + protected byte[] contentLengthBytes(byte[] clientBody) { + int len = clientBody == null ? 0 : clientBody.length; if (len < contentLengthArray.length) { return contentLengthArray[len]; } diff --git a/src/main/java/org/redkale/net/http/HttpSimpleResult.java b/src/main/java/org/redkale/net/http/HttpSimpleResult.java index 21f632bb0..fb71dc365 100644 --- a/src/main/java/org/redkale/net/http/HttpSimpleResult.java +++ b/src/main/java/org/redkale/net/http/HttpSimpleResult.java @@ -3,8 +3,9 @@ */ package org.redkale.net.http; +import org.redkale.convert.ConvertDisabled; +import org.redkale.convert.json.JsonConvert; import org.redkale.net.client.ClientResult; -import static org.redkale.net.http.HttpSimpleClient.ClientReadCompletionHandler.READ_STATE_ROUTE; /** * @@ -16,19 +17,20 @@ import static org.redkale.net.http.HttpSimpleClient.ClientReadCompletionHandler. * * @since 2.8.0 */ -class HttpSimpleResult extends HttpResult implements ClientResult { +public class HttpSimpleResult extends HttpResult implements ClientResult { - int readState = READ_STATE_ROUTE; + int readState; int contentLength = -1; - byte[] headerBytes; - - boolean headerParsed = false; - @Override + @ConvertDisabled public boolean isKeepAlive() { return true; } + @Override + public String toString() { + return JsonConvert.root().convertTo(HttpResult.class, this); + } } diff --git a/src/main/java/org/redkale/net/http/MultiContext.java b/src/main/java/org/redkale/net/http/MultiContext.java index e0f9a96fe..7f0ea87c2 100644 --- a/src/main/java/org/redkale/net/http/MultiContext.java +++ b/src/main/java/org/redkale/net/http/MultiContext.java @@ -91,7 +91,7 @@ public final class MultiContext { /** * 获取第一个文件的二进制 * - * @param max 可接收的文件大小最大值 + * @param max 可接收的文件大小最大值 * @param fileNameRegx 可接收的文件名正则表达式 * @param contentTypeRegx 可接收的ContentType正则表达式 * @@ -139,8 +139,8 @@ public final class MultiContext { /** * 获取第一个文件 * - * @param home 进程目录 - * @param max 可接收的文件大小最大值 + * @param home 进程目录 + * @param max 可接收的文件大小最大值 * @param fileNameRegx 可接收的文件名正则表达式 * @param contentTypeRegx 可接收的ContentType正则表达式 * @@ -172,7 +172,7 @@ public final class MultiContext { boolean rs = part.save(max < 1 ? Long.MAX_VALUE : max, file); if (!rs) { Files.delete(file.toPath()); - parent.delete(); + Files.delete(parent.toPath()); } else { tmpfile = file; } @@ -184,8 +184,8 @@ public final class MultiContext { /** * 获取所有文件 * - * @param home 进程目录 - * @param max 可接收的文件大小最大值 + * @param home 进程目录 + * @param max 可接收的文件大小最大值 * @param fileNameRegx 可接收的文件名正则表达式 * @param contentTypeRegx 可接收的ContentType正则表达式 * @@ -211,8 +211,8 @@ public final class MultiContext { } boolean rs = part.save(max < 1 ? Long.MAX_VALUE : max, file); if (!rs) { - file.delete(); - parent.delete(); + Files.delete(file.toPath()); + Files.delete(parent.toPath()); continue; } if (files == null) { diff --git a/src/main/java/org/redkale/net/http/WebSocketNode.java b/src/main/java/org/redkale/net/http/WebSocketNode.java index d760ccc04..1037524ea 100644 --- a/src/main/java/org/redkale/net/http/WebSocketNode.java +++ b/src/main/java/org/redkale/net/http/WebSocketNode.java @@ -95,7 +95,7 @@ public abstract class WebSocketNode implements Service { if (source != null && this.wsNodeAddress == null) { //非分布式模式 this.wsNodeAddress = new WebSocketAddress(mqtopic, new InetSocketAddress("127.0.0.1", 27)); } - if (source != null && wsNodeAddress != null) { + if (source != null) { source.sadd(WS_SOURCE_KEY_NODES, WebSocketAddress.class, this.wsNodeAddress); } } diff --git a/src/main/java/org/redkale/util/Utility.java b/src/main/java/org/redkale/util/Utility.java index 78b26d6a8..c98f169b1 100644 --- a/src/main/java/org/redkale/util/Utility.java +++ b/src/main/java/org/redkale/util/Utility.java @@ -456,8 +456,8 @@ public final class Utility { public static void sleep(long millis) { try { Thread.sleep(millis); - } catch (Exception e) { - //do nothing + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); } } diff --git a/src/test/java/org/redkale/test/http/HttpSimpleClientTest.java b/src/test/java/org/redkale/test/http/HttpSimpleClientTest.java new file mode 100644 index 000000000..35e8debb0 --- /dev/null +++ b/src/test/java/org/redkale/test/http/HttpSimpleClientTest.java @@ -0,0 +1,117 @@ +/* + * + */ +package org.redkale.test.http; + +import java.net.InetSocketAddress; +import java.util.concurrent.CountDownLatch; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.redkale.boot.Application; +import org.redkale.net.AsyncIOGroup; +import org.redkale.net.http.HttpServer; +import org.redkale.net.http.HttpSimpleClient; +import org.redkale.net.http.HttpSimpleRequest; +import org.redkale.util.AnyValue; +import org.redkale.util.ResourceFactory; + +/** + * + * @author zhangjx + */ +public class HttpSimpleClientTest { + + private static int port = 0; + + private static Application application; + + private static ResourceFactory factory; + + private static HttpServer server; + + private boolean main; + + public static void main(String[] args) throws Throwable { + HttpSimpleClientTest test = new HttpSimpleClientTest(); + test.main = true; + test.run(); + } + + @Test + public void run() throws Exception { + runServer(); + //Utility.sleep(50000); + final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16); + asyncGroup.start(); + HttpSimpleClient client = HttpSimpleClient.create(asyncGroup); + InetSocketAddress addr = new InetSocketAddress("127.0.0.1", port); + { + HttpSimpleRequest req = HttpSimpleRequest.createPath("/test").param("id", 100); + System.out.println(client.getAsync("http://127.0.0.1:" + port + req.getPath() + "?id=100").join()); + System.out.println(client.sendAsync(addr, req).join()); + } + final int count = 10; + { + final CountDownLatch cdl = new CountDownLatch(count); + for (int i = 100; i < 100 + count; i++) { + final int index = i; + HttpSimpleRequest req = HttpSimpleRequest.createPath("/test").param("id", index); + client.getAsync("http://127.0.0.1:" + port + req.getPath() + "?id=" + index).whenComplete((v, t) -> { + cdl.countDown(); + Assertions.assertEquals("ok-" + index, new String((byte[]) v.getResult())); + }); + } + cdl.await(); + System.out.println("结束并发1"); + } + { + final CountDownLatch cdl = new CountDownLatch(count); + for (int i = 100; i < 100 + count; i++) { + final int index = i; + HttpSimpleRequest req = HttpSimpleRequest.createPath("/test").param("id", index); + client.sendAsync(addr, req).whenComplete((v, t) -> { + cdl.countDown(); + System.out.println("输出: " + new String((byte[]) v.getResult())); + Assertions.assertEquals("ok-" + index, new String((byte[]) v.getResult())); + }); + } + cdl.await(); + System.out.println("结束并发2"); + } + server.shutdown(); + } + + private static void runServer() throws Exception { + application = Application.create(true); + factory = application.getResourceFactory(); + factory.register("", Application.class, application); + final CountDownLatch cdl = new CountDownLatch(1); + final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16); + asyncGroup.start(); + new Thread() { + { + setName("Thread-Server-01"); + } + + @Override + public void run() { + try { + AnyValue.DefaultAnyValue conf = new AnyValue.DefaultAnyValue(); + conf.addValue("host", "0.0.0.0"); + conf.addValue("port", "" + port); + conf.addValue("protocol", "HTTP"); + conf.addValue("maxbody", "" + (100 * 1024 * 1024)); + server = new HttpServer(factory); + server.init(conf); + server.addHttpServlet(new HttpSimpleServlet(), "/test"); + server.start(); + port = server.getSocketAddress().getPort(); + cdl.countDown(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }.start(); + cdl.await(); + } +} diff --git a/src/test/java/org/redkale/test/http/HttpSimpleServlet.java b/src/test/java/org/redkale/test/http/HttpSimpleServlet.java new file mode 100644 index 000000000..214fd2397 --- /dev/null +++ b/src/test/java/org/redkale/test/http/HttpSimpleServlet.java @@ -0,0 +1,23 @@ +/* + * + */ +package org.redkale.test.http; + +import java.io.IOException; +import org.redkale.net.http.HttpMapping; +import org.redkale.net.http.HttpRequest; +import org.redkale.net.http.HttpResponse; +import org.redkale.net.http.HttpServlet; + +/** + * + * @author zhangjx + */ +public class HttpSimpleServlet extends HttpServlet { + + @HttpMapping(url = "/test") + public void test(HttpRequest req, HttpResponse resp) throws IOException { + System.out.println("运行到test方法了, id=" + req.getParameter("id")); + resp.finish("ok-" + req.getParameter("id", "0")); + } +} diff --git a/src/test/java/org/redkale/test/sncp/SncpTest.java b/src/test/java/org/redkale/test/sncp/SncpTest.java index 69f727df8..2511f000b 100644 --- a/src/test/java/org/redkale/test/sncp/SncpTest.java +++ b/src/test/java/org/redkale/test/sncp/SncpTest.java @@ -100,7 +100,7 @@ public class SncpTest { callbean = service.insert(callbean); System.out.println("bean: " + callbean); System.out.println("\r\n\r\n\r\n\r\n---------------------------------------------------"); - Thread.sleep(200); + Utility.sleep(200); final int count = main ? 40 : 11; final CountDownLatch cld = new CountDownLatch(count); final AtomicInteger ai = new AtomicInteger(); @@ -142,7 +142,7 @@ public class SncpTest { } return; } - Thread.sleep(200); + Utility.sleep(200); final CountDownLatch cld2 = new CountDownLatch(1); long s2 = System.currentTimeMillis(); final CompletableFuture future = service.queryResultAsync(callbean);