CachedManager增加node属性

This commit is contained in:
redkale
2024-06-29 23:48:57 +08:00
parent ff76c698f4
commit 913a055e7e
3 changed files with 36 additions and 6 deletions

View File

@@ -41,6 +41,13 @@ public interface CachedManager extends Resourcable {
@Override
public String resourceName();
/**
* 唯一标识
*
* @return node
*/
public String getNode();
/**
* 缓存的schema, 不能含有':'、'#'、'@'字符
*

View File

@@ -18,6 +18,9 @@ import org.redkale.convert.json.JsonConvert;
*/
public class CachedEventMessage implements Serializable {
// CachedManager唯一标识
protected String node;
// key
protected String key;
@@ -26,11 +29,20 @@ public class CachedEventMessage implements Serializable {
public CachedEventMessage() {}
public CachedEventMessage(String key) {
public CachedEventMessage(String node, String key) {
this.node = node;
this.key = key;
this.time = System.currentTimeMillis();
}
public String getNode() {
return node;
}
public void setNode(String node) {
this.node = node;
}
public String getKey() {
return key;
}

View File

@@ -31,6 +31,7 @@ import org.redkale.util.AnyValue;
import org.redkale.util.RedkaleException;
import org.redkale.util.ThrowSupplier;
import org.redkale.util.TypeToken;
import org.redkale.util.Utility;
/**
* 缓存管理器
@@ -52,6 +53,9 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
protected Level logLevel = Level.FINER;
// 唯一标识
protected final String node = Utility.uuid();
// 名称
protected String name;
@@ -167,6 +171,11 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
return name;
}
@Override
public String getNode() {
return node;
}
@Override
public String getSchema() {
return schema;
@@ -620,7 +629,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
setCache(remoteSource, key, type, value, remoteExpire);
}
if (remoteSource != null && broadcastable) {
remoteSource.publish(getChannelTopic(), new CachedEventMessage(key));
remoteSource.publish(getChannelTopic(), new CachedEventMessage(node, key));
}
}
@@ -648,7 +657,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
}
if (remoteSource != null && broadcastable) {
future = future.thenCompose(r -> remoteSource
.publishAsync(getChannelTopic(), new CachedEventMessage(key))
.publishAsync(getChannelTopic(), new CachedEventMessage(node, key))
.thenApply(n -> r));
}
return future;
@@ -668,7 +677,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
if (remoteSource != null) {
v = remoteSource.del(id);
if (broadcastable) {
remoteSource.publish(getChannelTopic(), new CachedEventMessage(key));
remoteSource.publish(getChannelTopic(), new CachedEventMessage(node, key));
}
}
return v;
@@ -689,7 +698,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
return remoteSource.delAsync(id).thenCompose(r -> {
return broadcastable
? remoteSource
.publishAsync(getChannelTopic(), new CachedEventMessage(key))
.publishAsync(getChannelTopic(), new CachedEventMessage(node, key))
.thenApply(n -> r)
: CompletableFuture.completedFuture(v);
});
@@ -1035,7 +1044,9 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
@Override
public void onMessage(String topic, CachedEventMessage message) {
localSource.del(idFor(message.getKey()));
if (!Objects.equals(getNode(), message.getNode())) {
localSource.del(idFor(message.getKey()));
}
}
}
}