From e2a1fa15d36a287b4061b50290416a2b78da5934 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Thu, 14 Jan 2021 12:01:25 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0CacheClusterAgent=E5=8A=9F?= =?UTF-8?q?=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/org/redkale/boot/Application.java | 48 ++- .../redkale/cluster/CacheClusterAgent.java | 334 ++++++++++++++++++ src/org/redkale/mq/MessageClient.java | 2 +- 3 files changed, 381 insertions(+), 3 deletions(-) create mode 100644 src/org/redkale/cluster/CacheClusterAgent.java diff --git a/src/org/redkale/boot/Application.java b/src/org/redkale/boot/Application.java index 232e99ca2..b62373281 100644 --- a/src/org/redkale/boot/Application.java +++ b/src/org/redkale/boot/Application.java @@ -22,6 +22,7 @@ import javax.annotation.Resource; import javax.net.ssl.SSLContext; import javax.xml.parsers.*; import org.redkale.boot.ClassFilter.FilterEntry; +import org.redkale.cluster.*; import org.redkale.convert.Convert; import org.redkale.convert.bson.BsonFactory; import org.redkale.convert.json.*; @@ -356,6 +357,13 @@ public final class Application { break; } } + if (cluster == null) { + ClusterAgent cacheClusterAgent = new CacheClusterAgent(); + if (cacheClusterAgent.match(clusterConf)) { + cluster = cacheClusterAgent; + cluster.setConfig(clusterConf); + } + } if (cluster == null) logger.log(Level.SEVERE, "load application cluster resource, but not found name='value' value error: " + clusterConf); } else { Class type = classLoader.loadClass(clusterConf.getValue("value")); @@ -673,6 +681,10 @@ public final class Application { if (this.clusterAgent != null) { if (logger.isLoggable(Level.FINER)) logger.log(Level.FINER, "ClusterAgent initing"); long s = System.currentTimeMillis(); + if (this.clusterAgent instanceof CacheClusterAgent) { + String sourceName = ((CacheClusterAgent) clusterAgent).getSourceName(); //必须在inject前调用,需要赋值Resourcable.name + loadCacheSource(sourceName); + } clusterAgent.setTransportFactory(this.sncpTransportFactory); this.resourceFactory.inject(clusterAgent); clusterAgent.init(clusterAgent.getConfig()); @@ -692,7 +704,7 @@ public final class Application { logger.info("MessageAgent init in " + (System.currentTimeMillis() - s) + " ms"); } - //------------------------------------- 注册 DataSource -------------------------------------------------------- + //------------------------------------- 注册 HttpMessageClient -------------------------------------------------------- resourceFactory.register((ResourceFactory rf, final Object src, String resourceName, Field field, final Object attachment) -> { try { if (field.getAnnotation(Resource.class) == null) return; @@ -702,12 +714,44 @@ public final class Application { rf.inject(messageClient, null); // 给其可能包含@Resource的字段赋值; rf.register(resourceName, HttpMessageClient.class, messageClient); } catch (Exception e) { - logger.log(Level.SEVERE, "[" + Thread.currentThread().getName() + "] DataSource inject error", e); + logger.log(Level.SEVERE, "[" + Thread.currentThread().getName() + "] HttpMessageClient inject error", e); } }, HttpMessageClient.class); initResources(); } + private void loadCacheSource(final String sourceName) { + final AnyValue resources = config.getAnyValue("resources"); + for (AnyValue sourceConf : resources.getAnyValues("source")) { + if (!sourceName.equals(sourceConf.getValue("name"))) continue; + String classval = sourceConf.getValue("value"); + try { + Class sourceType = CacheMemorySource.class; + if (classval == null || classval.isEmpty()) { + Iterator it = ServiceLoader.load(CacheSource.class, serverClassLoader).iterator(); + while (it.hasNext()) { + CacheSource s = it.next(); + if (s.match(sourceConf)) { + sourceType = s.getClass(); + break; + } + } + } else { + sourceType = serverClassLoader.loadClass(classval); + } + CacheSource source = Modifier.isFinal(sourceType.getModifiers()) ? (CacheSource) sourceType.getConstructor().newInstance() : (CacheSource) Sncp.createLocalService(serverClassLoader, sourceName, sourceType, null, resourceFactory, sncpTransportFactory, null, null, sourceConf); + cacheSources.add((CacheSource) source); + resourceFactory.register(sourceName, CacheSource.class, source); + resourceFactory.inject(source); + if (source instanceof Service) ((Service) source).init(sourceConf); + logger.info("[" + Thread.currentThread().getName() + "] Load Source resourceName = " + sourceName + ", source = " + source); + } catch (Exception e) { + logger.log(Level.SEVERE, "load application source resource error: " + sourceConf, e); + } + return; + } + } + private void initResources() throws Exception { //------------------------------------------------------------------------- final AnyValue resources = config.getAnyValue("resources"); diff --git a/src/org/redkale/cluster/CacheClusterAgent.java b/src/org/redkale/cluster/CacheClusterAgent.java new file mode 100644 index 000000000..71013521a --- /dev/null +++ b/src/org/redkale/cluster/CacheClusterAgent.java @@ -0,0 +1,334 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.cluster; + +import java.net.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.logging.Level; +import javax.annotation.Resource; +import org.redkale.boot.*; +import org.redkale.convert.json.JsonConvert; +import org.redkale.service.Service; +import org.redkale.source.CacheSource; +import org.redkale.util.*; + +/** + * 使用CacheSource实现的第三方服务发现管理接口cluster + * + * + * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.3.0 + */ +public class CacheClusterAgent extends ClusterAgent implements Resourcable { + + @Resource(name = "$") + private CacheSource source; + + private String sourceName; + + protected int ttls = 10; //定时检查的秒数 + + protected ScheduledThreadPoolExecutor scheduler; + + //可能被HttpMessageClient用到的服务 key: servicename + protected final ConcurrentHashMap> httpAddressMap = new ConcurrentHashMap<>(); + + //可能被mqtp用到的服务 key: servicename + protected final ConcurrentHashMap> mqtpAddressMap = new ConcurrentHashMap<>(); + + @Override + public void init(AnyValue config) { + super.init(config); + this.sourceName = getSourceName(); + + AnyValue[] properties = config.getAnyValues("property"); + for (AnyValue property : properties) { + if ("ttls".equalsIgnoreCase(property.getValue("name"))) { + this.ttls = Integer.parseInt(property.getValue("value", "").trim()); + if (this.ttls < 5) this.ttls = 10; + } + } + } + + @Override + public void destroy(AnyValue config) { + if (scheduler != null) scheduler.shutdownNow(); + } + + public String getSourceName() { + AnyValue[] properties = config.getAnyValues("property"); + for (AnyValue property : properties) { + if ("source".equalsIgnoreCase(property.getValue("name")) + && property.getValue("value") != null) { + this.sourceName = property.getValue("value"); + return this.sourceName; + } + } + return null; + } + + @Override + public String resourceName() { + return sourceName; + } + + @Override //ServiceLoader时判断配置是否符合当前实现类 + public boolean match(AnyValue config) { + if (config == null) return false; + AnyValue[] properties = config.getAnyValues("property"); + if (properties == null || properties.length == 0) return false; + for (AnyValue property : properties) { + if ("source".equalsIgnoreCase(property.getValue("name")) + && property.getValue("value") != null) return true; + } + return false; + } + + @Override + public void start() { + if (this.scheduler == null) { + this.scheduler = new ScheduledThreadPoolExecutor(4, (Runnable r) -> { + final Thread t = new Thread(r, CacheClusterAgent.class.getSimpleName() + "-Task-Thread"); + t.setDaemon(true); + return t; + }); + + //delay为了错开请求 + this.scheduler.scheduleAtFixedRate(() -> { + checkApplicationHealth(); + checkHttpAddressHealth(); + }, 18, Math.max(2000, ttls * 1000 - 168), TimeUnit.MILLISECONDS); + + this.scheduler.scheduleAtFixedRate(() -> { + loadMqtpAddressHealth(); + }, 88 * 2, Math.max(2000, ttls * 1000 - 168), TimeUnit.MILLISECONDS); + + this.scheduler.scheduleAtFixedRate(() -> { + localEntrys.values().stream().filter(e -> !e.canceled).forEach(entry -> { + checkLocalHealth(entry); + }); + }, 128 * 3, Math.max(2000, ttls * 1000 - 168), TimeUnit.MILLISECONDS); + + this.scheduler.scheduleAtFixedRate(() -> { + remoteEntrys.values().stream().filter(entry -> "SNCP".equalsIgnoreCase(entry.protocol)).forEach(entry -> { + updateSncpTransport(entry); + }); + }, 188 * 4, Math.max(2000, ttls * 1000 - 168), TimeUnit.MILLISECONDS); + } + } + + protected void loadMqtpAddressHealth() { + List keys = source.queryKeysStartsWith("cluster.mqtp:"); + keys.forEach(servicename -> { + try { + this.mqtpAddressMap.put(servicename, queryAddress(servicename).get(3, TimeUnit.SECONDS)); + } catch (Exception e) { + logger.log(Level.SEVERE, "loadMqtpAddressHealth check " + servicename + " error", e); + } + }); + } + + protected void checkHttpAddressHealth() { + try { + this.httpAddressMap.keySet().stream().forEach(servicename -> { + try { + this.httpAddressMap.put(servicename, queryAddress(servicename).get(3, TimeUnit.SECONDS)); + } catch (Exception e) { + logger.log(Level.SEVERE, "checkHttpAddressHealth check " + servicename + " error", e); + } + }); + } catch (Exception ex) { + logger.log(Level.SEVERE, "checkHttpAddressHealth check error", ex); + } + } + + protected void checkLocalHealth(final ClusterEntry entry) { + AddressEntry oldaddr = (AddressEntry) source.hget(entry.checkname, entry.checkid, AddressEntry.class); + AddressEntry newaddr = new AddressEntry(); + newaddr.addr = entry.address; + newaddr.nodeid = this.nodeid; + newaddr.time = System.currentTimeMillis(); + source.hset(entry.checkname, entry.checkid, AddressEntry.class, newaddr); + boolean ok = oldaddr != null && (System.currentTimeMillis() - oldaddr.time) / 1000 < ttls; + if (!ok) logger.log(Level.SEVERE, entry.checkid + " check error: " + oldaddr); + } + + @Override //获取MQTP的HTTP远程服务的可用ip列表, key = servicename的后半段 + public CompletableFuture>> queryMqtpAddress(String protocol, String module, String resname) { + final Map> rsmap = new ConcurrentHashMap<>(); + final String servicenamprefix = generateHttpServiceName(protocol, module, null) + ":"; + mqtpAddressMap.keySet().stream().filter(k -> k.startsWith(servicenamprefix)) + .forEach(sn -> rsmap.put(sn.substring(servicenamprefix.length()), mqtpAddressMap.get(sn))); + return CompletableFuture.completedFuture(rsmap); + } + + @Override //获取HTTP远程服务的可用ip列表 + public CompletableFuture> queryHttpAddress(String protocol, String module, String resname) { + final String servicename = generateHttpServiceName(protocol, module, resname); + Collection rs = httpAddressMap.get(servicename); + if (rs != null) return CompletableFuture.completedFuture(rs); + return queryAddress(servicename).thenApply(t -> { + httpAddressMap.put(servicename, t); + return t; + }); + } + + @Override + protected CompletableFuture> queryAddress(final ClusterEntry entry) { + return queryAddress(entry.servicename); + } + + private CompletableFuture> queryAddress(final String servicename) { + final CompletableFuture> future = source.hmapAsync(servicename, AddressEntry.class, 0, 10000); + return future.thenApply(map -> { + final Set set = new HashSet<>(); + map.forEach((n, v) -> { + if (v != null && (System.currentTimeMillis() - v.time) / 1000 < ttls) set.add(v.addr); + }); + return set; + }); + } + + protected boolean isApplicationHealth() { + String servicename = generateApplicationServiceName(); + String serviceid = generateApplicationServiceId(); + AddressEntry entry = (AddressEntry) source.hget(servicename, serviceid, AddressEntry.class); + return entry != null && (System.currentTimeMillis() - entry.time) / 1000 < ttls; + } + + protected void checkApplicationHealth() { + String checkname = generateApplicationServiceName(); + String checkid = generateApplicationCheckId(); + AddressEntry entry = new AddressEntry(); + entry.addr = this.appAddress; + entry.nodeid = this.nodeid; + entry.time = System.currentTimeMillis(); + source.hset(checkname, checkid, AddressEntry.class, entry); + } + + @Override + public void register(Application application) { + if (isApplicationHealth()) throw new RuntimeException("application.nodeid=" + nodeid + " exists in cluster"); + deregister(application); + + String serviceid = generateApplicationServiceId(); + String servicename = generateApplicationServiceName(); + AddressEntry entry = new AddressEntry(); + entry.addr = this.appAddress; + entry.nodeid = this.nodeid; + entry.time = System.currentTimeMillis(); + source.hset(servicename, serviceid, AddressEntry.class, entry); + } + + @Override + public void deregister(Application application) { + String servicename = generateApplicationServiceName(); + source.remove(servicename); + } + + @Override + protected void register(NodeServer ns, String protocol, Service service) { + deregister(ns, protocol, service, false); + // + String serviceid = generateServiceId(ns, protocol, service); + String servicename = generateServiceName(ns, protocol, service); + InetSocketAddress address = ns.isSNCP() ? ns.getSncpAddress() : ns.getServer().getSocketAddress(); + AddressEntry entry = new AddressEntry(); + entry.addr = address; + entry.nodeid = this.nodeid; + entry.time = System.currentTimeMillis(); + source.hset(servicename, serviceid, AddressEntry.class, entry); + } + + @Override + protected void deregister(NodeServer ns, String protocol, Service service) { + deregister(ns, protocol, service, true); + } + + protected void deregister(NodeServer ns, String protocol, Service service, boolean realcanceled) { + String servicename = generateServiceName(ns, protocol, service); + String serviceid = generateServiceId(ns, protocol, service); + ClusterEntry currEntry = null; + for (final ClusterEntry entry : localEntrys.values()) { + if (entry.servicename.equals(servicename) && entry.serviceid.equals(serviceid)) { + currEntry = entry; + break; + } + } + if (currEntry == null) { + for (final ClusterEntry entry : remoteEntrys.values()) { + if (entry.servicename.equals(servicename) && entry.serviceid.equals(serviceid)) { + currEntry = entry; + break; + } + } + } + source.hremove(servicename, serviceid); + if (realcanceled && currEntry != null) currEntry.canceled = true; + } + + @Override + protected String generateApplicationServiceName() { + return "cluster." + super.generateApplicationServiceName(); + } + + @Override + protected String generateServiceName(NodeServer ns, String protocol, Service service) { + return "cluster." + super.generateServiceName(ns, protocol, service); + } + + @Override + public String generateHttpServiceName(String protocol, String module, String resname) { + return "cluster." + super.generateHttpServiceName(protocol, module, resname); + } + + @Override + protected String generateApplicationCheckName() { + return generateApplicationServiceName(); + } + + @Override + protected String generateApplicationCheckId() { + return generateApplicationServiceId(); + } + + @Override + protected String generateCheckName(NodeServer ns, String protocol, Service service) { + return generateServiceName(ns, protocol, service); + } + + @Override + protected String generateCheckId(NodeServer ns, String protocol, Service service) { + return generateServiceId(ns, protocol, service); + } + + public static class AddressEntry { + + public InetSocketAddress addr; + + public int nodeid; + + public long time; + + public AddressEntry() { + } + + public AddressEntry refresh() { + this.time = System.currentTimeMillis(); + return this; + } + + @Override + public String toString() { + return JsonConvert.root().convertTo(this); + } + } + +} diff --git a/src/org/redkale/mq/MessageClient.java b/src/org/redkale/mq/MessageClient.java index 020f9c3b8..cac01151a 100644 --- a/src/org/redkale/mq/MessageClient.java +++ b/src/org/redkale/mq/MessageClient.java @@ -47,7 +47,7 @@ public abstract class MessageClient { protected MessageClient(MessageAgent messageAgent) { this.messageAgent = messageAgent; - this.msgSeqno = messageAgent.msgSeqno; + this.msgSeqno = messageAgent == null ? new AtomicLong() : messageAgent.msgSeqno; this.finest = messageAgent == null ? false : messageAgent.logger.isLoggable(Level.FINEST); this.finer = messageAgent == null ? false : messageAgent.logger.isLoggable(Level.FINER); this.fine = messageAgent == null ? false : messageAgent.logger.isLoggable(Level.FINE);