From da6855193b76e369d2ce3054a4f35cdf3e54d087 Mon Sep 17 00:00:00 2001 From: redkale Date: Tue, 11 Jun 2024 00:36:41 +0800 Subject: [PATCH] CachedEventMessage --- .../cached/spi/CachedEventMessage.java | 1 + .../cached/spi/CachedManagerService.java | 25 +++++++++++-------- .../cached/spi/CachedModuleEngine.java | 1 + 3 files changed, 17 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/redkale/cached/spi/CachedEventMessage.java b/src/main/java/org/redkale/cached/spi/CachedEventMessage.java index 71685ae27..45237fc33 100644 --- a/src/main/java/org/redkale/cached/spi/CachedEventMessage.java +++ b/src/main/java/org/redkale/cached/spi/CachedEventMessage.java @@ -17,6 +17,7 @@ import org.redkale.convert.json.JsonConvert; * */ public class CachedEventMessage implements Serializable { + // key protected String key; diff --git a/src/main/java/org/redkale/cached/spi/CachedManagerService.java b/src/main/java/org/redkale/cached/spi/CachedManagerService.java index bfedc3420..4dc368a13 100644 --- a/src/main/java/org/redkale/cached/spi/CachedManagerService.java +++ b/src/main/java/org/redkale/cached/spi/CachedManagerService.java @@ -43,7 +43,7 @@ import org.redkale.util.TypeToken; @ResourceType(CachedManager.class) public class CachedManagerService implements CachedManager, Service { - public static final String CACHE_CHANNEL_TOPIC = "cache-update-channel"; + public static final String CACHED_CHANNEL_TOPIC_PREFIX = "cached-update-channel:"; protected final Logger logger = Logger.getLogger(getClass().getSimpleName()); @@ -89,6 +89,7 @@ public class CachedManagerService implements CachedManager, Service { protected CachedManagerService(@Nullable CacheSource remoteSource) { this.remoteSource = remoteSource; + this.name = ""; } // 一般用于独立组件 @@ -127,7 +128,7 @@ public class CachedManagerService implements CachedManager, Service { } if (remoteSource != null) { this.remoteListener = new CacheRemoteListener(); - this.remoteSource.subscribe(CachedEventMessage.class, remoteListener, CACHE_CHANNEL_TOPIC); + this.remoteSource.subscribe(CachedEventMessage.class, remoteListener, getChannelTopic()); } } } @@ -138,7 +139,7 @@ public class CachedManagerService implements CachedManager, Service { this.localSource.destroy(conf); } if (this.remoteSource != null && this.remoteListener != null) { - this.remoteSource.unsubscribe(remoteListener, CACHE_CHANNEL_TOPIC); + this.remoteSource.unsubscribe(remoteListener, getChannelTopic()); } } @@ -165,10 +166,10 @@ public class CachedManagerService implements CachedManager, Service { if (this.broadcastable != broadcastable && remote != null) { if (broadcastable) { this.remoteListener = new CacheRemoteListener(); - remote.subscribe(CachedEventMessage.class, remoteListener, CACHE_CHANNEL_TOPIC); + remote.subscribe(CachedEventMessage.class, remoteListener, getChannelTopic()); } else { if (this.remoteListener != null) { - remote.unsubscribe(remoteListener, CACHE_CHANNEL_TOPIC); + remote.unsubscribe(remoteListener, getChannelTopic()); this.remoteListener = null; } } @@ -176,6 +177,10 @@ public class CachedManagerService implements CachedManager, Service { } } + public String getChannelTopic() { + return CACHED_CHANNEL_TOPIC_PREFIX + resourceName(); + } + @Override public String toString() { return getClass().getSimpleName() + "_" + Objects.hash(this) + "{name = '" + name + "', schema = '" + schema @@ -554,7 +559,7 @@ public class CachedManagerService implements CachedManager, Service { setCache(remoteSource, key, type, value, remoteExpire); } if (remoteSource != null && broadcastable) { - remoteSource.publish(CACHE_CHANNEL_TOPIC, new CachedEventMessage(idFor(key))); + remoteSource.publish(getChannelTopic(), new CachedEventMessage(key)); } } @@ -582,7 +587,7 @@ public class CachedManagerService implements CachedManager, Service { } if (remoteSource != null && broadcastable) { future = future.thenCompose(r -> remoteSource - .publishAsync(CACHE_CHANNEL_TOPIC, new CachedEventMessage(idFor(key))) + .publishAsync(getChannelTopic(), new CachedEventMessage(key)) .thenApply(n -> r)); } return future; @@ -602,7 +607,7 @@ public class CachedManagerService implements CachedManager, Service { if (remoteSource != null) { v = remoteSource.del(id); if (broadcastable) { - remoteSource.publish(CACHE_CHANNEL_TOPIC, new CachedEventMessage(id)); + remoteSource.publish(getChannelTopic(), new CachedEventMessage(key)); } } return v; @@ -623,7 +628,7 @@ public class CachedManagerService implements CachedManager, Service { return remoteSource.delAsync(id).thenCompose(r -> { return broadcastable ? remoteSource - .publishAsync(CACHE_CHANNEL_TOPIC, new CachedEventMessage(id)) + .publishAsync(getChannelTopic(), new CachedEventMessage(key)) .thenApply(n -> r) : CompletableFuture.completedFuture(v); }); @@ -969,7 +974,7 @@ public class CachedManagerService implements CachedManager, Service { @Override public void onMessage(String topic, CachedEventMessage message) { - localSource.del(message.getKey()); + localSource.del(idFor(message.getKey())); } } } diff --git a/src/main/java/org/redkale/cached/spi/CachedModuleEngine.java b/src/main/java/org/redkale/cached/spi/CachedModuleEngine.java index 25ff10994..74f03480e 100644 --- a/src/main/java/org/redkale/cached/spi/CachedModuleEngine.java +++ b/src/main/java/org/redkale/cached/spi/CachedModuleEngine.java @@ -194,6 +194,7 @@ public class CachedModuleEngine extends ModuleEngine { ((Service) manager).destroy(v.config); } }); + cacheManagerMap.clear(); } }