This commit is contained in:
@@ -9,6 +9,7 @@ import java.lang.reflect.Type;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.logging.Logger;
|
||||||
import org.redkale.convert.ConvertType;
|
import org.redkale.convert.ConvertType;
|
||||||
import org.redkale.convert.json.JsonConvert;
|
import org.redkale.convert.json.JsonConvert;
|
||||||
import org.redkale.net.http.*;
|
import org.redkale.net.http.*;
|
||||||
@@ -25,6 +26,8 @@ import org.redkale.net.http.*;
|
|||||||
*/
|
*/
|
||||||
public class HttpMessageClient extends MessageClient {
|
public class HttpMessageClient extends MessageClient {
|
||||||
|
|
||||||
|
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
|
||||||
|
|
||||||
protected HttpMessageClient(MessageAgent messageAgent) {
|
protected HttpMessageClient(MessageAgent messageAgent) {
|
||||||
super(messageAgent);
|
super(messageAgent);
|
||||||
if (messageAgent != null) { // //RPC方式下无messageAgent
|
if (messageAgent != null) { // //RPC方式下无messageAgent
|
||||||
|
|||||||
@@ -6,10 +6,12 @@
|
|||||||
package org.redkale.mq;
|
package org.redkale.mq;
|
||||||
|
|
||||||
import java.net.*;
|
import java.net.*;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.logging.Level;
|
||||||
import org.redkale.cluster.ClusterAgent;
|
import org.redkale.cluster.ClusterAgent;
|
||||||
import org.redkale.convert.ConvertType;
|
import org.redkale.convert.ConvertType;
|
||||||
import org.redkale.net.http.*;
|
import org.redkale.net.http.*;
|
||||||
@@ -58,6 +60,7 @@ public class HttpMessageClusterClient extends HttpMessageClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private CompletableFuture<HttpResult<byte[]>> mqtpAsync(int userid, HttpSimpleRequest req) {
|
private CompletableFuture<HttpResult<byte[]>> mqtpAsync(int userid, HttpSimpleRequest req) {
|
||||||
|
final boolean finest = logger.isLoggable(Level.FINEST);
|
||||||
String module = req.getRequestURI();
|
String module = req.getRequestURI();
|
||||||
module = module.substring(1); //去掉/
|
module = module.substring(1); //去掉/
|
||||||
module = module.substring(0, module.indexOf('/'));
|
module = module.substring(0, module.indexOf('/'));
|
||||||
@@ -82,7 +85,7 @@ public class HttpMessageClusterClient extends HttpMessageClient {
|
|||||||
String suburi = req.getRequestURI();
|
String suburi = req.getRequestURI();
|
||||||
suburi = suburi.substring(1); //跳过 /
|
suburi = suburi.substring(1); //跳过 /
|
||||||
suburi = "/" + realmodule + suburi.substring(suburi.indexOf('/'));
|
suburi = "/" + realmodule + suburi.substring(suburi.indexOf('/'));
|
||||||
futures.add(forEachCollectionFuture(userid, req, (req.getPath() != null && !req.getPath().isEmpty() ? req.getPath() : "") + suburi, builder, addrs.iterator()));
|
futures.add(forEachCollectionFuture(finest, userid, req, (req.getPath() != null && !req.getPath().isEmpty() ? req.getPath() : "") + suburi, builder, addrs.iterator()));
|
||||||
}
|
}
|
||||||
if (futures.isEmpty()) return CompletableFuture.completedFuture(null);
|
if (futures.isEmpty()) return CompletableFuture.completedFuture(null);
|
||||||
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).thenApply(v -> null);
|
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).thenApply(v -> null);
|
||||||
@@ -90,6 +93,7 @@ public class HttpMessageClusterClient extends HttpMessageClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private CompletableFuture<HttpResult<byte[]>> httpAsync(int userid, HttpSimpleRequest req) {
|
private CompletableFuture<HttpResult<byte[]>> httpAsync(int userid, HttpSimpleRequest req) {
|
||||||
|
final boolean finest = logger.isLoggable(Level.FINEST);
|
||||||
String module = req.getRequestURI();
|
String module = req.getRequestURI();
|
||||||
module = module.substring(1); //去掉/
|
module = module.substring(1); //去掉/
|
||||||
module = module.substring(0, module.indexOf('/'));
|
module = module.substring(0, module.indexOf('/'));
|
||||||
@@ -106,16 +110,16 @@ public class HttpMessageClusterClient extends HttpMessageClient {
|
|||||||
builder.header("Content-Type", "x-www-form-urlencoded");
|
builder.header("Content-Type", "x-www-form-urlencoded");
|
||||||
String paramstr = req.getParametersToString();
|
String paramstr = req.getParametersToString();
|
||||||
if (paramstr != null) builder.POST(java.net.http.HttpRequest.BodyPublishers.ofString(paramstr));
|
if (paramstr != null) builder.POST(java.net.http.HttpRequest.BodyPublishers.ofString(paramstr));
|
||||||
return forEachCollectionFuture(userid, req, (req.getPath() != null && !req.getPath().isEmpty() ? req.getPath() : "") + req.getRequestURI(), builder, addrs.iterator());
|
return forEachCollectionFuture(finest, userid, req, (req.getPath() != null && !req.getPath().isEmpty() ? req.getPath() : "") + req.getRequestURI(), builder, addrs.iterator());
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private CompletableFuture<HttpResult<byte[]>> forEachCollectionFuture(int userid, HttpSimpleRequest req, String requesturi, java.net.http.HttpRequest.Builder builder, Iterator<InetSocketAddress> it) {
|
private CompletableFuture<HttpResult<byte[]>> forEachCollectionFuture(boolean finest, int userid, HttpSimpleRequest req, String requesturi, java.net.http.HttpRequest.Builder builder, Iterator<InetSocketAddress> it) {
|
||||||
if (!it.hasNext()) return CompletableFuture.completedFuture(null);
|
if (!it.hasNext()) return CompletableFuture.completedFuture(null);
|
||||||
InetSocketAddress addr = it.next();
|
InetSocketAddress addr = it.next();
|
||||||
String url = "http://" + addr.getHostString() + ":" + addr.getPort() + requesturi;
|
String url = "http://" + addr.getHostString() + ":" + addr.getPort() + requesturi;
|
||||||
return httpClient.sendAsync(builder.copy().uri(URI.create(url)).build(), java.net.http.HttpResponse.BodyHandlers.ofByteArray()).thenCompose(resp -> {
|
return httpClient.sendAsync(builder.copy().uri(URI.create(url)).build(), java.net.http.HttpResponse.BodyHandlers.ofByteArray()).thenCompose(resp -> {
|
||||||
if (resp.statusCode() != 200) return forEachCollectionFuture(userid, req, requesturi, builder, it);
|
if (resp.statusCode() != 200) return forEachCollectionFuture(finest, userid, req, requesturi, builder, it);
|
||||||
HttpResult rs = new HttpResult();
|
HttpResult rs = new HttpResult();
|
||||||
java.net.http.HttpHeaders hs = resp.headers();
|
java.net.http.HttpHeaders hs = resp.headers();
|
||||||
if (hs != null) {
|
if (hs != null) {
|
||||||
@@ -130,6 +134,7 @@ public class HttpMessageClusterClient extends HttpMessageClient {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
rs.setResult(resp.body());
|
rs.setResult(resp.body());
|
||||||
|
if (finest) logger.log(Level.FINEST, url + ", result = " + new String(resp.body(), StandardCharsets.UTF_8));
|
||||||
return CompletableFuture.completedFuture(rs);
|
return CompletableFuture.completedFuture(rs);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user