From 913a055e7e7993ab158737e329e39fb19798bc2b Mon Sep 17 00:00:00 2001 From: redkale Date: Sat, 29 Jun 2024 23:48:57 +0800 Subject: [PATCH] =?UTF-8?q?CachedManager=E5=A2=9E=E5=8A=A0node=E5=B1=9E?= =?UTF-8?q?=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/redkale/cached/CachedManager.java | 7 +++++++ .../cached/spi/CachedEventMessage.java | 14 ++++++++++++- .../cached/spi/CachedManagerService.java | 21 ++++++++++++++----- 3 files changed, 36 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/redkale/cached/CachedManager.java b/src/main/java/org/redkale/cached/CachedManager.java index 1e3fcfdc1..ca2a29ef5 100644 --- a/src/main/java/org/redkale/cached/CachedManager.java +++ b/src/main/java/org/redkale/cached/CachedManager.java @@ -41,6 +41,13 @@ public interface CachedManager extends Resourcable { @Override public String resourceName(); + /** + * 唯一标识 + * + * @return node + */ + public String getNode(); + /** * 缓存的schema, 不能含有':'、'#'、'@'字符 * diff --git a/src/main/java/org/redkale/cached/spi/CachedEventMessage.java b/src/main/java/org/redkale/cached/spi/CachedEventMessage.java index 45237fc33..49b16e9e3 100644 --- a/src/main/java/org/redkale/cached/spi/CachedEventMessage.java +++ b/src/main/java/org/redkale/cached/spi/CachedEventMessage.java @@ -18,6 +18,9 @@ import org.redkale.convert.json.JsonConvert; */ public class CachedEventMessage implements Serializable { + // CachedManager唯一标识 + protected String node; + // key protected String key; @@ -26,11 +29,20 @@ public class CachedEventMessage implements Serializable { public CachedEventMessage() {} - public CachedEventMessage(String key) { + public CachedEventMessage(String node, String key) { + this.node = node; this.key = key; this.time = System.currentTimeMillis(); } + public String getNode() { + return node; + } + + public void setNode(String node) { + this.node = node; + } + public String getKey() { return key; } diff --git a/src/main/java/org/redkale/cached/spi/CachedManagerService.java b/src/main/java/org/redkale/cached/spi/CachedManagerService.java index af4b249a8..4126cc2a3 100644 --- a/src/main/java/org/redkale/cached/spi/CachedManagerService.java +++ b/src/main/java/org/redkale/cached/spi/CachedManagerService.java @@ -31,6 +31,7 @@ import org.redkale.util.AnyValue; import org.redkale.util.RedkaleException; import org.redkale.util.ThrowSupplier; import org.redkale.util.TypeToken; +import org.redkale.util.Utility; /** * 缓存管理器 @@ -52,6 +53,9 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se protected Level logLevel = Level.FINER; + // 唯一标识 + protected final String node = Utility.uuid(); + // 名称 protected String name; @@ -167,6 +171,11 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se return name; } + @Override + public String getNode() { + return node; + } + @Override public String getSchema() { return schema; @@ -620,7 +629,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se setCache(remoteSource, key, type, value, remoteExpire); } if (remoteSource != null && broadcastable) { - remoteSource.publish(getChannelTopic(), new CachedEventMessage(key)); + remoteSource.publish(getChannelTopic(), new CachedEventMessage(node, key)); } } @@ -648,7 +657,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se } if (remoteSource != null && broadcastable) { future = future.thenCompose(r -> remoteSource - .publishAsync(getChannelTopic(), new CachedEventMessage(key)) + .publishAsync(getChannelTopic(), new CachedEventMessage(node, key)) .thenApply(n -> r)); } return future; @@ -668,7 +677,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se if (remoteSource != null) { v = remoteSource.del(id); if (broadcastable) { - remoteSource.publish(getChannelTopic(), new CachedEventMessage(key)); + remoteSource.publish(getChannelTopic(), new CachedEventMessage(node, key)); } } return v; @@ -689,7 +698,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se return remoteSource.delAsync(id).thenCompose(r -> { return broadcastable ? remoteSource - .publishAsync(getChannelTopic(), new CachedEventMessage(key)) + .publishAsync(getChannelTopic(), new CachedEventMessage(node, key)) .thenApply(n -> r) : CompletableFuture.completedFuture(v); }); @@ -1035,7 +1044,9 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se @Override public void onMessage(String topic, CachedEventMessage message) { - localSource.del(idFor(message.getKey())); + if (!Objects.equals(getNode(), message.getNode())) { + localSource.del(idFor(message.getKey())); + } } } }