diff --git a/src/main/java/org/redkale/cluster/HttpClusterRpcClient.java b/src/main/java/org/redkale/cluster/HttpClusterRpcClient.java index 484748c5b..a06b32e83 100644 --- a/src/main/java/org/redkale/cluster/HttpClusterRpcClient.java +++ b/src/main/java/org/redkale/cluster/HttpClusterRpcClient.java @@ -85,7 +85,7 @@ public class HttpClusterRpcClient extends HttpRpcClient { return clusterAgent.queryHttpAddress("http", module, resname).thenCompose(addrs -> { if (addrs == null || addrs.isEmpty()) { if (logger.isLoggable(Level.WARNING)) { - logger.log(Level.WARNING, "httpAsync." + (produce ? "produceMessage" : "sendMessage") + ": module=" + localModule + ", resname=" + resname + ", addrmap is empty"); + logger.log(Level.WARNING, "httpAsync." + (produce ? "produceMessage" : "sendMessage") + " failed, module=" + localModule + ", resname=" + resname + ", address is empty"); } return new HttpResult().status(404).toFuture(); } diff --git a/src/main/java/org/redkale/net/http/HttpSimpleClient.java b/src/main/java/org/redkale/net/http/HttpSimpleClient.java index e64967e49..3fa550b85 100644 --- a/src/main/java/org/redkale/net/http/HttpSimpleClient.java +++ b/src/main/java/org/redkale/net/http/HttpSimpleClient.java @@ -5,12 +5,15 @@ */ package org.redkale.net.http; +import java.lang.reflect.Type; import java.net.*; import java.nio.*; import java.nio.channels.*; import java.nio.charset.*; import java.util.*; import java.util.concurrent.*; +import org.redkale.convert.Convert; +import org.redkale.convert.json.JsonConvert; import org.redkale.net.*; import static org.redkale.net.http.HttpRequest.parseHeaderName; import org.redkale.util.*; @@ -51,55 +54,153 @@ public class HttpSimpleClient { return new HttpSimpleClient(asyncGroup); } - public CompletableFuture> getAsync(String url) { - return sendAsync("GET", url, null, (byte[]) null); + public HttpSimpleClient readTimeoutSeconds(int readTimeoutSeconds) { + this.readTimeoutSeconds = readTimeoutSeconds; + return this; } - public CompletableFuture> postAsync(String url) { - return sendAsync("POST", url, null, (byte[]) null); + public HttpSimpleClient writeTimeoutSeconds(int writeTimeoutSeconds) { + this.writeTimeoutSeconds = writeTimeoutSeconds; + return this; + } + + public int getReadTimeoutSeconds() { + return readTimeoutSeconds; + } + + public void setReadTimeoutSeconds(int readTimeoutSeconds) { + this.readTimeoutSeconds = readTimeoutSeconds; + } + + public int getWriteTimeoutSeconds() { + return writeTimeoutSeconds; + } + + public void setWriteTimeoutSeconds(int writeTimeoutSeconds) { + this.writeTimeoutSeconds = writeTimeoutSeconds; + } + + public CompletableFuture> getAsync(String url) { + return sendAsync("GET", url, null, (byte[]) null); } public CompletableFuture> getAsync(String url, String body) { return sendAsync("GET", url, null, body == null ? null : body.getBytes(StandardCharsets.UTF_8)); } - public CompletableFuture> postAsync(String url, String body) { - return sendAsync("POST", url, null, body == null ? null : body.getBytes(StandardCharsets.UTF_8)); + public CompletableFuture> getAsync(String url, Type valueType) { + return sendAsync("GET", url, null, (byte[]) null, (Convert) null, valueType); + } + + public CompletableFuture> getAsync(String url, String body, Type valueType) { + return sendAsync("GET", url, null, body == null ? null : body.getBytes(StandardCharsets.UTF_8), (Convert) null, valueType); + } + + public CompletableFuture> getAsync(String url, Convert convert, Type valueType) { + return sendAsync("GET", url, null, (byte[]) null, convert, valueType); + } + + public CompletableFuture> getAsync(String url, String body, Convert convert, Type valueType) { + return sendAsync("GET", url, null, body == null ? null : body.getBytes(StandardCharsets.UTF_8), convert, valueType); } public CompletableFuture> getAsync(String url, byte[] body) { return sendAsync("GET", url, null, body); } - public CompletableFuture> postAsync(String url, byte[] body) { - return sendAsync("POST", url, null, body); + public CompletableFuture> getAsync(String url, byte[] body, Type valueType) { + return sendAsync("GET", url, null, body, (Convert) null, valueType); + } + + public CompletableFuture> getAsync(String url, byte[] body, Convert convert, Type valueType) { + return sendAsync("GET", url, null, body, convert, valueType); } public CompletableFuture> getAsync(String url, Map headers) { return sendAsync("GET", url, headers, (byte[]) null); } - public CompletableFuture> postAsync(String url, Map headers) { - return sendAsync("POST", url, headers, (byte[]) null); + public CompletableFuture> getAsync(String url, Map headers, Type valueType) { + return sendAsync("GET", url, headers, null, (Convert) null, valueType); + } + + public CompletableFuture> getAsync(String url, Map headers, Convert convert, Type valueType) { + return sendAsync("GET", url, headers, null, convert, valueType); } public CompletableFuture> getAsync(String url, Map headers, String body) { return sendAsync("GET", url, headers, body == null ? null : body.getBytes(StandardCharsets.UTF_8)); } - public CompletableFuture> postAsync(String url, Map headers, String body) { - return sendAsync("POST", url, headers, body == null ? null : body.getBytes(StandardCharsets.UTF_8)); - } - public CompletableFuture> getAsync(String url, Map headers, byte[] body) { return sendAsync("GET", url, headers, body); } + public CompletableFuture> postAsync(String url) { + return sendAsync("POST", url, null, (byte[]) null); + } + + public CompletableFuture> postAsync(String url, String body) { + return sendAsync("POST", url, null, body == null ? null : body.getBytes(StandardCharsets.UTF_8)); + } + + public CompletableFuture> postAsync(String url, Type valueType) { + return sendAsync("POST", url, null, (byte[]) null, (Convert) null, valueType); + } + + public CompletableFuture> postAsync(String url, String body, Type valueType) { + return sendAsync("POST", url, null, body == null ? null : body.getBytes(StandardCharsets.UTF_8), (Convert) null, valueType); + } + + public CompletableFuture> postAsync(String url, Convert convert, Type valueType) { + return sendAsync("POST", url, null, (byte[]) null, convert, valueType); + } + + public CompletableFuture> postAsync(String url, String body, Convert convert, Type valueType) { + return sendAsync("POST", url, null, body == null ? null : body.getBytes(StandardCharsets.UTF_8), convert, valueType); + } + + public CompletableFuture> postAsync(String url, byte[] body) { + return sendAsync("POST", url, null, body); + } + + public CompletableFuture> postAsync(String url, byte[] body, Type valueType) { + return sendAsync("POST", url, null, body, (Convert) null, valueType); + } + + public CompletableFuture> postAsync(String url, byte[] body, Convert convert, Type valueType) { + return sendAsync("POST", url, null, body, convert, valueType); + } + + public CompletableFuture> postAsync(String url, Map headers) { + return sendAsync("POST", url, headers, (byte[]) null); + } + + public CompletableFuture> postAsync(String url, Map headers, Type valueType) { + return sendAsync("POST", url, headers, null, (Convert) null, valueType); + } + + public CompletableFuture> postAsync(String url, Map headers, Convert convert, Type valueType) { + return sendAsync("POST", url, headers, null, convert, valueType); + } + + public CompletableFuture> postAsync(String url, Map headers, String body) { + return sendAsync("POST", url, headers, body == null ? null : body.getBytes(StandardCharsets.UTF_8)); + } + public CompletableFuture> postAsync(String url, Map headers, byte[] body) { return sendAsync("POST", url, headers, body); } public CompletableFuture> sendAsync(String method, String url, Map headers, byte[] body) { + return sendAsync(method, url, headers, body, (Convert) null, null); + } + + public CompletableFuture> sendAsync(String method, String url, Map headers, byte[] body, Type valueType) { + return sendAsync(method, url, headers, body, (Convert) null, valueType); + } + + public CompletableFuture> sendAsync(String method, String url, Map headers, byte[] body, Convert convert, Type valueType) { final URI uri = URI.create(url); final SocketAddress address = new InetSocketAddress(uri.getHost(), uri.getPort() > 0 ? uri.getPort() : (url.startsWith("https:") ? 443 : 80)); return asyncGroup.createTCPClient(address, readTimeoutSeconds, writeTimeoutSeconds).thenCompose(conn -> { @@ -123,11 +224,11 @@ public class HttpSimpleClient { if (body != null) { array.put(body); } - final CompletableFuture> future = new CompletableFuture(); + final CompletableFuture> future = new CompletableFuture(); conn.write(array, new CompletionHandler() { @Override public void completed(Integer result, Void attachment) { - conn.read(new ClientReadCompletionHandler(conn, array.clear(), future)); + conn.read(new ClientReadCompletionHandler(conn, array.clear(), convert, valueType, future)); } @Override @@ -148,7 +249,7 @@ public class HttpSimpleClient { // System.out.println(client.getAsync(url).join()); // } - protected static class ClientReadCompletionHandler implements CompletionHandler { + protected static class ClientReadCompletionHandler implements CompletionHandler { protected static final int READ_STATE_ROUTE = 1; @@ -162,7 +263,11 @@ public class HttpSimpleClient { protected final ByteArray array; - protected final CompletableFuture> future; + protected final CompletableFuture> future; + + protected Convert convert; + + protected Type valueType; protected HttpResult responseResult; @@ -170,9 +275,11 @@ public class HttpSimpleClient { protected int contentLength = -1; - public ClientReadCompletionHandler(AsyncConnection conn, ByteArray array, CompletableFuture> future) { + public ClientReadCompletionHandler(AsyncConnection conn, ByteArray array, Convert convert, Type valueType, CompletableFuture> future) { this.conn = conn; this.array = array; + this.convert = convert; + this.valueType = valueType; this.future = future; } @@ -223,9 +330,21 @@ public class HttpSimpleClient { if (responseResult.getStatus() <= 200) { this.responseResult.setResult(array.getBytes()); } - this.future.complete(this.responseResult); conn.offerReadBuffer(buffer); conn.dispose(); + + if (this.responseResult != null && valueType != null) { + Convert c = convert == null ? JsonConvert.root() : convert; + HttpResult result = this.responseResult; + try { + result.result(c.convertFrom(valueType, this.responseResult.getResult())); + this.future.complete((HttpResult) this.responseResult); + } catch (Exception e) { + this.future.completeExceptionally(e); + } + } else { + this.future.complete((HttpResult) this.responseResult); + } } //解析 HTTP/1.1 200 OK