CachedEventMessage

This commit is contained in:
redkale
2024-06-11 00:36:41 +08:00
parent 7d2903b1a8
commit da6855193b
3 changed files with 17 additions and 10 deletions

View File

@@ -17,6 +17,7 @@ import org.redkale.convert.json.JsonConvert;
*
*/
public class CachedEventMessage implements Serializable {
// key
protected String key;

View File

@@ -43,7 +43,7 @@ import org.redkale.util.TypeToken;
@ResourceType(CachedManager.class)
public class CachedManagerService implements CachedManager, Service {
public static final String CACHE_CHANNEL_TOPIC = "cache-update-channel";
public static final String CACHED_CHANNEL_TOPIC_PREFIX = "cached-update-channel:";
protected final Logger logger = Logger.getLogger(getClass().getSimpleName());
@@ -89,6 +89,7 @@ public class CachedManagerService implements CachedManager, Service {
protected CachedManagerService(@Nullable CacheSource remoteSource) {
this.remoteSource = remoteSource;
this.name = "";
}
// 一般用于独立组件
@@ -127,7 +128,7 @@ public class CachedManagerService implements CachedManager, Service {
}
if (remoteSource != null) {
this.remoteListener = new CacheRemoteListener();
this.remoteSource.subscribe(CachedEventMessage.class, remoteListener, CACHE_CHANNEL_TOPIC);
this.remoteSource.subscribe(CachedEventMessage.class, remoteListener, getChannelTopic());
}
}
}
@@ -138,7 +139,7 @@ public class CachedManagerService implements CachedManager, Service {
this.localSource.destroy(conf);
}
if (this.remoteSource != null && this.remoteListener != null) {
this.remoteSource.unsubscribe(remoteListener, CACHE_CHANNEL_TOPIC);
this.remoteSource.unsubscribe(remoteListener, getChannelTopic());
}
}
@@ -165,10 +166,10 @@ public class CachedManagerService implements CachedManager, Service {
if (this.broadcastable != broadcastable && remote != null) {
if (broadcastable) {
this.remoteListener = new CacheRemoteListener();
remote.subscribe(CachedEventMessage.class, remoteListener, CACHE_CHANNEL_TOPIC);
remote.subscribe(CachedEventMessage.class, remoteListener, getChannelTopic());
} else {
if (this.remoteListener != null) {
remote.unsubscribe(remoteListener, CACHE_CHANNEL_TOPIC);
remote.unsubscribe(remoteListener, getChannelTopic());
this.remoteListener = null;
}
}
@@ -176,6 +177,10 @@ public class CachedManagerService implements CachedManager, Service {
}
}
public String getChannelTopic() {
return CACHED_CHANNEL_TOPIC_PREFIX + resourceName();
}
@Override
public String toString() {
return getClass().getSimpleName() + "_" + Objects.hash(this) + "{name = '" + name + "', schema = '" + schema
@@ -554,7 +559,7 @@ public class CachedManagerService implements CachedManager, Service {
setCache(remoteSource, key, type, value, remoteExpire);
}
if (remoteSource != null && broadcastable) {
remoteSource.publish(CACHE_CHANNEL_TOPIC, new CachedEventMessage(idFor(key)));
remoteSource.publish(getChannelTopic(), new CachedEventMessage(key));
}
}
@@ -582,7 +587,7 @@ public class CachedManagerService implements CachedManager, Service {
}
if (remoteSource != null && broadcastable) {
future = future.thenCompose(r -> remoteSource
.publishAsync(CACHE_CHANNEL_TOPIC, new CachedEventMessage(idFor(key)))
.publishAsync(getChannelTopic(), new CachedEventMessage(key))
.thenApply(n -> r));
}
return future;
@@ -602,7 +607,7 @@ public class CachedManagerService implements CachedManager, Service {
if (remoteSource != null) {
v = remoteSource.del(id);
if (broadcastable) {
remoteSource.publish(CACHE_CHANNEL_TOPIC, new CachedEventMessage(id));
remoteSource.publish(getChannelTopic(), new CachedEventMessage(key));
}
}
return v;
@@ -623,7 +628,7 @@ public class CachedManagerService implements CachedManager, Service {
return remoteSource.delAsync(id).thenCompose(r -> {
return broadcastable
? remoteSource
.publishAsync(CACHE_CHANNEL_TOPIC, new CachedEventMessage(id))
.publishAsync(getChannelTopic(), new CachedEventMessage(key))
.thenApply(n -> r)
: CompletableFuture.completedFuture(v);
});
@@ -969,7 +974,7 @@ public class CachedManagerService implements CachedManager, Service {
@Override
public void onMessage(String topic, CachedEventMessage message) {
localSource.del(message.getKey());
localSource.del(idFor(message.getKey()));
}
}
}

View File

@@ -194,6 +194,7 @@ public class CachedModuleEngine extends ModuleEngine {
((Service) manager).destroy(v.config);
}
});
cacheManagerMap.clear();
}
}