diff --git a/src/main/java/org/redkale/asm/AsmMethodBoost.java b/src/main/java/org/redkale/asm/AsmMethodBoost.java index 0bc31dea9..d627f41d7 100644 --- a/src/main/java/org/redkale/asm/AsmMethodBoost.java +++ b/src/main/java/org/redkale/asm/AsmMethodBoost.java @@ -3,6 +3,17 @@ */ package org.redkale.asm; +import java.io.InputStream; +import java.lang.annotation.Annotation; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import org.redkale.annotation.Nullable; import static org.redkale.asm.Opcodes.ACC_PRIVATE; import static org.redkale.asm.Opcodes.ACC_PROTECTED; import static org.redkale.asm.Opcodes.ACC_PUBLIC; @@ -17,25 +28,16 @@ import static org.redkale.asm.Opcodes.IRETURN; import static org.redkale.asm.Opcodes.LLOAD; import static org.redkale.asm.Opcodes.LRETURN; import static org.redkale.asm.Opcodes.RETURN; - -import java.io.InputStream; -import java.lang.annotation.Annotation; -import java.lang.reflect.Method; -import java.lang.reflect.Modifier; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; -import org.redkale.annotation.Nullable; import org.redkale.inject.ResourceFactory; import org.redkale.util.Utility; /** * 生产动态字节码的方法扩展器, 可以进行方法加强动作 * + *

详情见: https://redkale.org + * * @param 泛型 + * @author zhangjx * @since 2.8.0 */ public abstract class AsmMethodBoost { diff --git a/src/main/java/org/redkale/cache/spi/CacheAction.java b/src/main/java/org/redkale/cache/spi/CacheAction.java index 366807e91..8faa83a0f 100644 --- a/src/main/java/org/redkale/cache/spi/CacheAction.java +++ b/src/main/java/org/redkale/cache/spi/CacheAction.java @@ -114,7 +114,6 @@ public class CacheAction { @ClassDepends public T get(ThrowSupplier supplier, Object... args) { if (async) { - ThrowSupplier supplier0 = supplier; return (T) manager.bothGetSetAsync( hash, keyGenerator.generate(service, this, args), @@ -122,7 +121,7 @@ public class CacheAction { nullable, localExpire, remoteExpire, - supplier0); + (ThrowSupplier) supplier); } else { return manager.bothGetSet( hash, diff --git a/src/main/java/org/redkale/cache/spi/CacheAsmMethodBoost.java b/src/main/java/org/redkale/cache/spi/CacheAsmMethodBoost.java index 64fb95609..f57b576e6 100644 --- a/src/main/java/org/redkale/cache/spi/CacheAsmMethodBoost.java +++ b/src/main/java/org/redkale/cache/spi/CacheAsmMethodBoost.java @@ -33,7 +33,13 @@ import org.redkale.util.RedkaleException; import org.redkale.util.ThrowSupplier; import org.redkale.util.TypeToken; -/** @author zhangjx */ +/** + * 动态字节码的方法扩展器 + * + * @author zhangjx + * + * @since 2.8.0 + */ public class CacheAsmMethodBoost extends AsmMethodBoost { private static final java.lang.reflect.Type FUTURE_VOID = new TypeToken>() {}.getType(); @@ -55,12 +61,12 @@ public class CacheAsmMethodBoost extends AsmMethodBoost { @Override public String doMethod( - ClassLoader classLoader, - ClassWriter cw, - String newDynName, - String fieldPrefix, - List filterAnns, - Method method, + final ClassLoader classLoader, + final ClassWriter cw, + final String newDynName, + final String fieldPrefix, + final List filterAnns, + final Method method, final String newMethodName) { Map actions = this.actionMap; if (actions == null) { diff --git a/src/main/java/org/redkale/cache/spi/CacheEntry.java b/src/main/java/org/redkale/cache/spi/CacheEntry.java index f74735e6a..4080704b1 100644 --- a/src/main/java/org/redkale/cache/spi/CacheEntry.java +++ b/src/main/java/org/redkale/cache/spi/CacheEntry.java @@ -7,7 +7,15 @@ import java.util.concurrent.TimeUnit; import org.redkale.cache.Cached; import org.redkale.convert.json.JsonConvert; -/** @author zhangjx */ +/** + * 缓存信息的基本对象 + * + *

详情见: https://redkale.org + * + * @author zhangjx + * @since 2.8.0 + * + */ public class CacheEntry { private String key; diff --git a/src/main/java/org/redkale/cache/spi/CacheManagerService.java b/src/main/java/org/redkale/cache/spi/CacheManagerService.java index e7cb7b571..d581d16af 100644 --- a/src/main/java/org/redkale/cache/spi/CacheManagerService.java +++ b/src/main/java/org/redkale/cache/spi/CacheManagerService.java @@ -3,6 +3,7 @@ */ package org.redkale.cache.spi; +import java.io.Serializable; import java.lang.reflect.Type; import java.time.Duration; import java.util.ArrayList; @@ -21,8 +22,10 @@ import org.redkale.annotation.Resource; import org.redkale.annotation.ResourceType; import org.redkale.boot.Application; import org.redkale.cache.CacheManager; +import org.redkale.convert.json.JsonConvert; import org.redkale.service.Local; import org.redkale.service.Service; +import org.redkale.source.CacheEventListener; import org.redkale.source.CacheMemorySource; import org.redkale.source.CacheSource; import org.redkale.util.AnyValue; @@ -34,7 +37,10 @@ import org.redkale.util.Utility; /** * 缓存管理器 * + *

详情见: https://redkale.org + * * @author zhangjx + * @since 2.8.0 */ @Local @Component @@ -42,9 +48,14 @@ import org.redkale.util.Utility; @ResourceType(CacheManager.class) public class CacheManagerService implements CacheManager, Service { + public static final String CACHE_CHANNEL_TOPIC = "cache-update-channel"; + // 是否开启缓存 protected boolean enabled = true; + // 是否开启本地缓存变更通知 + protected boolean channelable = true; + // 配置 protected AnyValue config; @@ -69,6 +80,8 @@ public class CacheManagerService implements CacheManager, Service { // 远程缓存Source protected CacheSource remoteSource; + protected CacheEventListener remoteListener; + protected CacheManagerService(@Nullable CacheSource remoteSource) { this.remoteSource = remoteSource; } @@ -98,11 +111,14 @@ public class CacheManagerService implements CacheManager, Service { this.localSource.init(conf); String remoteSourceName = conf.getValue("remote"); if (Utility.isNotBlank(remoteSourceName)) { + this.channelable = conf.getBoolValue("channelable", true); CacheSource source = application.loadCacheSource(remoteSourceName, false); if (source == null) { throw new RedkaleException("Not found CacheSource '" + remoteSourceName + "'"); } this.remoteSource = source; + this.remoteListener = new CacheRemoteListener(); + this.remoteSource.subscribe(CacheEventMessage.class, remoteListener, CACHE_CHANNEL_TOPIC); } } } @@ -112,6 +128,9 @@ public class CacheManagerService implements CacheManager, Service { if (this.enabled) { this.localSource.destroy(conf); } + if (this.remoteSource != null && this.remoteListener != null) { + this.remoteSource.unsubscribe(remoteListener, CACHE_CHANNEL_TOPIC); + } } public boolean isEnabled() { @@ -513,6 +532,9 @@ public class CacheManagerService implements CacheManager, Service { } if (remoteSource != null && remoteExpire != null) { setCache(remoteSource, hash, key, type, value, remoteExpire); + if (channelable) { + remoteSource.publish(CACHE_CHANNEL_TOPIC, new CacheEventMessage(idFor(hash, key))); + } } } @@ -536,7 +558,14 @@ public class CacheManagerService implements CacheManager, Service { setCache(localSource, hash, key, type, value, localExpire); } if (remoteSource != null && remoteExpire != null) { - return setCacheAsync(remoteSource, hash, key, type, value, remoteExpire); + return setCacheAsync(remoteSource, hash, key, type, value, remoteExpire) + .thenCompose(r -> { + return channelable + ? remoteSource + .publishAsync(CACHE_CHANNEL_TOPIC, new CacheEventMessage(idFor(hash, key))) + .thenApply(n -> r) + : CompletableFuture.completedFuture(null); + }); } else { return CompletableFuture.completedFuture(null); } @@ -555,10 +584,12 @@ public class CacheManagerService implements CacheManager, Service { String id = idFor(hash, key); long v = localSource.del(id); if (remoteSource != null) { - return remoteSource.del(id); - } else { - return v; + v = remoteSource.del(id); + if (channelable) { + remoteSource.publish(CACHE_CHANNEL_TOPIC, new CacheEventMessage(id)); + } } + return v; } /** @@ -574,7 +605,13 @@ public class CacheManagerService implements CacheManager, Service { String id = idFor(hash, key); long v = localSource.del(id); // 内存操作,无需异步 if (remoteSource != null) { - return remoteSource.delAsync(id); + return remoteSource.delAsync(id).thenCompose(r -> { + return channelable + ? remoteSource + .publishAsync(CACHE_CHANNEL_TOPIC, new CacheEventMessage(id)) + .thenApply(n -> r) + : CompletableFuture.completedFuture(v); + }); } else { return CompletableFuture.completedFuture(v); } @@ -945,4 +982,48 @@ public class CacheManagerService implements CacheManager, Service { } } } + + public class CacheRemoteListener implements CacheEventListener { + + @Override + public void onMessage(String topic, CacheEventMessage message) { + localSource.del(message.getKey()); + } + } + + public static class CacheEventMessage implements Serializable { + // key + protected String key; + + // 时间 + protected long time; + + public CacheEventMessage() {} + + public CacheEventMessage(String key) { + this.key = key; + this.time = System.currentTimeMillis(); + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public long getTime() { + return time; + } + + public void setTime(long time) { + this.time = time; + } + + @Override + public String toString() { + return JsonConvert.root().convertTo(this); + } + } } diff --git a/src/main/java/org/redkale/cache/spi/CacheModuleEngine.java b/src/main/java/org/redkale/cache/spi/CacheModuleEngine.java index c2f8e57fd..df25ea4e2 100644 --- a/src/main/java/org/redkale/cache/spi/CacheModuleEngine.java +++ b/src/main/java/org/redkale/cache/spi/CacheModuleEngine.java @@ -24,7 +24,14 @@ import org.redkale.util.InstanceProvider; import org.redkale.util.RedkaleClassLoader; import org.redkale.util.RedkaleException; -/** @author zhangjx */ +/** + * 缓存管理器 + * + *

详情见: https://redkale.org + * + * @author zhangjx + * @since 2.8.0 + */ public class CacheModuleEngine extends ModuleEngine { // 全局缓存管理器 @@ -124,7 +131,9 @@ public class CacheModuleEngine extends ModuleEngine { }); } - /** 进入Application.shutdown方法被调用 */ + /** + * 进入Application.shutdown方法被调用 + */ @Override public void onAppPreShutdown() { if (!application.isCompileMode() && this.cacheManager instanceof Service) {