This commit is contained in:
Redkale
2020-07-15 20:58:03 +08:00
parent 8e2d6acdf8
commit a79b8b77b5
3 changed files with 82 additions and 5 deletions

View File

@@ -14,6 +14,7 @@ import javax.annotation.Resource;
import org.redkale.boot.*;
import static org.redkale.boot.Application.*;
import org.redkale.convert.json.JsonConvert;
import org.redkale.mq.MessageMultiConsumer;
import org.redkale.net.*;
import org.redkale.net.http.*;
import org.redkale.net.sncp.*;
@@ -109,6 +110,14 @@ public abstract class ClusterAgent {
register(ns, protocol, service);
ClusterEntry entry = new ClusterEntry(ns, protocol, service);
localEntrys.put(entry.serviceid, entry);
if (protocol.toLowerCase().startsWith("http")) {
MessageMultiConsumer mmc = service.getClass().getAnnotation(MessageMultiConsumer.class);
if (mmc != null) {
register(ns, "mqtp", service);
ClusterEntry mqentry = new ClusterEntry(ns, "mqtp", service);
localEntrys.put(entry.serviceid, mqentry);
}
}
}
//远程模式加载IP列表, 只支持SNCP协议
if (ns.isSNCP()) {
@@ -157,6 +166,9 @@ public abstract class ClusterAgent {
return 10;
}
//获取MQTP的HTTP远程服务的可用ip列表, key = servicename
public abstract CompletableFuture<Map<String, Collection<InetSocketAddress>>> queryMqtpAddress(String protocol, String module, String resname);
//获取HTTP远程服务的可用ip列表
public abstract CompletableFuture<Collection<InetSocketAddress>> queryHttpAddress(String protocol, String module, String resname);
@@ -173,7 +185,7 @@ public abstract class ClusterAgent {
protected void updateSncpTransport(ClusterEntry entry) {
Service service = entry.serviceref.get();
if (service == null) return;
Collection<InetSocketAddress> addrs = queryAddress(entry).join();
Collection<InetSocketAddress> addrs = ClusterAgent.this.queryAddress(entry).join();
Sncp.updateTransport(service, transportFactory, Sncp.getResourceType(service).getName() + "-" + Sncp.getResourceName(service), entry.netprotocol, entry.address, null, addrs);
}
@@ -205,6 +217,12 @@ public abstract class ClusterAgent {
String module = Rest.getRestModule(service).toLowerCase();
return protocol.toLowerCase() + ":" + module + (resname.isEmpty() ? "" : ("-" + resname));
}
if ("mqtp".equalsIgnoreCase(protocol)) {
MessageMultiConsumer mmc = service.getClass().getAnnotation(MessageMultiConsumer.class);
String resname = Sncp.getResourceName(service);
String selfmodule = Rest.getRestModule(service).toLowerCase();
return protocol.toLowerCase() + ":" + mmc.module() + ":" + selfmodule + (resname.isEmpty() ? "" : ("-" + resname));
}
if (!Sncp.isSncpDyn(service)) return protocol.toLowerCase() + ":" + service.getClass().getName();
String resname = Sncp.getResourceName(service);
return protocol.toLowerCase() + ":" + Sncp.getResourceType(service).getName() + (resname.isEmpty() ? "" : ("-" + resname));

View File

@@ -100,6 +100,54 @@ public class HttpMessageClient extends MessageClient {
produceMessage(topic, convertType, userid, groupid, request, null);
}
public final void broadcastMessage(HttpSimpleRequest request) {
broadcastMessage(generateHttpReqTopic(request, null), ConvertType.JSON, 0, null, request, null);
}
public final void broadcastMessage(HttpSimpleRequest request, AtomicLong counter) {
broadcastMessage(generateHttpReqTopic(request, null), ConvertType.JSON, 0, null, request, counter);
}
public final void broadcastMessage(int userid, HttpSimpleRequest request) {
broadcastMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, null, request, null);
}
public final void broadcastMessage(int userid, String groupid, HttpSimpleRequest request) {
broadcastMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, groupid, request, null);
}
public final void broadcastMessage(int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
broadcastMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, groupid, request, counter);
}
public final void broadcastMessage(String topic, HttpSimpleRequest request) {
broadcastMessage(topic, ConvertType.JSON, 0, null, request, null);
}
public final void broadcastMessage(String topic, HttpSimpleRequest request, AtomicLong counter) {
broadcastMessage(topic, ConvertType.JSON, 0, null, request, counter);
}
public final void broadcastMessage(String topic, ConvertType convertType, HttpSimpleRequest request) {
broadcastMessage(topic, convertType, 0, null, request, null);
}
public final void broadcastMessage(String topic, ConvertType convertType, HttpSimpleRequest request, AtomicLong counter) {
broadcastMessage(topic, convertType, 0, null, request, counter);
}
public final void broadcastMessage(String topic, int userid, String groupid, HttpSimpleRequest request) {
broadcastMessage(topic, ConvertType.JSON, userid, groupid, request, null);
}
public final void broadcastMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
broadcastMessage(topic, ConvertType.JSON, userid, groupid, request, counter);
}
public final void broadcastMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request) {
broadcastMessage(topic, convertType, userid, groupid, request, null);
}
public final <T> CompletableFuture<T> sendMessage(HttpSimpleRequest request, Type type) {
return sendMessage(generateHttpReqTopic(request, null), ConvertType.JSON, 0, null, request, null).thenApply((HttpResult<byte[]> httbs) -> {
if (httbs == null || httbs.getResult() == null) return null;
@@ -168,6 +216,12 @@ public class HttpMessageClient extends MessageClient {
return sendMessage(message, true, counter).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance()));
}
public void broadcastMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
MessageRecord message = new MessageRecord(convertType, topic, null, HttpSimpleRequestCoder.getInstance().encode(request));
message.userid(userid).groupid(groupid);
sendMessage(message, false, counter);
}
public void produceMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
MessageRecord message = new MessageRecord(convertType, topic, null, HttpSimpleRequestCoder.getInstance().encode(request));
message.userid(userid).groupid(groupid);

View File

@@ -44,21 +44,26 @@ public class HttpMessageClusterClient extends HttpMessageClient {
@Override
public CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
return httpAsync(userid, request);
return httpAsync("http", userid, request);
}
@Override
public void produceMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
httpAsync(userid, request);
httpAsync("http", userid, request);
}
protected CompletableFuture<HttpResult<byte[]>> httpAsync(int userid, HttpSimpleRequest req) {
@Override
public void broadcastMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
httpAsync("mqtp", userid, request);
}
private CompletableFuture<HttpResult<byte[]>> httpAsync(String protocol, int userid, HttpSimpleRequest req) {
String module = req.getRequestURI();
module = module.substring(1); //去掉/
module = module.substring(0, module.indexOf('/'));
Map<String, String> headers = req.getHeaders();
String resname = headers == null ? "" : headers.getOrDefault(Rest.REST_HEADER_RESOURCE_NAME, "");
return clusterAgent.queryHttpAddress("http", module, resname).thenCompose(addrs -> {
return clusterAgent.queryHttpAddress(protocol, module, resname).thenCompose(addrs -> {
if (addrs == null || addrs.isEmpty()) return new HttpResult().status(404).toAnyFuture();
java.net.http.HttpRequest.Builder builder = java.net.http.HttpRequest.newBuilder().timeout(Duration.ofMillis(6000));
if (req.isRpc()) builder.header(Rest.REST_HEADER_RPC_NAME, "true");