优化sncp cluster配置

This commit is contained in:
redkale
2023-03-23 15:21:53 +08:00
parent ac9995fb1a
commit 051299d4a5
6 changed files with 109 additions and 26 deletions

View File

@@ -456,8 +456,9 @@ public abstract class NodeServer {
}
continue;
}
final String group = rpcGroups.isLocalGroup(this.sncpGroup, this.sncpAddress, entry) ? null : entry.getGroup();
final boolean localMode = rpcGroups.isLocalGroup(this.sncpGroup, this.sncpAddress, entry) || serviceImplClass.getAnnotation(Local.class) != null;//本地模式
boolean isLocalGroup0 = rpcGroups.isLocalGroup(this.sncpGroup, this.sncpAddress, entry);
final String group = isLocalGroup0 ? null : entry.getGroup();
final boolean localMode = serviceImplClass.getAnnotation(Local.class) != null || isLocalGroup0;//本地模式
if (localMode && (serviceImplClass.isInterface() || Modifier.isAbstract(serviceImplClass.getModifiers()))) {
continue; //本地模式不能实例化接口和抽象类的Service类
}

View File

@@ -42,10 +42,13 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
protected ScheduledFuture taskFuture;
//可能被HttpMessageClient用到的服务 key: serviceName
protected final ConcurrentHashMap<String, Collection<InetSocketAddress>> httpAddressMap = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<String, Set<InetSocketAddress>> httpAddressMap = new ConcurrentHashMap<>();
//可能被sncp用到的服务 key: serviceName
protected final ConcurrentHashMap<String, Set<InetSocketAddress>> sncpAddressMap = new ConcurrentHashMap<>();
//可能被mqtp用到的服务 key: serviceName
protected final ConcurrentHashMap<String, Collection<InetSocketAddress>> mqtpAddressMap = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<String, Set<InetSocketAddress>> mqtpAddressMap = new ConcurrentHashMap<>();
@Override
public void init(ResourceFactory factory, AnyValue config) {
@@ -134,12 +137,13 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
try {
checkApplicationHealth();
checkHttpAddressHealth();
loadSncpAddressHealth();
loadMqtpAddressHealth();
localEntrys.values().stream().filter(e -> !e.canceled).forEach(entry -> {
checkLocalHealth(entry);
});
remoteEntrys.values().stream().filter(entry -> "SNCP".equalsIgnoreCase(entry.protocol)).forEach(entry -> {
updateSncpTransport(entry);
updateSncpAddress(entry);
});
} catch (Exception e) {
logger.log(Level.SEVERE, "scheduleAtFixedRate check error", e instanceof CompletionException ? ((CompletionException) e).getCause() : e);
@@ -147,6 +151,17 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
};
}
protected void loadSncpAddressHealth() {
List<String> keys = source.keysStartsWith("cluster.sncp:");
keys.forEach(serviceName -> {
try {
this.sncpAddressMap.put(serviceName, queryAddress(serviceName).get(3, TimeUnit.SECONDS));
} catch (Exception e) {
logger.log(Level.SEVERE, "loadSncpAddressHealth check " + serviceName + " error", e);
}
});
}
protected void loadMqtpAddressHealth() {
List<String> keys = source.keysStartsWith("cluster.mqtp:");
keys.forEach(serviceName -> {
@@ -181,9 +196,22 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
source.hset(entry.checkName, entry.checkid, AddressEntry.class, newaddr);
}
@Override //获取SNCP远程服务的可用ip列表
public CompletableFuture<Set<InetSocketAddress>> querySncpAddress(String protocol, String module, String resname) {
final String serviceName = generateSncpServiceName(protocol, module, resname);
Set<InetSocketAddress> rs = sncpAddressMap.get(serviceName);
if (rs != null) {
return CompletableFuture.completedFuture(rs);
}
return queryAddress(serviceName).thenApply(t -> {
sncpAddressMap.put(serviceName, t);
return t;
});
}
@Override //获取MQTP的HTTP远程服务的可用ip列表, key = serviceName的后半段
public CompletableFuture<Map<String, Collection<InetSocketAddress>>> queryMqtpAddress(String protocol, String module, String resname) {
final Map<String, Collection<InetSocketAddress>> rsmap = new ConcurrentHashMap<>();
public CompletableFuture<Map<String, Set<InetSocketAddress>>> queryMqtpAddress(String protocol, String module, String resname) {
final Map<String, Set<InetSocketAddress>> rsmap = new ConcurrentHashMap<>();
final String servicenamprefix = generateHttpServiceName(protocol, module, null) + ":";
mqtpAddressMap.keySet().stream().filter(k -> k.startsWith(servicenamprefix))
.forEach(sn -> rsmap.put(sn.substring(servicenamprefix.length()), mqtpAddressMap.get(sn)));
@@ -191,9 +219,9 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
}
@Override //获取HTTP远程服务的可用ip列表
public CompletableFuture<Collection<InetSocketAddress>> queryHttpAddress(String protocol, String module, String resname) {
public CompletableFuture<Set<InetSocketAddress>> queryHttpAddress(String protocol, String module, String resname) {
final String serviceName = generateHttpServiceName(protocol, module, resname);
Collection<InetSocketAddress> rs = httpAddressMap.get(serviceName);
Set<InetSocketAddress> rs = httpAddressMap.get(serviceName);
if (rs != null) {
return CompletableFuture.completedFuture(rs);
}
@@ -204,11 +232,11 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
}
@Override
protected CompletableFuture<Collection<InetSocketAddress>> queryAddress(final ClusterEntry entry) {
protected CompletableFuture<Set<InetSocketAddress>> queryAddress(final ClusterEntry entry) {
return queryAddress(entry.serviceName);
}
private CompletableFuture<Collection<InetSocketAddress>> queryAddress(final String serviceName) {
private CompletableFuture<Set<InetSocketAddress>> queryAddress(final String serviceName) {
final CompletableFuture<Map<String, AddressEntry>> future = source.hmapAsync(serviceName, AddressEntry.class, 0, 10000);
return future.thenApply(map -> {
final Set<InetSocketAddress> set = new HashSet<>();
@@ -321,6 +349,11 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
return "cluster." + super.generateHttpServiceName(protocol, module, resname);
}
@Override
public String generateSncpServiceName(String protocol, String restype, String resname) {
return "cluster." + super.generateSncpServiceName(protocol, restype, resname);
}
@Override
protected String generateApplicationCheckName() {
return generateApplicationServiceName();

View File

@@ -10,7 +10,7 @@ import java.net.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;
import java.util.logging.Logger;
import java.util.logging.*;
import org.redkale.annotation.AutoLoad;
import org.redkale.annotation.*;
import org.redkale.annotation.ResourceListener;
@@ -48,6 +48,9 @@ public abstract class ClusterAgent {
@Resource(name = RESNAME_APP_ADDR)
protected InetSocketAddress appAddress;
@Resource(required = false)
protected Application application;
protected String name;
protected boolean waits;
@@ -158,7 +161,7 @@ public abstract class ClusterAgent {
if (ns.isSNCP()) {
for (Service service : remoteServices) {
ClusterEntry entry = new ClusterEntry(ns, protocol, service);
updateSncpTransport(entry);
updateSncpAddress(entry);
remoteEntrys.put(entry.serviceid, entry);
}
}
@@ -222,13 +225,16 @@ public abstract class ClusterAgent {
}
//获取MQTP的HTTP远程服务的可用ip列表, key = serviceName的后半段
public abstract CompletableFuture<Map<String, Collection<InetSocketAddress>>> queryMqtpAddress(String protocol, String module, String resname);
public abstract CompletableFuture<Map<String, Set<InetSocketAddress>>> queryMqtpAddress(String protocol, String module, String resname);
//获取HTTP远程服务的可用ip列表
public abstract CompletableFuture<Collection<InetSocketAddress>> queryHttpAddress(String protocol, String module, String resname);
public abstract CompletableFuture<Set<InetSocketAddress>> queryHttpAddress(String protocol, String module, String resname);
//获取SNCP远程服务的可用ip列表 restype: resourceType.getName()
public abstract CompletableFuture<Set<InetSocketAddress>> querySncpAddress(String protocol, String restype, String resname);
//获取远程服务的可用ip列表
protected abstract CompletableFuture<Collection<InetSocketAddress>> queryAddress(ClusterEntry entry);
protected abstract CompletableFuture<Set<InetSocketAddress>> queryAddress(ClusterEntry entry);
//注册服务
protected abstract ClusterEntry register(NodeServer ns, String protocol, Service service);
@@ -237,13 +243,21 @@ public abstract class ClusterAgent {
protected abstract void deregister(NodeServer ns, String protocol, Service service);
//格式: protocol:classtype-resourcename
protected void updateSncpTransport(ClusterEntry entry) {
protected void updateSncpAddress(ClusterEntry entry) {
if (application == null) {
return;
}
Service service = entry.serviceRef.get();
if (service == null) {
return;
}
Collection<InetSocketAddress> addrs = ClusterAgent.this.queryAddress(entry).join();
//Sncp.updateTransport(service, transportFactory, Sncp.getResourceType(service).getName() + "-" + Sncp.getResourceName(service), entry.netProtocol, entry.address, null, addrs);
try {
Set<InetSocketAddress> addrs = ClusterAgent.this.queryAddress(entry).join();
SncpRpcGroups rpcGroups = application.getSncpRpcGroups();
rpcGroups.putClusterAddress(entry.resourceid, addrs);
} catch (Exception e) {
logger.log(Level.SEVERE, entry + " updateSncpAddress error", e);
}
}
protected String urlEncode(String value) {
@@ -282,6 +296,10 @@ public abstract class ClusterAgent {
return "-";
}
public String generateSncpServiceName(String protocol, String restype, String resname) {
return protocol.toLowerCase() + serviceSeparator() + restype + (resname == null || resname.isEmpty() ? "" : ("-" + resname));
}
//也会提供给HttpMessageClusterAgent适用
public String generateHttpServiceName(String protocol, String module, String resname) {
return protocol.toLowerCase() + serviceSeparator() + module + (resname == null || resname.isEmpty() ? "" : ("-" + resname));
@@ -367,9 +385,11 @@ public abstract class ClusterAgent {
//以协议+Rest资源名为主 服务类名
public String serviceName;
public String serviceType;
public final String resourceType;
public String resourceName;
public final String resourceName;
public final String resourceid;
public String checkid;
@@ -395,8 +415,10 @@ public abstract class ClusterAgent {
this.serviceName = generateServiceName(ns, protocol, service);
this.checkid = generateCheckId(ns, protocol, service);
this.checkName = generateCheckName(ns, protocol, service);
this.serviceType = Sncp.getResourceType(service).getName();
Class restype = Sncp.getResourceType(service);
this.resourceType = restype.getName();
this.resourceName = Sncp.getResourceName(service);
this.resourceid = Sncp.resourceid(resourceName, restype);
this.protocol = protocol;
InetSocketAddress addr = ns.getSocketAddress();
String host = addr.getHostString();

View File

@@ -136,7 +136,7 @@ public class HttpMessageClusterClient extends HttpMessageClient {
if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "mqtpAsync: module=" + localModule + ", resname=" + resname + ", addrmap=" + addrmap);
}
for (Map.Entry<String, Collection<InetSocketAddress>> en : addrmap.entrySet()) {
for (Map.Entry<String, Set<InetSocketAddress>> en : addrmap.entrySet()) {
String realmodule = en.getKey();
Collection<InetSocketAddress> addrs = en.getValue();
if (addrs == null || addrs.isEmpty()) {

View File

@@ -226,6 +226,10 @@ public class SncpRemoteInfo<T extends Service> {
}
protected InetSocketAddress nextRemoteAddress() {
InetSocketAddress addr = sncpRpcGroups.nextRemoteAddress(resourceid);
if (addr == null) {
return addr;
}
SncpRpcGroup srg = sncpRpcGroups.getSncpRpcGroup(remoteGroup);
if (srg != null) {
Set<InetSocketAddress> addrs = srg.getAddresses();
@@ -236,7 +240,7 @@ public class SncpRemoteInfo<T extends Service> {
}
}
}
throw new SncpException("Not found SocketAddress by remoteGroup = " + remoteGroup);
throw new SncpException("Not found SocketAddress by remoteGroup = " + remoteGroup + ", resourceid = " + resourceid);
}
@Override

View File

@@ -4,7 +4,7 @@
package org.redkale.net.sncp;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import org.redkale.boot.ClassFilter;
@@ -19,7 +19,10 @@ import org.redkale.boot.ClassFilter;
*/
public class SncpRpcGroups {
protected final Map<String, SncpRpcGroup> sncpRpcGroups = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<String, SncpRpcGroup> sncpRpcGroups = new ConcurrentHashMap<>();
//key: resourceid
protected final ConcurrentHashMap<String, Set<InetSocketAddress>> sncpClusters = new ConcurrentHashMap<>();
public SncpRpcGroup getSncpRpcGroup(String group) {
return sncpRpcGroups.get(group);
@@ -33,6 +36,26 @@ public class SncpRpcGroups {
return sncpRpcGroups.computeIfAbsent(group, g -> new SncpRpcGroup(group, protocol));
}
public InetSocketAddress nextRemoteAddress(String resourceid) {
if (sncpClusters.isEmpty()) {
return null;
}
Set<InetSocketAddress> addrs = sncpClusters.get(resourceid);
if (!addrs.isEmpty()) {
Iterator<InetSocketAddress> it = addrs.iterator();
if (it.hasNext()) {
return it.next();
}
}
return null;
}
public void putClusterAddress(String resourceid, Set<InetSocketAddress> set) {
Objects.requireNonNull(resourceid);
Objects.requireNonNull(set);
sncpClusters.put(resourceid, set);
}
public String getGroup(InetSocketAddress address) {
for (SncpRpcGroup g : sncpRpcGroups.values()) {
if (g.containsAddress(address)) {