This commit is contained in:
@@ -219,9 +219,8 @@ public abstract class ClusterAgent {
|
|||||||
}
|
}
|
||||||
if ("mqtp".equalsIgnoreCase(protocol)) {
|
if ("mqtp".equalsIgnoreCase(protocol)) {
|
||||||
MessageMultiConsumer mmc = service.getClass().getAnnotation(MessageMultiConsumer.class);
|
MessageMultiConsumer mmc = service.getClass().getAnnotation(MessageMultiConsumer.class);
|
||||||
String resname = Sncp.getResourceName(service);
|
|
||||||
String selfmodule = Rest.getRestModule(service).toLowerCase();
|
String selfmodule = Rest.getRestModule(service).toLowerCase();
|
||||||
return protocol.toLowerCase() + ":" + mmc.module() + ":" + selfmodule + (resname.isEmpty() ? "" : ("-" + resname));
|
return protocol.toLowerCase() + ":" + mmc.module() + ":" + selfmodule;
|
||||||
}
|
}
|
||||||
if (!Sncp.isSncpDyn(service)) return protocol.toLowerCase() + ":" + service.getClass().getName();
|
if (!Sncp.isSncpDyn(service)) return protocol.toLowerCase() + ":" + service.getClass().getName();
|
||||||
String resname = Sncp.getResourceName(service);
|
String resname = Sncp.getResourceName(service);
|
||||||
|
|||||||
@@ -44,26 +44,58 @@ public class HttpMessageClusterClient extends HttpMessageClient {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
|
public CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
|
||||||
return httpAsync("http", userid, request);
|
return httpAsync(userid, request);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void produceMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
|
public void produceMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
|
||||||
httpAsync("http", userid, request);
|
httpAsync(userid, request);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void broadcastMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
|
public void broadcastMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
|
||||||
httpAsync("mqtp", userid, request);
|
mqtpAsync(userid, request);
|
||||||
}
|
}
|
||||||
|
|
||||||
private CompletableFuture<HttpResult<byte[]>> httpAsync(String protocol, int userid, HttpSimpleRequest req) {
|
private CompletableFuture<HttpResult<byte[]>> mqtpAsync(int userid, HttpSimpleRequest req) {
|
||||||
String module = req.getRequestURI();
|
String module = req.getRequestURI();
|
||||||
module = module.substring(1); //去掉/
|
module = module.substring(1); //去掉/
|
||||||
module = module.substring(0, module.indexOf('/'));
|
module = module.substring(0, module.indexOf('/'));
|
||||||
Map<String, String> headers = req.getHeaders();
|
Map<String, String> headers = req.getHeaders();
|
||||||
String resname = headers == null ? "" : headers.getOrDefault(Rest.REST_HEADER_RESOURCE_NAME, "");
|
String resname = headers == null ? "" : headers.getOrDefault(Rest.REST_HEADER_RESOURCE_NAME, "");
|
||||||
return clusterAgent.queryHttpAddress(protocol, module, resname).thenCompose(addrs -> {
|
return clusterAgent.queryMqtpAddress("mqtp", module, resname).thenCompose(addrmap -> {
|
||||||
|
if (addrmap == null || addrmap.isEmpty()) return new HttpResult().status(404).toAnyFuture();
|
||||||
|
java.net.http.HttpRequest.Builder builder = java.net.http.HttpRequest.newBuilder().timeout(Duration.ofMillis(6000));
|
||||||
|
if (req.isRpc()) builder.header(Rest.REST_HEADER_RPC_NAME, "true");
|
||||||
|
if (userid != 0) builder.header(Rest.REST_HEADER_CURRUSERID_NAME, "" + userid);
|
||||||
|
if (headers != null) headers.forEach((n, v) -> {
|
||||||
|
if (!DISALLOWED_HEADERS_SET.contains(n.toLowerCase())) builder.header(n, v);
|
||||||
|
});
|
||||||
|
builder.header("Content-Type", "x-www-form-urlencoded");
|
||||||
|
String paramstr = req.getParametersToString();
|
||||||
|
if (paramstr != null) builder.POST(java.net.http.HttpRequest.BodyPublishers.ofString(paramstr));
|
||||||
|
List<CompletableFuture> futures = new ArrayList<>();
|
||||||
|
for (Map.Entry<String, Collection<InetSocketAddress>> en : addrmap.entrySet()) {
|
||||||
|
String realmodule = en.getKey();
|
||||||
|
Collection<InetSocketAddress> addrs = en.getValue();
|
||||||
|
if (addrs == null || addrs.isEmpty()) continue;
|
||||||
|
String suburi = req.getRequestURI();
|
||||||
|
suburi = suburi.substring(1); //跳过 /
|
||||||
|
suburi = "/" + realmodule + suburi.substring(suburi.indexOf('/'));
|
||||||
|
futures.add(forEachCollectionFuture(userid, req, (req.getPath() != null && !req.getPath().isEmpty() ? req.getPath() : "") + suburi, builder, addrs.iterator()));
|
||||||
|
}
|
||||||
|
if (futures.isEmpty()) return CompletableFuture.completedFuture(null);
|
||||||
|
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).thenApply(v -> null);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private CompletableFuture<HttpResult<byte[]>> httpAsync(int userid, HttpSimpleRequest req) {
|
||||||
|
String module = req.getRequestURI();
|
||||||
|
module = module.substring(1); //去掉/
|
||||||
|
module = module.substring(0, module.indexOf('/'));
|
||||||
|
Map<String, String> headers = req.getHeaders();
|
||||||
|
String resname = headers == null ? "" : headers.getOrDefault(Rest.REST_HEADER_RESOURCE_NAME, "");
|
||||||
|
return clusterAgent.queryHttpAddress("http", module, resname).thenCompose(addrs -> {
|
||||||
if (addrs == null || addrs.isEmpty()) return new HttpResult().status(404).toAnyFuture();
|
if (addrs == null || addrs.isEmpty()) return new HttpResult().status(404).toAnyFuture();
|
||||||
java.net.http.HttpRequest.Builder builder = java.net.http.HttpRequest.newBuilder().timeout(Duration.ofMillis(6000));
|
java.net.http.HttpRequest.Builder builder = java.net.http.HttpRequest.newBuilder().timeout(Duration.ofMillis(6000));
|
||||||
if (req.isRpc()) builder.header(Rest.REST_HEADER_RPC_NAME, "true");
|
if (req.isRpc()) builder.header(Rest.REST_HEADER_RPC_NAME, "true");
|
||||||
@@ -74,17 +106,16 @@ public class HttpMessageClusterClient extends HttpMessageClient {
|
|||||||
builder.header("Content-Type", "x-www-form-urlencoded");
|
builder.header("Content-Type", "x-www-form-urlencoded");
|
||||||
String paramstr = req.getParametersToString();
|
String paramstr = req.getParametersToString();
|
||||||
if (paramstr != null) builder.POST(java.net.http.HttpRequest.BodyPublishers.ofString(paramstr));
|
if (paramstr != null) builder.POST(java.net.http.HttpRequest.BodyPublishers.ofString(paramstr));
|
||||||
return forEachCollectionFuture(userid, req, builder, addrs.iterator());
|
return forEachCollectionFuture(userid, req, (req.getPath() != null && !req.getPath().isEmpty() ? req.getPath() : "") + req.getRequestURI(), builder, addrs.iterator());
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private CompletableFuture<HttpResult<byte[]>> forEachCollectionFuture(int userid, HttpSimpleRequest req, java.net.http.HttpRequest.Builder builder, Iterator<InetSocketAddress> it) {
|
private CompletableFuture<HttpResult<byte[]>> forEachCollectionFuture(int userid, HttpSimpleRequest req, String requesturi, java.net.http.HttpRequest.Builder builder, Iterator<InetSocketAddress> it) {
|
||||||
if (!it.hasNext()) return CompletableFuture.completedFuture(null);
|
if (!it.hasNext()) return CompletableFuture.completedFuture(null);
|
||||||
InetSocketAddress addr = it.next();
|
InetSocketAddress addr = it.next();
|
||||||
String url = "http://" + addr.getHostString() + ":" + addr.getPort() + (req.getPath() != null && !req.getPath().isEmpty() ? req.getPath() : "") + req.getRequestURI();
|
String url = "http://" + addr.getHostString() + ":" + addr.getPort() + requesturi;
|
||||||
builder.uri(URI.create(url));
|
return httpClient.sendAsync(builder.copy().uri(URI.create(url)).build(), java.net.http.HttpResponse.BodyHandlers.ofByteArray()).thenCompose(resp -> {
|
||||||
return httpClient.sendAsync(builder.build(), java.net.http.HttpResponse.BodyHandlers.ofByteArray()).thenCompose(resp -> {
|
if (resp.statusCode() != 200) return forEachCollectionFuture(userid, req, requesturi, builder, it);
|
||||||
if (resp.statusCode() != 200) return forEachCollectionFuture(userid, req, builder, it);
|
|
||||||
HttpResult rs = new HttpResult();
|
HttpResult rs = new HttpResult();
|
||||||
java.net.http.HttpHeaders hs = resp.headers();
|
java.net.http.HttpHeaders hs = resp.headers();
|
||||||
if (hs != null) {
|
if (hs != null) {
|
||||||
|
|||||||
Reference in New Issue
Block a user