HttpSimpleClient优化

This commit is contained in:
redkale
2023-11-27 19:21:58 +08:00
parent 90e813a06c
commit 252a45fbb4
13 changed files with 249 additions and 142 deletions

View File

@@ -153,7 +153,7 @@ public class HttpClusterRpcClient extends HttpRpcClient {
return new HttpResult<byte[]>().status(404).toFuture();
}
InetSocketAddress addr = it.next();
String host = addr.getPort() != 80 ? addr.getHostString() : (addr.getHostString() + ":" + addr.getPort());
String host = addr.getPort() > 0 && addr.getPort() != 80 ? (addr.getHostString() + ":" + addr.getPort()) : addr.getHostString();
String url = "http://" + host + requestPath;
if (logger.isLoggable(Level.FINER)) {
logger.log(Level.FINER, "sendEachAddressAsync: url: " + url

View File

@@ -344,15 +344,13 @@ public class HttpResourceServlet extends HttpServlet {
if (this.servlet.cachedLength.longValue() + length > this.servlet.cachelimit) {
return; //超过缓存总容量
}
try {
FileInputStream in = new FileInputStream(file);
try (FileInputStream in = new FileInputStream(file)) {
ByteArray out = new ByteArray((int) file.length());
byte[] bytes = new byte[10240];
int pos;
while ((pos = in.read(bytes)) != -1) {
out.put(bytes, 0, pos);
}
in.close();
this.content = out;
this.servlet.cachedLength.add(this.content.length());
} catch (Exception e) {

View File

@@ -41,8 +41,6 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
private ByteBufferPool safeBufferPool;
private final ReentrantLock groupLock = new ReentrantLock();
private final ReentrantLock addLock = new ReentrantLock();
//配置<executor threads="0"> APP_EXECUTOR资源为null

View File

@@ -271,7 +271,7 @@ public class HttpSimpleClient extends Client<HttpSimpleConnection, HttpSimpleReq
conn.write(array, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
conn.read(new ClientReadCompletionHandler(conn, workThread, traceid, array.clear(), convert, valueType, future));
conn.readInIOThread(new ClientReadCompletionHandler(conn, workThread, traceid, array.clear(), convert, valueType, future));
}
@Override
@@ -306,7 +306,7 @@ public class HttpSimpleClient extends Client<HttpSimpleConnection, HttpSimpleReq
// asyncGroup.start();
// String url = "http://redkale.org";
// HttpSimpleClient client = HttpSimpleClient.createPath(asyncGroup);
// System.out.println(client.getAsync(url).join());
// (System.out).println(client.getAsync(url).join());
// }
//
protected static class HttpConnection {
@@ -333,6 +333,10 @@ public class HttpSimpleClient extends Client<HttpSimpleConnection, HttpSimpleReq
this.channel.read(handler);
}
public void readInIOThread(CompletionHandler<Integer, ByteBuffer> handler) {
this.channel.readInIOThread(handler);
}
public void write(ByteTuple array, CompletionHandler<Integer, Void> handler) {
this.channel.write(array, handler);
}

View File

@@ -4,6 +4,8 @@
package org.redkale.net.http;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.logging.Logger;
import org.redkale.net.client.ClientCodec;
import static org.redkale.net.http.HttpRequest.*;
@@ -27,14 +29,6 @@ class HttpSimpleCodec extends ClientCodec<HttpSimpleRequest, HttpSimpleResult> {
super(connection);
}
protected HttpSimpleResult pollResult(HttpSimpleRequest request) {
return new HttpSimpleResult();
}
protected void offerResult(HttpSimpleResult rs) {
//do nothing
}
private ByteArray pollArray(ByteArray array) {
if (recyclableArray == null) {
recyclableArray = new ByteArray();
@@ -49,11 +43,12 @@ class HttpSimpleCodec extends ClientCodec<HttpSimpleRequest, HttpSimpleResult> {
@Override
public void decodeMessages(final ByteBuffer realBuf, final ByteArray array) {
int rs;
HttpSimpleResult result = this.lastResult;
final ByteBuffer buffer = realBuf;
while (buffer.hasRemaining()) {
HttpSimpleResult result = this.lastResult;
if (result == null) {
result = new HttpSimpleResult();
result.readState = READ_STATE_ROUTE;
this.lastResult = result;
}
array.clear();
@@ -67,18 +62,18 @@ class HttpSimpleCodec extends ClientCodec<HttpSimpleRequest, HttpSimpleResult> {
this.halfBytes = pollArray(array);
return;
} else if (rs < 0) { //数据异常
occurError(null, new HttpException("http data not valid"));
occurError(null, new HttpException("http status not valid"));
return;
}
result.readState = READ_STATE_HEADER;
}
if (result.readState == READ_STATE_HEADER) {
rs = readHeaderLines(result, buffer, array);
rs = readHeaderBytes(result, buffer, array);
if (rs > 0) { //数据不全
this.halfBytes = pollArray(array);
return;
} else if (rs < 0) { //数据异常
occurError(null, new HttpException("http data not valid"));
occurError(null, new HttpException("http header not valid"));
return;
}
result.readState = READ_STATE_BODY;
@@ -134,107 +129,51 @@ class HttpSimpleCodec extends ClientCodec<HttpSimpleRequest, HttpSimpleResult> {
//解析Header Connection: keep-alive
//返回0表示解析完整非0表示还需继续读数据
private int readHeaderLines(final HttpSimpleResult result, final ByteBuffer buffer, final ByteArray array) {
int remain = buffer.remaining();
for (;;) {
array.clear();
if (remain-- < 2) {
if (remain == 1) {
array.put(buffer.get());
return 1;
private int readHeaderBytes(final HttpSimpleResult result, final ByteBuffer buffer, final ByteArray array) {
byte b;
while (buffer.hasRemaining()) {
b = buffer.get();
if (b == '\n') {
int len = array.length();
if (len >= 3 && array.get(len - 1) == '\r' && array.get(len - 2) == '\n' && array.get(len - 3) == '\r') {
//最后一个\r\n不写入
readHeaderLines(result, array.removeLastByte()); //移除最后一个\r
array.clear();
return 0;
}
return 1;
}
remain--;
byte b1 = buffer.get();
byte b2 = buffer.get();
if (b1 == '\r' && b2 == '\n') {
return 0;
}
boolean latin1 = true;
if (latin1 && (b1 < 0x20 || b1 >= 0x80)) {
latin1 = false;
}
if (latin1 && (b2 < 0x20 || b2 >= 0x80)) {
latin1 = false;
}
array.put(b1, b2);
for (;;) { // name
if (remain-- < 1) {
buffer.clear();
buffer.put(array.content(), 0, array.length());
return 1;
}
byte b = buffer.get();
if (b == ':') {
break;
} else if (latin1 && (b < 0x20 || b >= 0x80)) {
latin1 = false;
}
array.put(b);
}
String name = parseHeaderName(latin1, array, null);
array.clear();
boolean first = true;
int space = 0;
for (;;) { // value
if (remain-- < 1) {
buffer.clear();
buffer.put(name.getBytes());
buffer.put((byte) ':');
if (space == 1) {
buffer.put((byte) ' ');
} else if (space > 0) {
for (int i = 0; i < space; i++) buffer.put((byte) ' ');
}
buffer.put(array.content(), 0, array.length());
return 1;
}
byte b = buffer.get();
if (b == '\r') {
if (remain-- < 1) {
buffer.clear();
buffer.put(name.getBytes());
buffer.put((byte) ':');
if (space == 1) {
buffer.put((byte) ' ');
} else if (space > 0) {
for (int i = 0; i < space; i++) buffer.put((byte) ' ');
}
buffer.put(array.content(), 0, array.length());
buffer.put((byte) '\r');
return 1;
}
if (buffer.get() != '\n') {
return -1;
}
break;
}
if (first) {
if (b <= ' ') {
space++;
continue;
}
first = false;
}
array.put(b);
}
String value;
switch (name) {
case "Content-Length":
case "content-length":
value = array.toString(true, null);
result.contentLength = Integer.decode(value);
result.header(name, value);
break;
default:
value = array.toString(null);
result.header(name, value);
}
array.put(b);
}
return 1;
}
private int readBody(final HttpSimpleResult result, final ByteBuffer buffer, final ByteArray array) {
return 0;
if (result.contentLength >= 0) {
array.put(buffer, Math.min((int) result.contentLength, buffer.remaining()));
int lr = (int) result.contentLength - array.length();
if (lr == 0) {
result.result(array.getBytes());
}
return lr > 0 ? lr : 0;
}
return -1;
}
private void readHeaderLines(final HttpSimpleResult result, ByteArray bytes) {
int start = 0;
int posC, posR;
Charset charset = StandardCharsets.UTF_8;
while (start < bytes.length()) {
posC = bytes.indexOf(start, ':');
String name = bytes.toString(start, posC - start, charset).trim();
posR = bytes.indexOf(posC + 1, '\r');
String value = bytes.toString(posC + 1, posR - posC - 1, charset).trim();
result.header(name, value);
if ("Content-Length".equalsIgnoreCase(name)) {
result.contentLength = Integer.parseInt(value);
}
start = posR + 2; //跳过\r\n
}
}
}

View File

@@ -19,6 +19,7 @@ import static org.redkale.net.http.HttpSimpleClient.*;
import org.redkale.util.ByteArray;
import org.redkale.util.RedkaleException;
import org.redkale.util.Traces;
import static org.redkale.util.Utility.isNotEmpty;
/**
* HttpRequest的缩减版, 只提供部分字段
@@ -110,7 +111,30 @@ public class HttpSimpleRequest extends ClientRequest implements java.io.Serializ
@Override
public void writeTo(ClientConnection conn, ByteArray array) {
array.put((method.toUpperCase() + " " + requestPath() + " HTTP/1.1\r\n").getBytes(StandardCharsets.UTF_8));
//组装path和body
String requestPath = requestPath();
String contentType0 = this.contentType;
byte[] clientBody = null;
if (isNotEmpty(body)) {
String paramstr = getParametersToString();
if (paramstr != null) {
if (getPath().indexOf('?') > 0) {
requestPath += "&" + paramstr;
} else {
requestPath += "?" + paramstr;
}
}
clientBody = getBody();
} else {
String paramstr = getParametersToString();
if (paramstr != null) {
clientBody = paramstr.getBytes(StandardCharsets.UTF_8);
}
contentType0 = "x-www-form-urlencoded";
}
//写status
array.put((method.toUpperCase() + " " + requestPath + " HTTP/1.1\r\n").getBytes(StandardCharsets.UTF_8));
//写header
if (traceid != null && !containsHeaderIgnoreCase(Rest.REST_HEADER_TRACEID)) {
array.put((Rest.REST_HEADER_TRACEID + ": " + traceid + "\r\n").getBytes(StandardCharsets.UTF_8));
}
@@ -120,13 +144,15 @@ public class HttpSimpleRequest extends ClientRequest implements java.io.Serializ
if (!containsHeaderIgnoreCase("Connection")) {
array.put(header_bytes_connalive);
}
array.put(contentLengthBytes());
array.put(("Content-Type: " + contentType0 + "\r\n").getBytes(StandardCharsets.UTF_8));
array.put(contentLengthBytes(clientBody));
if (headers != null) {
headers.forEach((k, v) -> array.put((k + ": " + v + "\r\n").getBytes(StandardCharsets.UTF_8)));
}
array.put((byte) '\r', (byte) '\n');
if (body != null) {
array.put(body);
//写body
if (clientBody != null) {
array.put(clientBody);
}
}
@@ -134,8 +160,8 @@ public class HttpSimpleRequest extends ClientRequest implements java.io.Serializ
return headers != null && headers.containsIgnoreCase(name);
}
protected byte[] contentLengthBytes() {
int len = body == null ? 0 : body.length;
protected byte[] contentLengthBytes(byte[] clientBody) {
int len = clientBody == null ? 0 : clientBody.length;
if (len < contentLengthArray.length) {
return contentLengthArray[len];
}

View File

@@ -3,8 +3,9 @@
*/
package org.redkale.net.http;
import org.redkale.convert.ConvertDisabled;
import org.redkale.convert.json.JsonConvert;
import org.redkale.net.client.ClientResult;
import static org.redkale.net.http.HttpSimpleClient.ClientReadCompletionHandler.READ_STATE_ROUTE;
/**
*
@@ -16,19 +17,20 @@ import static org.redkale.net.http.HttpSimpleClient.ClientReadCompletionHandler.
*
* @since 2.8.0
*/
class HttpSimpleResult<T> extends HttpResult<T> implements ClientResult {
public class HttpSimpleResult<T> extends HttpResult<T> implements ClientResult {
int readState = READ_STATE_ROUTE;
int readState;
int contentLength = -1;
byte[] headerBytes;
boolean headerParsed = false;
@Override
@ConvertDisabled
public boolean isKeepAlive() {
return true;
}
@Override
public String toString() {
return JsonConvert.root().convertTo(HttpResult.class, this);
}
}

View File

@@ -91,7 +91,7 @@ public final class MultiContext {
/**
* 获取第一个文件的二进制
*
* @param max 可接收的文件大小最大值
* @param max 可接收的文件大小最大值
* @param fileNameRegx 可接收的文件名正则表达式
* @param contentTypeRegx 可接收的ContentType正则表达式
*
@@ -139,8 +139,8 @@ public final class MultiContext {
/**
* 获取第一个文件
*
* @param home 进程目录
* @param max 可接收的文件大小最大值
* @param home 进程目录
* @param max 可接收的文件大小最大值
* @param fileNameRegx 可接收的文件名正则表达式
* @param contentTypeRegx 可接收的ContentType正则表达式
*
@@ -172,7 +172,7 @@ public final class MultiContext {
boolean rs = part.save(max < 1 ? Long.MAX_VALUE : max, file);
if (!rs) {
Files.delete(file.toPath());
parent.delete();
Files.delete(parent.toPath());
} else {
tmpfile = file;
}
@@ -184,8 +184,8 @@ public final class MultiContext {
/**
* 获取所有文件
*
* @param home 进程目录
* @param max 可接收的文件大小最大值
* @param home 进程目录
* @param max 可接收的文件大小最大值
* @param fileNameRegx 可接收的文件名正则表达式
* @param contentTypeRegx 可接收的ContentType正则表达式
*
@@ -211,8 +211,8 @@ public final class MultiContext {
}
boolean rs = part.save(max < 1 ? Long.MAX_VALUE : max, file);
if (!rs) {
file.delete();
parent.delete();
Files.delete(file.toPath());
Files.delete(parent.toPath());
continue;
}
if (files == null) {

View File

@@ -95,7 +95,7 @@ public abstract class WebSocketNode implements Service {
if (source != null && this.wsNodeAddress == null) { //非分布式模式
this.wsNodeAddress = new WebSocketAddress(mqtopic, new InetSocketAddress("127.0.0.1", 27));
}
if (source != null && wsNodeAddress != null) {
if (source != null) {
source.sadd(WS_SOURCE_KEY_NODES, WebSocketAddress.class, this.wsNodeAddress);
}
}

View File

@@ -456,8 +456,8 @@ public final class Utility {
public static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (Exception e) {
//do nothing
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}

View File

@@ -0,0 +1,117 @@
/*
*
*/
package org.redkale.test.http;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.redkale.boot.Application;
import org.redkale.net.AsyncIOGroup;
import org.redkale.net.http.HttpServer;
import org.redkale.net.http.HttpSimpleClient;
import org.redkale.net.http.HttpSimpleRequest;
import org.redkale.util.AnyValue;
import org.redkale.util.ResourceFactory;
/**
*
* @author zhangjx
*/
public class HttpSimpleClientTest {
private static int port = 0;
private static Application application;
private static ResourceFactory factory;
private static HttpServer server;
private boolean main;
public static void main(String[] args) throws Throwable {
HttpSimpleClientTest test = new HttpSimpleClientTest();
test.main = true;
test.run();
}
@Test
public void run() throws Exception {
runServer();
//Utility.sleep(50000);
final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16);
asyncGroup.start();
HttpSimpleClient client = HttpSimpleClient.create(asyncGroup);
InetSocketAddress addr = new InetSocketAddress("127.0.0.1", port);
{
HttpSimpleRequest req = HttpSimpleRequest.createPath("/test").param("id", 100);
System.out.println(client.getAsync("http://127.0.0.1:" + port + req.getPath() + "?id=100").join());
System.out.println(client.sendAsync(addr, req).join());
}
final int count = 10;
{
final CountDownLatch cdl = new CountDownLatch(count);
for (int i = 100; i < 100 + count; i++) {
final int index = i;
HttpSimpleRequest req = HttpSimpleRequest.createPath("/test").param("id", index);
client.getAsync("http://127.0.0.1:" + port + req.getPath() + "?id=" + index).whenComplete((v, t) -> {
cdl.countDown();
Assertions.assertEquals("ok-" + index, new String((byte[]) v.getResult()));
});
}
cdl.await();
System.out.println("结束并发1");
}
{
final CountDownLatch cdl = new CountDownLatch(count);
for (int i = 100; i < 100 + count; i++) {
final int index = i;
HttpSimpleRequest req = HttpSimpleRequest.createPath("/test").param("id", index);
client.sendAsync(addr, req).whenComplete((v, t) -> {
cdl.countDown();
System.out.println("输出: " + new String((byte[]) v.getResult()));
Assertions.assertEquals("ok-" + index, new String((byte[]) v.getResult()));
});
}
cdl.await();
System.out.println("结束并发2");
}
server.shutdown();
}
private static void runServer() throws Exception {
application = Application.create(true);
factory = application.getResourceFactory();
factory.register("", Application.class, application);
final CountDownLatch cdl = new CountDownLatch(1);
final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16);
asyncGroup.start();
new Thread() {
{
setName("Thread-Server-01");
}
@Override
public void run() {
try {
AnyValue.DefaultAnyValue conf = new AnyValue.DefaultAnyValue();
conf.addValue("host", "0.0.0.0");
conf.addValue("port", "" + port);
conf.addValue("protocol", "HTTP");
conf.addValue("maxbody", "" + (100 * 1024 * 1024));
server = new HttpServer(factory);
server.init(conf);
server.addHttpServlet(new HttpSimpleServlet(), "/test");
server.start();
port = server.getSocketAddress().getPort();
cdl.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
}.start();
cdl.await();
}
}

View File

@@ -0,0 +1,23 @@
/*
*
*/
package org.redkale.test.http;
import java.io.IOException;
import org.redkale.net.http.HttpMapping;
import org.redkale.net.http.HttpRequest;
import org.redkale.net.http.HttpResponse;
import org.redkale.net.http.HttpServlet;
/**
*
* @author zhangjx
*/
public class HttpSimpleServlet extends HttpServlet {
@HttpMapping(url = "/test")
public void test(HttpRequest req, HttpResponse resp) throws IOException {
System.out.println("运行到test方法了 id=" + req.getParameter("id"));
resp.finish("ok-" + req.getParameter("id", "0"));
}
}

View File

@@ -100,7 +100,7 @@ public class SncpTest {
callbean = service.insert(callbean);
System.out.println("bean " + callbean);
System.out.println("\r\n\r\n\r\n\r\n---------------------------------------------------");
Thread.sleep(200);
Utility.sleep(200);
final int count = main ? 40 : 11;
final CountDownLatch cld = new CountDownLatch(count);
final AtomicInteger ai = new AtomicInteger();
@@ -142,7 +142,7 @@ public class SncpTest {
}
return;
}
Thread.sleep(200);
Utility.sleep(200);
final CountDownLatch cld2 = new CountDownLatch(1);
long s2 = System.currentTimeMillis();
final CompletableFuture<String> future = service.queryResultAsync(callbean);