diff --git a/src/main/java/org/redkale/cache/spi/CacheAction.java b/src/main/java/org/redkale/cache/spi/CacheAction.java index 8faa83a0f..d77321cc2 100644 --- a/src/main/java/org/redkale/cache/spi/CacheAction.java +++ b/src/main/java/org/redkale/cache/spi/CacheAction.java @@ -108,6 +108,7 @@ public class CacheAction { MultiHashKey dynKey = MultiHashKey.create(paramNames, key); this.keyGenerator = CacheKeyGenerator.create(dynKey); } + this.localExpire = createDuration(cached.getLocalExpire()); this.remoteExpire = createDuration(cached.getRemoteExpire()); } diff --git a/src/main/java/org/redkale/cache/spi/CacheManagerService.java b/src/main/java/org/redkale/cache/spi/CacheManagerService.java index d581d16af..3e6a89f33 100644 --- a/src/main/java/org/redkale/cache/spi/CacheManagerService.java +++ b/src/main/java/org/redkale/cache/spi/CacheManagerService.java @@ -54,7 +54,7 @@ public class CacheManagerService implements CacheManager, Service { protected boolean enabled = true; // 是否开启本地缓存变更通知 - protected boolean channelable = true; + protected boolean broadcastable = true; // 配置 protected AnyValue config; @@ -110,13 +110,15 @@ public class CacheManagerService implements CacheManager, Service { if (this.enabled) { this.localSource.init(conf); String remoteSourceName = conf.getValue("remote"); - if (Utility.isNotBlank(remoteSourceName)) { - this.channelable = conf.getBoolValue("channelable", true); + if (remoteSource == null && Utility.isNotBlank(remoteSourceName)) { + this.broadcastable = conf.getBoolValue("broadcastable", true); CacheSource source = application.loadCacheSource(remoteSourceName, false); if (source == null) { throw new RedkaleException("Not found CacheSource '" + remoteSourceName + "'"); } this.remoteSource = source; + } + if (remoteSource != null) { this.remoteListener = new CacheRemoteListener(); this.remoteSource.subscribe(CacheEventMessage.class, remoteListener, CACHE_CHANNEL_TOPIC); } @@ -142,6 +144,22 @@ public class CacheManagerService implements CacheManager, Service { return this; } + public void updateBroadcastable(boolean broadcastable) { + CacheSource remote = this.remoteSource; + if (this.broadcastable != broadcastable && remote != null) { + if (broadcastable) { + this.remoteListener = new CacheRemoteListener(); + remote.subscribe(CacheEventMessage.class, remoteListener, CACHE_CHANNEL_TOPIC); + } else { + if (this.remoteListener != null) { + remote.unsubscribe(remoteListener, CACHE_CHANNEL_TOPIC); + this.remoteListener = null; + } + } + this.broadcastable = broadcastable; + } + } + // -------------------------------------- 本地缓存 -------------------------------------- /** * 本地获取缓存数据, 过期返回null @@ -178,7 +196,7 @@ public class CacheManagerService implements CacheManager, Service { boolean nullable, Duration expire, ThrowSupplier supplier) { - return getSet(localSource::get, this::localSetCache, hash, key, type, nullable, expire, supplier); + return getSet(this::localGetCache, this::localSetCache, hash, key, type, nullable, expire, supplier); } /** @@ -202,7 +220,7 @@ public class CacheManagerService implements CacheManager, Service { Duration expire, ThrowSupplier> supplier) { return getSetAsync( - localSource::getAsync, this::localSetCacheAsync, hash, key, type, nullable, expire, supplier); + this::localGetCacheAsync, this::localSetCacheAsync, hash, key, type, nullable, expire, supplier); } /** @@ -285,7 +303,7 @@ public class CacheManagerService implements CacheManager, Service { boolean nullable, Duration expire, ThrowSupplier supplier) { - return getSet(remoteSource::get, this::remoteSetCache, hash, key, type, nullable, expire, supplier); + return getSet(this::remoteGetCache, this::remoteSetCache, hash, key, type, nullable, expire, supplier); } /** @@ -309,7 +327,7 @@ public class CacheManagerService implements CacheManager, Service { Duration expire, ThrowSupplier> supplier) { return getSetAsync( - remoteSource::getAsync, this::remoteSetCacheAsync, hash, key, type, nullable, expire, supplier); + this::remoteGetCacheAsync, this::remoteSetCacheAsync, hash, key, type, nullable, expire, supplier); } /** @@ -380,7 +398,7 @@ public class CacheManagerService implements CacheManager, Service { */ @Override public T bothGet(final String hash, final String key, final Type type) { - return CacheValue.get(bothGetCache(hash, key, type)); + return CacheValue.get(bothGetCache(hash, key, (Duration) null, type)); } /** @@ -394,7 +412,7 @@ public class CacheManagerService implements CacheManager, Service { */ @Override public CompletableFuture bothGetAsync(final String hash, final String key, final Type type) { - return bothGetCacheAsync(hash, key, type).thenApply(CacheValue::get); + return bothGetCacheAsync(hash, key, (Duration) null, type).thenApply(CacheValue::get); } /** @@ -530,11 +548,11 @@ public class CacheManagerService implements CacheManager, Service { if (localExpire != null) { setCache(localSource, hash, key, type, value, localExpire); } - if (remoteSource != null && remoteExpire != null) { + if (remoteExpire != null && remoteSource != null) { setCache(remoteSource, hash, key, type, value, remoteExpire); - if (channelable) { - remoteSource.publish(CACHE_CHANNEL_TOPIC, new CacheEventMessage(idFor(hash, key))); - } + } + if (remoteSource != null && broadcastable) { + remoteSource.publish(CACHE_CHANNEL_TOPIC, new CacheEventMessage(idFor(hash, key))); } } @@ -557,18 +575,16 @@ public class CacheManagerService implements CacheManager, Service { if (localExpire != null) { setCache(localSource, hash, key, type, value, localExpire); } + CompletableFuture future = CompletableFuture.completedFuture(null); if (remoteSource != null && remoteExpire != null) { - return setCacheAsync(remoteSource, hash, key, type, value, remoteExpire) - .thenCompose(r -> { - return channelable - ? remoteSource - .publishAsync(CACHE_CHANNEL_TOPIC, new CacheEventMessage(idFor(hash, key))) - .thenApply(n -> r) - : CompletableFuture.completedFuture(null); - }); - } else { - return CompletableFuture.completedFuture(null); + future = setCacheAsync(remoteSource, hash, key, type, value, remoteExpire); } + if (remoteSource != null && broadcastable) { + future = future.thenCompose(r -> remoteSource + .publishAsync(CACHE_CHANNEL_TOPIC, new CacheEventMessage(idFor(hash, key))) + .thenApply(n -> r)); + } + return future; } /** @@ -585,7 +601,7 @@ public class CacheManagerService implements CacheManager, Service { long v = localSource.del(id); if (remoteSource != null) { v = remoteSource.del(id); - if (channelable) { + if (broadcastable) { remoteSource.publish(CACHE_CHANNEL_TOPIC, new CacheEventMessage(id)); } } @@ -606,7 +622,7 @@ public class CacheManagerService implements CacheManager, Service { long v = localSource.del(id); // 内存操作,无需异步 if (remoteSource != null) { return remoteSource.delAsync(id).thenCompose(r -> { - return channelable + return broadcastable ? remoteSource .publishAsync(CACHE_CHANNEL_TOPIC, new CacheEventMessage(id)) .thenApply(n -> r) @@ -646,12 +662,12 @@ public class CacheManagerService implements CacheManager, Service { Objects.requireNonNull(supplier); final Type cacheType = loadCacheType(type); final String id = idFor(hash, key); - CacheValue cacheVal = getter.get(id, cacheType); + CacheValue cacheVal = getter.get(id, expire, cacheType); if (CacheValue.isValid(cacheVal)) { return cacheVal.getVal(); } Function func = k -> { - CacheValue oldCacheVal = getter.get(id, cacheType); + CacheValue oldCacheVal = getter.get(id, expire, cacheType); if (CacheValue.isValid(oldCacheVal)) { return oldCacheVal; } @@ -703,7 +719,7 @@ public class CacheManagerService implements CacheManager, Service { Objects.requireNonNull(supplier); final Type cacheType = loadCacheType(type); final String id = idFor(hash, key); - CompletableFuture> sourceFuture = getter.get(id, cacheType); + CompletableFuture> sourceFuture = getter.get(id, expire, cacheType); return sourceFuture.thenCompose(val -> { if (CacheValue.isValid(val)) { return CompletableFuture.completedFuture(val.getVal()); @@ -783,8 +799,8 @@ public class CacheManagerService implements CacheManager, Service { return setCacheAsync(source, idFor(hash, key), expire, loadCacheType(type, value), CacheValue.create(value)); } - protected CacheValue bothGetCache(final String hash, final String key, final Type type) { - return bothGetCache(idFor(hash, key), loadCacheType(type)); + protected CacheValue bothGetCache(String hash, String key, Duration expire, Type type) { + return bothGetCache(idFor(hash, key), expire, loadCacheType(type)); } /** @@ -797,23 +813,43 @@ public class CacheManagerService implements CacheManager, Service { * @return 数据值 */ protected CompletableFuture> bothGetCacheAsync( - final String hash, final String key, final Type type) { - return bothGetCacheAsync(idFor(hash, key), loadCacheType(type)); + final String hash, final String key, Duration expire, final Type type) { + return bothGetCacheAsync(idFor(hash, key), expire, loadCacheType(type)); } - protected CacheValue bothGetCache(final String id, final Type cacheType) { + protected CacheValue bothGetCache(final String id, final Duration expire, final Type cacheType) { checkEnable(); CacheValue cacheVal = localSource.get(id, cacheType); if (CacheValue.isValid(cacheVal)) { return cacheVal; } if (remoteSource != null) { - return remoteSource.get(id, cacheType); + cacheVal = remoteSource.get(id, cacheType); + if (CacheValue.isValid(cacheVal) && expire != null) { + setCache(localSource, id, expire, cacheType, cacheVal); + } + return cacheVal; } else { return null; } } + protected CacheValue localGetCache(String id, Duration expire, Type cacheType) { + return localSource.get(id, cacheType); + } + + protected CompletableFuture> localGetCacheAsync(String id, Duration expire, Type cacheType) { + return localSource.getAsync(id, cacheType); + } + + protected CacheValue remoteGetCache(String id, Duration expire, Type cacheType) { + return remoteSource.get(id, cacheType); + } + + protected CompletableFuture> remoteGetCacheAsync(String id, Duration expire, Type cacheType) { + return remoteSource.getAsync(id, cacheType); + } + /** * 远程异步获取缓存数据, 过期返回null * @@ -822,14 +858,20 @@ public class CacheManagerService implements CacheManager, Service { * @param cacheType 数据类型 * @return 数据值 */ - protected CompletableFuture> bothGetCacheAsync(final String id, final Type cacheType) { + protected CompletableFuture> bothGetCacheAsync(String id, Duration expire, Type cacheType) { checkEnable(); CacheValue val = localSource.get(id, cacheType); // 内存操作,无需异步 if (CacheValue.isValid(val)) { return CompletableFuture.completedFuture(val); } if (remoteSource != null) { - return remoteSource.getAsync(id, cacheType); + CompletableFuture> future = remoteSource.getAsync(id, cacheType); + return future.thenApply(v -> { + if (CacheValue.isValid(v) && expire != null) { + setCache(localSource, id, expire, cacheType, v); + } + return v; + }); } else { return CompletableFuture.completedFuture(null); } @@ -905,7 +947,7 @@ public class CacheManagerService implements CacheManager, Service { protected static interface GetterFunc { - public R get(String id, Type cacheType); + public R get(String id, Duration expire, Type cacheType); } protected static interface SetterSyncFunc { diff --git a/src/main/java/org/redkale/net/sncp/Sncp.java b/src/main/java/org/redkale/net/sncp/Sncp.java index 5115ea4ec..9b95cabda 100644 --- a/src/main/java/org/redkale/net/sncp/Sncp.java +++ b/src/main/java/org/redkale/net/sncp/Sncp.java @@ -5,19 +5,17 @@ */ package org.redkale.net.sncp; +import java.lang.annotation.*; import static java.lang.annotation.ElementType.METHOD; import static java.lang.annotation.ElementType.TYPE; import static java.lang.annotation.RetentionPolicy.RUNTIME; -import static org.redkale.asm.ClassWriter.COMPUTE_FRAMES; -import static org.redkale.asm.Opcodes.*; -import static org.redkale.util.Utility.isEmpty; - -import java.lang.annotation.*; import java.lang.reflect.*; import java.nio.channels.CompletionHandler; import java.util.*; import org.redkale.annotation.*; import org.redkale.asm.*; +import static org.redkale.asm.ClassWriter.COMPUTE_FRAMES; +import static org.redkale.asm.Opcodes.*; import org.redkale.asm.Type; import org.redkale.convert.Convert; import org.redkale.convert.bson.BsonConvert; @@ -33,6 +31,7 @@ import org.redkale.util.RedkaleClassLoader; import org.redkale.util.TypeToken; import org.redkale.util.Uint128; import org.redkale.util.Utility; +import static org.redkale.util.Utility.isEmpty; /** * Service Node Communicate Protocol 生成Service的本地模式或远程模式Service-Class的工具类 @@ -531,13 +530,15 @@ public abstract class Sncp { } newDynName += "_" + (normal ? name : hash(name)); } - try { - Class clz = RedkaleClassLoader.findDynClass(newDynName.replace('/', '.')); - return (Class) (clz == null ? loader.loadClass(newDynName.replace('/', '.')) : clz); - } catch (ClassNotFoundException e) { - // do nothing - } catch (Throwable t) { - t.printStackTrace(); + if (methodBoost == null) { // 加强动态时不能重复加载 + try { + Class clz = RedkaleClassLoader.findDynClass(newDynName.replace('/', '.')); + return (Class) (clz == null ? loader.loadClass(newDynName.replace('/', '.')) : clz); + } catch (ClassNotFoundException e) { + // do nothing + } catch (Throwable t) { + t.printStackTrace(); + } } // ------------------------------------------------------------------------------ ClassWriter cw = new ClassWriter(COMPUTE_FRAMES); @@ -598,11 +599,22 @@ public abstract class Sncp { } cw.visitEnd(); byte[] bytes = cw.toByteArray(); - Class newClazz = new ClassLoader(loader) { - public final Class loadClass(String name, byte[] b) { - return defineClass(name, b, 0, b.length); + Class newClazz = null; + if (methodBoost != null) { + try { + Class clz = RedkaleClassLoader.findDynClass(newDynName.replace('/', '.')); + newClazz = (clz == null ? loader.loadClass(newDynName.replace('/', '.')) : clz); + } catch (Throwable t) { + // do nothing } - }.loadClass(newDynName.replace('/', '.'), bytes); + } + if (newClazz == null) { + newClazz = new ClassLoader(loader) { + public final Class loadClass(String name, byte[] b) { + return defineClass(name, b, 0, b.length); + } + }.loadClass(newDynName.replace('/', '.'), bytes); + } RedkaleClassLoader.putDynClass(newDynName.replace('/', '.'), bytes, newClazz); RedkaleClassLoader.putReflectionPublicClasses(newDynName.replace('/', '.')); RedkaleClassLoader.putReflectionDeclaredConstructors(newClazz, newDynName.replace('/', '.')); diff --git a/src/main/java/org/redkale/source/CacheMemorySource.java b/src/main/java/org/redkale/source/CacheMemorySource.java index fd96f6126..0d7c1aaed 100644 --- a/src/main/java/org/redkale/source/CacheMemorySource.java +++ b/src/main/java/org/redkale/source/CacheMemorySource.java @@ -119,7 +119,8 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public String toString() { - return getClass().getSimpleName() + "{type=memory, name='" + resourceName() + "'}"; + return getClass().getSimpleName() + "{type=memory, name='" + resourceName() + "', hash=" + + Objects.hashCode(this) + "}"; } @Override diff --git a/src/test/java/org/redkale/test/cache/CacheInstance.java b/src/test/java/org/redkale/test/cache/CacheInstance.java index dec2d2056..22934540d 100644 --- a/src/test/java/org/redkale/test/cache/CacheInstance.java +++ b/src/test/java/org/redkale/test/cache/CacheInstance.java @@ -40,11 +40,11 @@ public class CacheInstance implements Service { return "haha"; } - public void updateName() { - cacheManager.bothSet("name", String.class, "gege", Duration.ofMillis(600), Duration.ofMillis(600)); + public void updateName(String val) { + cacheManager.bothSet("name_2", String.class, val, Duration.ofSeconds(31), Duration.ofSeconds(60)); } - @Cached(key = "name", localExpire = "30", remoteExpire = "60") + @Cached(key = "name_2", localExpire = "31", remoteExpire = "60") public String getName2() throws RedkaleException { return "haha"; } @@ -80,6 +80,10 @@ public class CacheInstance implements Service { return CompletableFuture.completedFuture(null); } + public CacheManager getCacheManager() { + return cacheManager; + } + public static class ParamBean { private String name; diff --git a/src/test/java/org/redkale/test/cache/CacheInstanceTest.java b/src/test/java/org/redkale/test/cache/CacheInstanceTest.java index 142d18754..564907610 100644 --- a/src/test/java/org/redkale/test/cache/CacheInstanceTest.java +++ b/src/test/java/org/redkale/test/cache/CacheInstanceTest.java @@ -60,6 +60,7 @@ public class CacheInstanceTest { public void run1() throws Exception { Class serviceClass = CacheInstance.class; CacheAsmMethodBoost boost = new CacheAsmMethodBoost(false, serviceClass); + CacheAsmMethodBoost boost2 = new CacheAsmMethodBoost(false, serviceClass); SncpRpcGroups grous = new SncpRpcGroups(); AsyncGroup iGroup = AsyncGroup.create("", Utility.newScheduledExecutor(1), 0, 0); SncpClient client = new SncpClient( @@ -68,12 +69,24 @@ public class CacheInstanceTest { null, "", serviceClass, boost, resourceFactory, grous, client, null, null, null); resourceFactory.inject(instance); CacheInstance instance2 = Sncp.createLocalService( - null, "", serviceClass, boost, resourceFactory2, grous, client, null, null, null); + null, "", serviceClass, boost2, resourceFactory2, grous, client, null, null, null); resourceFactory2.inject(instance2); System.out.println(instance.getName2()); + System.out.println(instance.getClass()); Assertions.assertEquals("haha", instance.getName2()); Assertions.assertEquals("haha", instance2.getName2()); - instance.updateName(); + System.out.println("准备设置 updateName"); + System.out.println("instance1.manager = " + instance.getCacheManager()); + System.out.println("instance2.manager = " + instance2.getCacheManager()); + manager.updateBroadcastable(false); + instance.updateName("gege"); + Assertions.assertEquals("gege", instance.getName2()); + Assertions.assertEquals("haha", instance2.getName2()); + manager.updateBroadcastable(true); + System.out.println("准备设置 updateName"); + instance.updateName("gege"); + System.out.println("设置结束 updateName"); + Utility.sleep(10); Assertions.assertEquals("gege", instance.getName2()); Assertions.assertEquals("gege", instance2.getName2()); }