HttpSimpleRequest优化

This commit is contained in:
redkale
2023-11-22 10:33:20 +08:00
parent 7433530998
commit 0bb91186a8
10 changed files with 157 additions and 56 deletions

View File

@@ -1159,7 +1159,7 @@ public final class Application {
rf.register(resourceName, java.net.http.HttpClient.class, httpClient);
return httpClient;
} catch (Exception e) {
logger.log(Level.SEVERE, "java.net.http.HttpClient inject error", e);
logger.log(Level.SEVERE, java.net.http.HttpClient.class.getSimpleName() + " inject error", e);
return null;
}
}, java.net.http.HttpClient.class);
@@ -1175,7 +1175,7 @@ public final class Application {
rf.register(resourceName, HttpSimpleClient.class, httpClient);
return httpClient;
} catch (Exception e) {
logger.log(Level.SEVERE, "HttpClient inject error", e);
logger.log(Level.SEVERE, HttpSimpleClient.class.getSimpleName() + " inject error", e);
return null;
}
}, HttpSimpleClient.class);

View File

@@ -10,7 +10,6 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import org.redkale.annotation.Resource;
import org.redkale.boot.Application;
import org.redkale.net.WorkThread;
import org.redkale.net.http.*;
import org.redkale.util.Traces;
import org.redkale.util.Utility;
@@ -42,10 +41,10 @@ public class HttpClusterRpcClient extends HttpRpcClient {
protected ClusterAgent clusterAgent;
@Resource(name = "cluster.httpClient", required = false)
protected java.net.http.HttpClient httpClient;
protected HttpSimpleClient httpSimpleClient;
@Resource(name = "cluster.httpClient", required = false)
protected HttpSimpleClient httpSimpleClient;
protected java.net.http.HttpClient httpClient;
public HttpClusterRpcClient(Application application, String resourceName, ClusterAgent clusterAgent) {
Objects.requireNonNull(clusterAgent);
@@ -78,7 +77,6 @@ public class HttpClusterRpcClient extends HttpRpcClient {
private CompletableFuture<HttpResult<byte[]>> httpAsync(boolean produce, Serializable userid, HttpSimpleRequest req) {
req.setTraceid(Traces.computeIfAbsent(req.getTraceid(), Traces.currentTraceid()));
final WorkThread workThread = WorkThread.currentWorkThread();
String module = req.getPath();
module = module.substring(1); //去掉/
module = module.substring(0, module.indexOf('/'));
@@ -92,7 +90,8 @@ public class HttpClusterRpcClient extends HttpRpcClient {
Traces.currentTraceid(req.getTraceid());
if (isEmpty(addrs)) {
if (logger.isLoggable(Level.WARNING)) {
logger.log(Level.WARNING, "httpAsync." + (produce ? "produceMessage" : "sendMessage") + " failed, module=" + localModule + ", resname=" + resname + ", address is empty");
logger.log(Level.WARNING, "httpAsync." + (produce ? "produceMessage" : "sendMessage")
+ " failed, module=" + localModule + ", resname=" + resname + ", address is empty");
}
return new HttpResult<byte[]>().status(404).toFuture();
}
@@ -107,24 +106,25 @@ public class HttpClusterRpcClient extends HttpRpcClient {
}
});
}
byte[] clientBody = null;
clientHeaders.set("Content-Type", "x-www-form-urlencoded");
if (req.isRpc()) {
clientHeaders.set(Rest.REST_HEADER_RPC, "true");
}
if (isNotEmpty(req.getTraceid())) {
clientHeaders.set(Rest.REST_HEADER_TRACEID, req.getTraceid());
}
if (userid != null) {
clientHeaders.set(Rest.REST_HEADER_CURRUSERID, String.valueOf(userid));
}
if (req.getReqConvertType() != null) {
clientHeaders.set(Rest.REST_HEADER_REQ_CONVERT, req.getReqConvertType().toString());
}
if (req.getRespConvertType() != null) {
clientHeaders.set(Rest.REST_HEADER_RESP_CONVERT, req.getRespConvertType().toString());
}
if (userid != null) {
clientHeaders.set(Rest.REST_HEADER_CURRUSERID, "" + userid);
}
clientHeaders.set("Content-Type", "x-www-form-urlencoded");
if (req.getBody() != null && req.getBody().length > 0) {
byte[] clientBody = null;
if (isNotEmpty(req.getBody())) {
String paramstr = req.getParametersToString();
if (paramstr != null) {
if (req.getPath().indexOf('?') > 0) {
@@ -141,48 +141,48 @@ public class HttpClusterRpcClient extends HttpRpcClient {
}
}
if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "httpAsync: module=" + localModule + ", resname=" + resname + ", enter forEachCollectionFuture");
logger.log(Level.FINEST, "httpAsync: module=" + localModule + ", resname=" + resname + ", enter sendEachAddressAsync");
}
return forEachCollectionFuture(logger.isLoggable(Level.FINEST), req, req.requestPath(),
clientHeaders, clientBody, addrs.iterator());
return sendEachAddressAsync(req, req.requestPath(), clientHeaders, clientBody, addrs.iterator());
});
}
private CompletableFuture<HttpResult<byte[]>> forEachCollectionFuture(boolean finest, HttpSimpleRequest req,
private CompletableFuture<HttpResult<byte[]>> sendEachAddressAsync(HttpSimpleRequest req,
String requestPath, final HttpHeaders clientHeaders, byte[] clientBody, Iterator<InetSocketAddress> it) {
if (!it.hasNext()) {
return CompletableFuture.completedFuture(null);
return new HttpResult<byte[]>().status(404).toFuture();
}
InetSocketAddress addr = it.next();
String url = "http://" + addr.getHostString() + ":" + addr.getPort() + requestPath;
if (finest) {
if (clientBody != null) {
logger.log(Level.FINEST, "forEachCollectionFuture: url: " + url + ", body: " + new String(clientBody, StandardCharsets.UTF_8) + ", headers: " + clientHeaders);
} else {
logger.log(Level.FINEST, "forEachCollectionFuture: url: " + url + ", headers: " + clientHeaders);
}
String host = addr.getPort() != 80 ? addr.getHostString() : (addr.getHostString() + ":" + addr.getPort());
String url = "http://" + host + requestPath;
if (logger.isLoggable(Level.FINER)) {
logger.log(Level.FINER, "sendEachAddressAsync: url: " + url
+ ", body: " + (clientBody != null ? new String(clientBody, StandardCharsets.UTF_8) : "") + ", headers: " + clientHeaders);
}
if (httpSimpleClient != null) {
clientHeaders.set("Host", host);
return httpSimpleClient.postAsync(url, clientHeaders, clientBody);
}
java.net.http.HttpRequest.Builder builder = java.net.http.HttpRequest.newBuilder()
.uri(URI.create(url))
.header(Rest.REST_HEADER_TRACEID, req.getTraceid())
.timeout(Duration.ofMillis(10_000))
//存在sendHeader后不发送body数据的问题 java.net.http.HttpRequest的bug?
.method("POST", clientBody == null ? java.net.http.HttpRequest.BodyPublishers.noBody() : java.net.http.HttpRequest.BodyPublishers.ofByteArray(clientBody));
if (clientHeaders != null) {
} else {
java.net.http.HttpRequest.Builder builder = java.net.http.HttpRequest.newBuilder()
.uri(URI.create(url))
.timeout(Duration.ofMillis(10_000))
//存在sendHeader后不发送body数据的问题 java.net.http.HttpRequest的bug?
.method("POST", createBodyPublisher(clientBody));
clientHeaders.forEach(builder::header);
return httpClient.sendAsync(builder.build(), java.net.http.HttpResponse.BodyHandlers.ofByteArray())
.thenApply((java.net.http.HttpResponse<byte[]> resp) -> {
Traces.currentTraceid(req.getTraceid());
final int rs = resp.statusCode();
if (rs != 200) {
return new HttpResult<byte[]>().status(rs);
}
return new HttpResult<>(resp.body());
});
}
return httpClient.sendAsync(builder.build(), java.net.http.HttpResponse.BodyHandlers.ofByteArray())
.thenApply((java.net.http.HttpResponse<byte[]> resp) -> {
Traces.currentTraceid(req.getTraceid());
final int rs = resp.statusCode();
if (rs != 200) {
return new HttpResult<byte[]>().status(rs);
}
return new HttpResult<byte[]>(resp.body());
});
}
private static java.net.http.HttpRequest.BodyPublisher createBodyPublisher(byte[] clientBody) {
return clientBody == null ? java.net.http.HttpRequest.BodyPublishers.noBody() : java.net.http.HttpRequest.BodyPublishers.ofByteArray(clientBody);
}
}

View File

@@ -73,7 +73,7 @@ class AsyncNioTcpProtocolServer extends ProtocolServer {
}
@Override
public <T> Set<SocketOption<?>> supportedOptions() {
public Set<SocketOption<?>> supportedOptions() {
return this.serverChannel.supportedOptions();
}

View File

@@ -81,7 +81,7 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
}
@Override
public <T> Set<SocketOption<?>> supportedOptions() {
public Set<SocketOption<?>> supportedOptions() {
return udpServerChannel.serverChannel.supportedOptions();
}

View File

@@ -34,7 +34,7 @@ public abstract class ProtocolServer {
public abstract void bind(SocketAddress local, int backlog) throws IOException;
public abstract <T> Set<SocketOption<?>> supportedOptions();
public abstract Set<SocketOption<?>> supportedOptions();
public abstract <T> void setOption(SocketOption<T> name, T value) throws IOException;

View File

@@ -49,6 +49,9 @@ public abstract class Request<C extends Context> {
*/
private final Map<String, Object> properties = new HashMap<>();
/**
* 每次新请求都会清空
*/
protected final Map<String, Object> attributes = new HashMap<>();
protected Request(C context) {

View File

@@ -230,6 +230,9 @@ public class HttpSimpleClient extends Client<HttpSimpleConnection, HttpSimpleReq
public <T> CompletableFuture<HttpResult<T>> sendAsync(String method, String url, HttpHeaders headers, byte[] body, Convert convert, Type valueType) {
final String traceid = Traces.computeIfAbsent(Traces.currentTraceid());
final WorkThread workThread = WorkThread.currentWorkThread();
if (method.indexOf(' ') >= 0 || method.indexOf('\r') >= 0 || method.indexOf('\n') >= 0) {
throw new RedkaleException("http-method(" + method + ") is illegal");
}
if (url.indexOf(' ') >= 0 || url.indexOf('\r') >= 0 || url.indexOf('\n') >= 0) {
throw new RedkaleException("http-url(" + url + ") is illegal");
}
@@ -238,18 +241,21 @@ public class HttpSimpleClient extends Client<HttpSimpleConnection, HttpSimpleReq
final ByteArray array = new ByteArray();
int urlpos = url.indexOf("/", url.indexOf("//") + 3);
array.put((method.toUpperCase() + " " + (urlpos > 0 ? url.substring(urlpos) : "/") + " HTTP/1.1\r\n"
+ "Host: " + uri.getHost() + "\r\n"
+ Rest.REST_HEADER_TRACEID + ": " + traceid + "\r\n"
+ "Content-Length: " + (body == null ? 0 : body.length) + "\r\n").getBytes(StandardCharsets.UTF_8));
array.put((method.toUpperCase() + " " + (urlpos > 0 ? url.substring(urlpos) : "/") + " HTTP/1.1\r\n").getBytes(StandardCharsets.UTF_8));
array.put(("Host: " + uri.getHost() + "\r\n").getBytes(StandardCharsets.UTF_8));
array.put(("Content-Length: " + (body == null ? 0 : body.length) + "\r\n").getBytes(StandardCharsets.UTF_8));
if (headers == null || !headers.contains("User-Agent")) {
array.put(header_bytes_useragent);
}
if (headers == null || !headers.contains("Connection")) {
array.put(header_bytes_connclose);
}
if (headers == null || !headers.contains(Rest.REST_HEADER_TRACEID)) {
array.put((Rest.REST_HEADER_TRACEID + ": " + traceid + "\r\n").getBytes(StandardCharsets.UTF_8));
}
if (headers != null) {
headers.forEach((k, v) -> array.put((k + ": " + String.valueOf(v) + "\r\n").getBytes(StandardCharsets.UTF_8)));
headers.forEach((k, v) -> array.put((k + ": " + v + "\r\n").getBytes(StandardCharsets.UTF_8)));
}
array.put((byte) '\r', (byte) '\n');
if (body != null) {

View File

@@ -10,6 +10,7 @@ import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redkale.annotation.Comment;
import org.redkale.annotation.Nullable;
import org.redkale.convert.*;
import org.redkale.convert.json.JsonConvert;
import org.redkale.net.client.ClientConnection;
@@ -122,6 +123,7 @@ public class HttpSimpleRequest extends ClientRequest implements java.io.Serializ
}
}
@Nullable
@ConvertDisabled
public String getParametersToString() {
if (this.params == null || this.params.isEmpty()) {

View File

@@ -70,12 +70,6 @@ public final class Rest {
private static final String REST_RETURNTYPES_FIELD_NAME = "_redkale_returntypes"; //存在泛型的结果数组
private static final java.lang.reflect.Type TYPE_MAP_STRING_SERIALIZE = new TypeToken<Map<String, Serializable>>() {
}.getType();
private static final java.lang.reflect.Type TYPE_MAP_STRING_STRING = new TypeToken<Map<String, String>>() {
}.getType();
private static final java.lang.reflect.Type TYPE_RETRESULT_STRING = new TypeToken<RetResult<String>>() {
}.getType();

View File

@@ -1002,6 +1002,102 @@ public final class Utility {
return array != null && array.length > 0;
}
/**
* 是否为空
*
* @param array 数组
*
* @return 是否为空
*
*/
public static boolean isEmpty(byte[] array) {
return array == null || array.length == 0;
}
/**
* 是否不为空
*
* @param array 数组
*
* @return 是否不为空
*
*/
public static boolean isNotEmpty(byte[] array) {
return array != null && array.length > 0;
}
/**
* 是否为空
*
* @param array 数组
*
* @return 是否为空
*
*/
public static boolean isEmpty(short[] array) {
return array == null || array.length == 0;
}
/**
* 是否不为空
*
* @param array 数组
*
* @return 是否不为空
*
*/
public static boolean isNotEmpty(short[] array) {
return array != null && array.length > 0;
}
/**
* 是否为空
*
* @param array 数组
*
* @return 是否为空
*
*/
public static boolean isEmpty(int[] array) {
return array == null || array.length == 0;
}
/**
* 是否不为空
*
* @param array 数组
*
* @return 是否不为空
*
*/
public static boolean isNotEmpty(int[] array) {
return array != null && array.length > 0;
}
/**
* 是否为空
*
* @param array 数组
*
* @return 是否为空
*
*/
public static boolean isEmpty(long[] array) {
return array == null || array.length == 0;
}
/**
* 是否不为空
*
* @param array 数组
*
* @return 是否不为空
*
*/
public static boolean isNotEmpty(long[] array) {
return array != null && array.length > 0;
}
/**
* 将字符串首字母大写
*