From d36c168faab97ab65362138246707a94b0649616 Mon Sep 17 00:00:00 2001 From: Redkale Date: Sun, 11 Dec 2022 13:29:33 +0800 Subject: [PATCH] =?UTF-8?q?=E9=85=8D=E7=BD=AE=E4=B8=AD=E5=BF=83=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E5=8A=A8=E6=80=81=E6=9B=B4=E6=94=B9mq=E5=92=8Ccluster?= =?UTF-8?q?=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/boot/Application.java | 198 +++++++++++++++++- .../redkale/cluster/CacheClusterAgent.java | 67 ++++-- .../org/redkale/cluster/ClusterAgent.java | 3 + .../java/org/redkale/mq/MessageAgent.java | 10 +- 4 files changed, 253 insertions(+), 25 deletions(-) diff --git a/src/main/java/org/redkale/boot/Application.java b/src/main/java/org/redkale/boot/Application.java index cd5b5452d..ac5c57de3 100644 --- a/src/main/java/org/redkale/boot/Application.java +++ b/src/main/java/org/redkale/boot/Application.java @@ -142,6 +142,7 @@ public final class Application { final ExecutorService workExecutor; //日志配置资源 + //@since 2.8.0 final Properties loggingProperties = new Properties(); //Source 原始的配置资源, 只会存在redkale.datasource(.|[) redkale.cachesource(.|[)开头的配置项 @@ -172,10 +173,18 @@ public final class Application { //配置信息,只读版Properties private final Environment environment; + //第三方服务配置资源 + //@since 2.8.0 + final Properties clusterProperties = new Properties(); + //第三方服务发现管理接口 //@since 2.1.0 private final ClusterAgent clusterAgent; + //MQ管理配置资源 + //@since 2.8.0 + final Properties messageProperties = new Properties(); + //MQ管理接口 //@since 2.1.0 private final MessageAgent[] messageAgents; @@ -779,6 +788,14 @@ public final class Application { } else { sourceProperties.put(key, val); } + } else if (key.toString().startsWith("redkale.mq.") || key.toString().startsWith("redkale.mq[")) { + if (key.toString().endsWith(".name")) { + logger.log(Level.WARNING, "skip illegal key " + key + " in mq config, key cannot endsWith '.name'"); + } else { + messageProperties.put(key, val); + } + } else if (key.toString().startsWith("redkale.cluster.")) { + clusterProperties.put(key, val); } }); } @@ -1109,6 +1126,18 @@ public final class Application { return null; } + private AnyValue findMQConfig(String mqName) { + AnyValue mqsNode = config.getAnyValue("mq"); + if (mqsNode != null) { + AnyValue confNode = mqsNode.getAnyValue(mqName); + if (confNode != null) { //必须要设置name属性 + ((DefaultAnyValue) confNode).setValue("name", mqName); + } + return confNode; + } + return null; + } + CacheSource loadCacheSource(final String sourceName, boolean autoMemory) { long st = System.currentTimeMillis(); CacheSource old = resourceFactory.find(sourceName, CacheSource.class); @@ -1828,10 +1857,25 @@ public final class Application { Set sourceRemovedKeys = new HashSet<>(); Properties sourceChangedProps = new Properties(); + + Set loggingRemovedKeys = new HashSet<>(); Properties loggingChangedProps = new Properties(); + + Set clusterRemovedKeys = new HashSet<>(); + Properties clusterChangedProps = new Properties(); + + Set messageRemovedKeys = new HashSet<>(); + Properties messageChangedProps = new Properties(); + for (ResourceEvent event : events) { if (namespace != null && namespace.startsWith("logging")) { - loggingChangedProps.put(event.name(), event.newValue()); + if (event.newValue() == null) { + if (loggingProperties.containsKey(event.name())) { + loggingRemovedKeys.add(event.name()); + } + } else { + loggingChangedProps.put(event.name(), event.newValue()); + } continue; } if (event.name().startsWith("redkale.datasource.") || event.name().startsWith("redkale.datasource[") @@ -1849,6 +1893,30 @@ public final class Application { } } } + } else if (event.name().startsWith("redkale.mq.") || event.name().startsWith("redkale.mq[")) { + if (event.name().endsWith(".name")) { + logger.log(Level.WARNING, "skip illegal key " + event.name() + " in mq config " + (namespace == null ? "" : namespace) + ", key cannot endsWith '.name'"); + } else { + if (!Objects.equals(event.newValue(), messageProperties.getProperty(event.name()))) { + if (event.newValue() == null) { + if (messageProperties.containsKey(event.name())) { + messageRemovedKeys.add(event.name()); + } + } else { + messageChangedProps.put(event.name(), event.newValue()); + } + } + } + } else if (event.name().startsWith("redkale.cluster.")) { + if (!Objects.equals(event.newValue(), clusterProperties.getProperty(event.name()))) { + if (event.newValue() == null) { + if (clusterProperties.containsKey(event.name())) { + clusterRemovedKeys.add(event.name()); + } + } else { + clusterChangedProps.put(event.name(), event.newValue()); + } + } } else if (event.name().startsWith("system.property.")) { String propName = event.name().substring("system.property.".length()); if (event.newValue() == null) { @@ -1894,10 +1962,11 @@ public final class Application { envRemovedKeys.forEach(k -> envProperties.remove(k)); resourceFactory.register(envRegisterProps, "", Environment.class); } + //日志配置项的变更 - if (!loggingChangedProps.isEmpty()) { + if (!loggingChangedProps.isEmpty() || !loggingRemovedKeys.isEmpty()) { //只是简单变更日志级别则直接操作,无需重新配置日志 - if (loggingChangedProps.size() == 1 && loggingChangedProps.containsKey(".level")) { + if (loggingRemovedKeys.isEmpty() && loggingChangedProps.size() == 1 && loggingChangedProps.containsKey(".level")) { try { Level logLevel = Level.parse(loggingChangedProps.getProperty(".level")); Logger.getGlobal().setLevel(logLevel); @@ -1910,10 +1979,12 @@ public final class Application { Properties newLogProps = new Properties(); newLogProps.putAll(this.loggingProperties); newLogProps.putAll(loggingChangedProps); + loggingRemovedKeys.forEach(k -> newLogProps.remove(k)); reconfigLogging(newLogProps); logger.log(Level.INFO, "reconfig logging finished "); } } + //数据源配置项的变更 if (!sourceChangedProps.isEmpty() || !sourceRemovedKeys.isEmpty()) { Set cacheSourceNames = new LinkedHashSet<>(); @@ -1935,7 +2006,7 @@ public final class Application { //更新缓存 for (String sourceName : cacheSourceNames) { CacheSource source = Utility.find(cacheSources, s -> Objects.equals(s.resourceName(), sourceName)); - if (source == null) return; //多余的数据源 + if (source == null) continue; //多余的数据源 final DefaultAnyValue old = (DefaultAnyValue) findSourceConfig(sourceName, "cachesource"); Properties newProps = new Properties(); sourceProperties.forEach((k, v) -> { @@ -1976,8 +2047,8 @@ public final class Application { }); if (!changeEvents.isEmpty()) { DefaultAnyValue back = old.copy(); - old.replace(AnyValue.loadFromProperties(newProps).getAnyValue("redkale").getAnyValue("cachesource").getAnyValue(sourceName)); try { + old.replace(AnyValue.loadFromProperties(newProps).getAnyValue("redkale").getAnyValue("cachesource").getAnyValue(sourceName)); ((AbstractCacheSource) source).onResourceChange(changeEvents.toArray(new ResourceEvent[changeEvents.size()])); } catch (RuntimeException e) { old.replace(back); //还原配置 @@ -1988,7 +2059,7 @@ public final class Application { //更新数据库 for (String sourceName : dataSourceNames) { DataSource source = Utility.find(dataSources, s -> Objects.equals(s.resourceName(), sourceName)); - if (source == null) return; //多余的数据源 + if (source == null) continue; //多余的数据源 DefaultAnyValue old = (DefaultAnyValue) findSourceConfig(sourceName, "datasource"); Properties newProps = new Properties(); sourceProperties.forEach((k, v) -> { @@ -2029,8 +2100,8 @@ public final class Application { }); if (!changeEvents.isEmpty()) { DefaultAnyValue back = old.copy(); - old.replace(AnyValue.loadFromProperties(newProps).getAnyValue("redkale").getAnyValue("datasource").getAnyValue(sourceName)); try { + old.replace(AnyValue.loadFromProperties(newProps).getAnyValue("redkale").getAnyValue("datasource").getAnyValue(sourceName)); ((AbstractDataSource) source).onResourceChange(changeEvents.toArray(new ResourceEvent[changeEvents.size()])); } catch (RuntimeException e) { old.replace(back); //还原配置 @@ -2041,6 +2112,119 @@ public final class Application { sourceRemovedKeys.forEach(k -> sourceProperties.remove(k)); sourceProperties.putAll(sourceChangedProps); } + + //MQ配置项的变更 + if (!messageChangedProps.isEmpty() || !messageRemovedKeys.isEmpty()) { + Set messageNames = new LinkedHashSet<>(); + List keys = new ArrayList<>(); + keys.addAll(messageRemovedKeys); + keys.addAll((Set) messageChangedProps.keySet()); + for (final String key : keys) { + if (key.startsWith("redkale.mq[")) { + messageNames.add(key.substring("redkale.mq[".length(), key.indexOf(']'))); + } else if (key.startsWith("redkale.mq.")) { + messageNames.add(key.substring("redkale.mq.".length(), key.indexOf('.', "redkale.mq.".length()))); + } + } + //更新MQ + for (String mqName : messageNames) { + MessageAgent agent = Utility.find(messageAgents, s -> Objects.equals(s.resourceName(), mqName)); + if (agent == null) continue; //多余的数据源 + final DefaultAnyValue old = (DefaultAnyValue) findMQConfig(mqName); + Properties newProps = new Properties(); + messageProperties.forEach((k, v) -> { + final String key = k.toString(); + String prefix = "redkale.mq[" + mqName + "]."; + int pos = key.indexOf(prefix); + if (pos < 0) { + prefix = "redkale.mq." + mqName + "."; + pos = key.indexOf(prefix); + } + if (pos < 0) return; //不是同一name数据源配置项 + newProps.put(k, v); + }); + List changeEvents = new ArrayList<>(); + messageChangedProps.forEach((k, v) -> { + final String key = k.toString(); + String prefix = "redkale.mq[" + mqName + "]."; + int pos = key.indexOf(prefix); + if (pos < 0) { + prefix = "redkale.mq." + mqName + "."; + pos = key.indexOf(prefix); + } + if (pos < 0) return; //不是同一name数据源配置项 + newProps.put(k, v); + changeEvents.add(ResourceEvent.create(key.substring(prefix.length()), v, messageProperties.getProperty(key))); + }); + messageRemovedKeys.forEach(k -> { + final String key = k; + String prefix = "redkale.mq[" + mqName + "]."; + int pos = key.indexOf(prefix); + if (pos < 0) { + prefix = "redkale.mq." + mqName + "."; + pos = key.indexOf(prefix); + } + if (pos < 0) return; + newProps.remove(k); //不是同一name数据源配置项 + changeEvents.add(ResourceEvent.create(key.substring(prefix.length()), null, messageProperties.getProperty(key))); + }); + if (!changeEvents.isEmpty()) { + DefaultAnyValue back = old.copy(); + try { + old.replace(AnyValue.loadFromProperties(newProps).getAnyValue("redkale").getAnyValue("mq").getAnyValue(mqName)); + agent.onResourceChange(changeEvents.toArray(new ResourceEvent[changeEvents.size()])); + } catch (RuntimeException e) { + old.replace(back); //还原配置 + throw e; + } + } + } + messageRemovedKeys.forEach(k -> messageProperties.remove(k)); + messageProperties.putAll(messageChangedProps); + } + + //第三方服务注册配置项的变更 + if (!clusterChangedProps.isEmpty() || !clusterRemovedKeys.isEmpty()) { + if (this.clusterAgent != null) { + final DefaultAnyValue old = (DefaultAnyValue) config.getAnyValue("cluster"); + Properties newProps = new Properties(); + newProps.putAll(clusterProperties); + List changeEvents = new ArrayList<>(); + clusterChangedProps.forEach((k, v) -> { + final String key = k.toString(); + newProps.put(k, v); + changeEvents.add(ResourceEvent.create(key.substring("redkale.cluster.".length()), v, clusterProperties.getProperty(key))); + }); + clusterRemovedKeys.forEach(k -> { + final String key = k; + newProps.remove(k); + changeEvents.add(ResourceEvent.create(key.substring("redkale.cluster.".length()), null, clusterProperties.getProperty(key))); + }); + if (!changeEvents.isEmpty()) { + DefaultAnyValue back = old.copy(); + try { + old.replace(AnyValue.loadFromProperties(newProps).getAnyValue("redkale").getAnyValue("cluster")); + clusterAgent.onResourceChange(changeEvents.toArray(new ResourceEvent[changeEvents.size()])); + } catch (RuntimeException e) { + old.replace(back); //还原配置 + throw e; + } + } + } else { + StringBuilder sb = new StringBuilder(); + clusterChangedProps.forEach((k, v) -> { + sb.append(ClusterAgent.class.getSimpleName()).append(" skip change '").append(k).append("'\r\n"); + }); + clusterRemovedKeys.forEach(k -> { + sb.append(ClusterAgent.class.getSimpleName()).append(" skip change '").append(k).append("'\r\n"); + }); + if (!sb.isEmpty()) { + logger.log(Level.INFO, sb.toString()); + } + } + clusterRemovedKeys.forEach(k -> clusterProperties.remove(k)); + clusterProperties.putAll(clusterChangedProps); + } } } diff --git a/src/main/java/org/redkale/cluster/CacheClusterAgent.java b/src/main/java/org/redkale/cluster/CacheClusterAgent.java index 2b73c067f..db6b79352 100644 --- a/src/main/java/org/redkale/cluster/CacheClusterAgent.java +++ b/src/main/java/org/redkale/cluster/CacheClusterAgent.java @@ -37,6 +37,8 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable { protected ScheduledThreadPoolExecutor scheduler; + protected ScheduledFuture taskFuture; + //可能被HttpMessageClient用到的服务 key: serviceName protected final ConcurrentHashMap> httpAddressMap = new ConcurrentHashMap<>(); @@ -46,13 +48,37 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable { @Override public void init(ResourceFactory factory, AnyValue config) { super.init(factory, config); - this.sourceName = getSourceName(); - this.ttls = config.getIntValue("ttls", 10); if (this.ttls < 5) this.ttls = 10; } + @Override + @ResourceListener + public void onResourceChange(ResourceEvent[] events) { + StringBuilder sb = new StringBuilder(); + int newTtls = this.ttls; + for (ResourceEvent event : events) { + if ("ttls".equals(event.name())) { + newTtls = Integer.parseInt(event.newValue().toString()); + if (newTtls < 5) { + sb.append(CacheClusterAgent.class.getSimpleName()).append("(name=").append(resourceName()).append(") cannot change '").append(event.name()).append("' to '").append(event.coverNewValue()).append("'\r\n"); + } else { + sb.append(CacheClusterAgent.class.getSimpleName()).append("(name=").append(resourceName()).append(") change '").append(event.name()).append("' to '").append(event.coverNewValue()).append("'\r\n"); + } + } else { + sb.append(CacheClusterAgent.class.getSimpleName()).append("(name=").append(resourceName()).append(") skip change '").append(event.name()).append("' to '").append(event.coverNewValue()).append("'\r\n"); + } + } + if (newTtls != this.ttls) { + this.ttls = newTtls; + start(); + } + if (!sb.isEmpty()) { + logger.log(Level.INFO, sb.toString()); + } + } + @Override public void setConfig(AnyValue config) { super.setConfig(config); @@ -88,22 +114,29 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable { return t; }); - this.scheduler.scheduleAtFixedRate(() -> { - try { - checkApplicationHealth(); - checkHttpAddressHealth(); - loadMqtpAddressHealth(); - localEntrys.values().stream().filter(e -> !e.canceled).forEach(entry -> { - checkLocalHealth(entry); - }); - remoteEntrys.values().stream().filter(entry -> "SNCP".equalsIgnoreCase(entry.protocol)).forEach(entry -> { - updateSncpTransport(entry); - }); - } catch (Exception e) { - logger.log(Level.SEVERE, "scheduleAtFixedRate check error", e instanceof CompletionException ? ((CompletionException) e).getCause() : e); - } - }, Math.max(2000, ttls * 1000), Math.max(2000, ttls * 1000), TimeUnit.MILLISECONDS); } + if (this.taskFuture != null) { + this.taskFuture.cancel(true); + } + this.taskFuture = this.scheduler.scheduleAtFixedRate(newTask(), Math.max(2000, ttls * 1000), Math.max(2000, ttls * 1000), TimeUnit.MILLISECONDS); + } + + private Runnable newTask() { + return () -> { + try { + checkApplicationHealth(); + checkHttpAddressHealth(); + loadMqtpAddressHealth(); + localEntrys.values().stream().filter(e -> !e.canceled).forEach(entry -> { + checkLocalHealth(entry); + }); + remoteEntrys.values().stream().filter(entry -> "SNCP".equalsIgnoreCase(entry.protocol)).forEach(entry -> { + updateSncpTransport(entry); + }); + } catch (Exception e) { + logger.log(Level.SEVERE, "scheduleAtFixedRate check error", e instanceof CompletionException ? ((CompletionException) e).getCause() : e); + } + }; } protected void loadMqtpAddressHealth() { diff --git a/src/main/java/org/redkale/cluster/ClusterAgent.java b/src/main/java/org/redkale/cluster/ClusterAgent.java index 783337999..b6f2cc8e6 100644 --- a/src/main/java/org/redkale/cluster/ClusterAgent.java +++ b/src/main/java/org/redkale/cluster/ClusterAgent.java @@ -83,6 +83,9 @@ public abstract class ClusterAgent { } } + @ResourceListener + public abstract void onResourceChange(ResourceEvent[] events); + public void destroy(AnyValue config) { } diff --git a/src/main/java/org/redkale/mq/MessageAgent.java b/src/main/java/org/redkale/mq/MessageAgent.java index 7d9af20f9..501636756 100644 --- a/src/main/java/org/redkale/mq/MessageAgent.java +++ b/src/main/java/org/redkale/mq/MessageAgent.java @@ -29,7 +29,7 @@ import org.redkale.util.*; * * @since 2.1.0 */ -public abstract class MessageAgent { +public abstract class MessageAgent implements Resourcable { protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); @@ -93,6 +93,9 @@ public abstract class MessageAgent { this.timeoutExecutor.setRemoveOnCancelPolicy(true); } + @ResourceListener + public abstract void onResourceChange(ResourceEvent[] events); + public CompletableFuture> start() { final LinkedHashMap map = new LinkedHashMap<>(); final List futures = new ArrayList<>(); @@ -143,6 +146,11 @@ public abstract class MessageAgent { return producers; } + @Override + public String resourceName() { + return name; + } + public MessageCoder getMessageCoder() { return this.messageCoder; }