diff --git a/src/main/java/org/redkale/boot/NodeServer.java b/src/main/java/org/redkale/boot/NodeServer.java index 4b784eace..a81751d83 100644 --- a/src/main/java/org/redkale/boot/NodeServer.java +++ b/src/main/java/org/redkale/boot/NodeServer.java @@ -456,8 +456,9 @@ public abstract class NodeServer { } continue; } - final String group = rpcGroups.isLocalGroup(this.sncpGroup, this.sncpAddress, entry) ? null : entry.getGroup(); - final boolean localMode = rpcGroups.isLocalGroup(this.sncpGroup, this.sncpAddress, entry) || serviceImplClass.getAnnotation(Local.class) != null;//本地模式 + boolean isLocalGroup0 = rpcGroups.isLocalGroup(this.sncpGroup, this.sncpAddress, entry); + final String group = isLocalGroup0 ? null : entry.getGroup(); + final boolean localMode = serviceImplClass.getAnnotation(Local.class) != null || isLocalGroup0;//本地模式 if (localMode && (serviceImplClass.isInterface() || Modifier.isAbstract(serviceImplClass.getModifiers()))) { continue; //本地模式不能实例化接口和抽象类的Service类 } diff --git a/src/main/java/org/redkale/cluster/CacheClusterAgent.java b/src/main/java/org/redkale/cluster/CacheClusterAgent.java index 1dc9f9c0e..7bc84230c 100644 --- a/src/main/java/org/redkale/cluster/CacheClusterAgent.java +++ b/src/main/java/org/redkale/cluster/CacheClusterAgent.java @@ -42,10 +42,13 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable { protected ScheduledFuture taskFuture; //可能被HttpMessageClient用到的服务 key: serviceName - protected final ConcurrentHashMap> httpAddressMap = new ConcurrentHashMap<>(); + protected final ConcurrentHashMap> httpAddressMap = new ConcurrentHashMap<>(); + + //可能被sncp用到的服务 key: serviceName + protected final ConcurrentHashMap> sncpAddressMap = new ConcurrentHashMap<>(); //可能被mqtp用到的服务 key: serviceName - protected final ConcurrentHashMap> mqtpAddressMap = new ConcurrentHashMap<>(); + protected final ConcurrentHashMap> mqtpAddressMap = new ConcurrentHashMap<>(); @Override public void init(ResourceFactory factory, AnyValue config) { @@ -134,12 +137,13 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable { try { checkApplicationHealth(); checkHttpAddressHealth(); + loadSncpAddressHealth(); loadMqtpAddressHealth(); localEntrys.values().stream().filter(e -> !e.canceled).forEach(entry -> { checkLocalHealth(entry); }); remoteEntrys.values().stream().filter(entry -> "SNCP".equalsIgnoreCase(entry.protocol)).forEach(entry -> { - updateSncpTransport(entry); + updateSncpAddress(entry); }); } catch (Exception e) { logger.log(Level.SEVERE, "scheduleAtFixedRate check error", e instanceof CompletionException ? ((CompletionException) e).getCause() : e); @@ -147,6 +151,17 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable { }; } + protected void loadSncpAddressHealth() { + List keys = source.keysStartsWith("cluster.sncp:"); + keys.forEach(serviceName -> { + try { + this.sncpAddressMap.put(serviceName, queryAddress(serviceName).get(3, TimeUnit.SECONDS)); + } catch (Exception e) { + logger.log(Level.SEVERE, "loadSncpAddressHealth check " + serviceName + " error", e); + } + }); + } + protected void loadMqtpAddressHealth() { List keys = source.keysStartsWith("cluster.mqtp:"); keys.forEach(serviceName -> { @@ -181,9 +196,22 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable { source.hset(entry.checkName, entry.checkid, AddressEntry.class, newaddr); } + @Override //获取SNCP远程服务的可用ip列表 + public CompletableFuture> querySncpAddress(String protocol, String module, String resname) { + final String serviceName = generateSncpServiceName(protocol, module, resname); + Set rs = sncpAddressMap.get(serviceName); + if (rs != null) { + return CompletableFuture.completedFuture(rs); + } + return queryAddress(serviceName).thenApply(t -> { + sncpAddressMap.put(serviceName, t); + return t; + }); + } + @Override //获取MQTP的HTTP远程服务的可用ip列表, key = serviceName的后半段 - public CompletableFuture>> queryMqtpAddress(String protocol, String module, String resname) { - final Map> rsmap = new ConcurrentHashMap<>(); + public CompletableFuture>> queryMqtpAddress(String protocol, String module, String resname) { + final Map> rsmap = new ConcurrentHashMap<>(); final String servicenamprefix = generateHttpServiceName(protocol, module, null) + ":"; mqtpAddressMap.keySet().stream().filter(k -> k.startsWith(servicenamprefix)) .forEach(sn -> rsmap.put(sn.substring(servicenamprefix.length()), mqtpAddressMap.get(sn))); @@ -191,9 +219,9 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable { } @Override //获取HTTP远程服务的可用ip列表 - public CompletableFuture> queryHttpAddress(String protocol, String module, String resname) { + public CompletableFuture> queryHttpAddress(String protocol, String module, String resname) { final String serviceName = generateHttpServiceName(protocol, module, resname); - Collection rs = httpAddressMap.get(serviceName); + Set rs = httpAddressMap.get(serviceName); if (rs != null) { return CompletableFuture.completedFuture(rs); } @@ -204,11 +232,11 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable { } @Override - protected CompletableFuture> queryAddress(final ClusterEntry entry) { + protected CompletableFuture> queryAddress(final ClusterEntry entry) { return queryAddress(entry.serviceName); } - private CompletableFuture> queryAddress(final String serviceName) { + private CompletableFuture> queryAddress(final String serviceName) { final CompletableFuture> future = source.hmapAsync(serviceName, AddressEntry.class, 0, 10000); return future.thenApply(map -> { final Set set = new HashSet<>(); @@ -321,6 +349,11 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable { return "cluster." + super.generateHttpServiceName(protocol, module, resname); } + @Override + public String generateSncpServiceName(String protocol, String restype, String resname) { + return "cluster." + super.generateSncpServiceName(protocol, restype, resname); + } + @Override protected String generateApplicationCheckName() { return generateApplicationServiceName(); diff --git a/src/main/java/org/redkale/cluster/ClusterAgent.java b/src/main/java/org/redkale/cluster/ClusterAgent.java index 4733d5cf8..abde9ed5b 100644 --- a/src/main/java/org/redkale/cluster/ClusterAgent.java +++ b/src/main/java/org/redkale/cluster/ClusterAgent.java @@ -10,7 +10,7 @@ import java.net.*; import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.*; -import java.util.logging.Logger; +import java.util.logging.*; import org.redkale.annotation.AutoLoad; import org.redkale.annotation.*; import org.redkale.annotation.ResourceListener; @@ -48,6 +48,9 @@ public abstract class ClusterAgent { @Resource(name = RESNAME_APP_ADDR) protected InetSocketAddress appAddress; + @Resource(required = false) + protected Application application; + protected String name; protected boolean waits; @@ -158,7 +161,7 @@ public abstract class ClusterAgent { if (ns.isSNCP()) { for (Service service : remoteServices) { ClusterEntry entry = new ClusterEntry(ns, protocol, service); - updateSncpTransport(entry); + updateSncpAddress(entry); remoteEntrys.put(entry.serviceid, entry); } } @@ -222,13 +225,16 @@ public abstract class ClusterAgent { } //获取MQTP的HTTP远程服务的可用ip列表, key = serviceName的后半段 - public abstract CompletableFuture>> queryMqtpAddress(String protocol, String module, String resname); + public abstract CompletableFuture>> queryMqtpAddress(String protocol, String module, String resname); //获取HTTP远程服务的可用ip列表 - public abstract CompletableFuture> queryHttpAddress(String protocol, String module, String resname); + public abstract CompletableFuture> queryHttpAddress(String protocol, String module, String resname); + + //获取SNCP远程服务的可用ip列表 restype: resourceType.getName() + public abstract CompletableFuture> querySncpAddress(String protocol, String restype, String resname); //获取远程服务的可用ip列表 - protected abstract CompletableFuture> queryAddress(ClusterEntry entry); + protected abstract CompletableFuture> queryAddress(ClusterEntry entry); //注册服务 protected abstract ClusterEntry register(NodeServer ns, String protocol, Service service); @@ -237,13 +243,21 @@ public abstract class ClusterAgent { protected abstract void deregister(NodeServer ns, String protocol, Service service); //格式: protocol:classtype-resourcename - protected void updateSncpTransport(ClusterEntry entry) { + protected void updateSncpAddress(ClusterEntry entry) { + if (application == null) { + return; + } Service service = entry.serviceRef.get(); if (service == null) { return; } - Collection addrs = ClusterAgent.this.queryAddress(entry).join(); - //Sncp.updateTransport(service, transportFactory, Sncp.getResourceType(service).getName() + "-" + Sncp.getResourceName(service), entry.netProtocol, entry.address, null, addrs); + try { + Set addrs = ClusterAgent.this.queryAddress(entry).join(); + SncpRpcGroups rpcGroups = application.getSncpRpcGroups(); + rpcGroups.putClusterAddress(entry.resourceid, addrs); + } catch (Exception e) { + logger.log(Level.SEVERE, entry + " updateSncpAddress error", e); + } } protected String urlEncode(String value) { @@ -282,6 +296,10 @@ public abstract class ClusterAgent { return "-"; } + public String generateSncpServiceName(String protocol, String restype, String resname) { + return protocol.toLowerCase() + serviceSeparator() + restype + (resname == null || resname.isEmpty() ? "" : ("-" + resname)); + } + //也会提供给HttpMessageClusterAgent适用 public String generateHttpServiceName(String protocol, String module, String resname) { return protocol.toLowerCase() + serviceSeparator() + module + (resname == null || resname.isEmpty() ? "" : ("-" + resname)); @@ -367,9 +385,11 @@ public abstract class ClusterAgent { //以协议+Rest资源名为主 服务类名 public String serviceName; - public String serviceType; + public final String resourceType; - public String resourceName; + public final String resourceName; + + public final String resourceid; public String checkid; @@ -395,8 +415,10 @@ public abstract class ClusterAgent { this.serviceName = generateServiceName(ns, protocol, service); this.checkid = generateCheckId(ns, protocol, service); this.checkName = generateCheckName(ns, protocol, service); - this.serviceType = Sncp.getResourceType(service).getName(); + Class restype = Sncp.getResourceType(service); + this.resourceType = restype.getName(); this.resourceName = Sncp.getResourceName(service); + this.resourceid = Sncp.resourceid(resourceName, restype); this.protocol = protocol; InetSocketAddress addr = ns.getSocketAddress(); String host = addr.getHostString(); diff --git a/src/main/java/org/redkale/mq/HttpMessageClusterClient.java b/src/main/java/org/redkale/mq/HttpMessageClusterClient.java index ddcf8ba8e..89c47f734 100644 --- a/src/main/java/org/redkale/mq/HttpMessageClusterClient.java +++ b/src/main/java/org/redkale/mq/HttpMessageClusterClient.java @@ -136,7 +136,7 @@ public class HttpMessageClusterClient extends HttpMessageClient { if (logger.isLoggable(Level.FINEST)) { logger.log(Level.FINEST, "mqtpAsync: module=" + localModule + ", resname=" + resname + ", addrmap=" + addrmap); } - for (Map.Entry> en : addrmap.entrySet()) { + for (Map.Entry> en : addrmap.entrySet()) { String realmodule = en.getKey(); Collection addrs = en.getValue(); if (addrs == null || addrs.isEmpty()) { diff --git a/src/main/java/org/redkale/net/sncp/SncpRemoteInfo.java b/src/main/java/org/redkale/net/sncp/SncpRemoteInfo.java index e5a88ba09..fa47c5c45 100644 --- a/src/main/java/org/redkale/net/sncp/SncpRemoteInfo.java +++ b/src/main/java/org/redkale/net/sncp/SncpRemoteInfo.java @@ -226,6 +226,10 @@ public class SncpRemoteInfo { } protected InetSocketAddress nextRemoteAddress() { + InetSocketAddress addr = sncpRpcGroups.nextRemoteAddress(resourceid); + if (addr == null) { + return addr; + } SncpRpcGroup srg = sncpRpcGroups.getSncpRpcGroup(remoteGroup); if (srg != null) { Set addrs = srg.getAddresses(); @@ -236,7 +240,7 @@ public class SncpRemoteInfo { } } } - throw new SncpException("Not found SocketAddress by remoteGroup = " + remoteGroup); + throw new SncpException("Not found SocketAddress by remoteGroup = " + remoteGroup + ", resourceid = " + resourceid); } @Override diff --git a/src/main/java/org/redkale/net/sncp/SncpRpcGroups.java b/src/main/java/org/redkale/net/sncp/SncpRpcGroups.java index 28aa7eb25..4caddefef 100644 --- a/src/main/java/org/redkale/net/sncp/SncpRpcGroups.java +++ b/src/main/java/org/redkale/net/sncp/SncpRpcGroups.java @@ -4,7 +4,7 @@ package org.redkale.net.sncp; import java.net.InetSocketAddress; -import java.util.Map; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import org.redkale.boot.ClassFilter; @@ -19,7 +19,10 @@ import org.redkale.boot.ClassFilter; */ public class SncpRpcGroups { - protected final Map sncpRpcGroups = new ConcurrentHashMap<>(); + protected final ConcurrentHashMap sncpRpcGroups = new ConcurrentHashMap<>(); + + //key: resourceid + protected final ConcurrentHashMap> sncpClusters = new ConcurrentHashMap<>(); public SncpRpcGroup getSncpRpcGroup(String group) { return sncpRpcGroups.get(group); @@ -33,6 +36,26 @@ public class SncpRpcGroups { return sncpRpcGroups.computeIfAbsent(group, g -> new SncpRpcGroup(group, protocol)); } + public InetSocketAddress nextRemoteAddress(String resourceid) { + if (sncpClusters.isEmpty()) { + return null; + } + Set addrs = sncpClusters.get(resourceid); + if (!addrs.isEmpty()) { + Iterator it = addrs.iterator(); + if (it.hasNext()) { + return it.next(); + } + } + return null; + } + + public void putClusterAddress(String resourceid, Set set) { + Objects.requireNonNull(resourceid); + Objects.requireNonNull(set); + sncpClusters.put(resourceid, set); + } + public String getGroup(InetSocketAddress address) { for (SncpRpcGroup g : sncpRpcGroups.values()) { if (g.containsAddress(address)) {