CacheManager优化

This commit is contained in:
redkale
2024-06-07 14:51:01 +08:00
parent 90097909ce
commit 1a4e2986ad
6 changed files with 132 additions and 59 deletions

View File

@@ -108,6 +108,7 @@ public class CacheAction {
MultiHashKey dynKey = MultiHashKey.create(paramNames, key); MultiHashKey dynKey = MultiHashKey.create(paramNames, key);
this.keyGenerator = CacheKeyGenerator.create(dynKey); this.keyGenerator = CacheKeyGenerator.create(dynKey);
} }
this.localExpire = createDuration(cached.getLocalExpire());
this.remoteExpire = createDuration(cached.getRemoteExpire()); this.remoteExpire = createDuration(cached.getRemoteExpire());
} }

View File

@@ -54,7 +54,7 @@ public class CacheManagerService implements CacheManager, Service {
protected boolean enabled = true; protected boolean enabled = true;
// 是否开启本地缓存变更通知 // 是否开启本地缓存变更通知
protected boolean channelable = true; protected boolean broadcastable = true;
// 配置 // 配置
protected AnyValue config; protected AnyValue config;
@@ -110,13 +110,15 @@ public class CacheManagerService implements CacheManager, Service {
if (this.enabled) { if (this.enabled) {
this.localSource.init(conf); this.localSource.init(conf);
String remoteSourceName = conf.getValue("remote"); String remoteSourceName = conf.getValue("remote");
if (Utility.isNotBlank(remoteSourceName)) { if (remoteSource == null && Utility.isNotBlank(remoteSourceName)) {
this.channelable = conf.getBoolValue("channelable", true); this.broadcastable = conf.getBoolValue("broadcastable", true);
CacheSource source = application.loadCacheSource(remoteSourceName, false); CacheSource source = application.loadCacheSource(remoteSourceName, false);
if (source == null) { if (source == null) {
throw new RedkaleException("Not found CacheSource '" + remoteSourceName + "'"); throw new RedkaleException("Not found CacheSource '" + remoteSourceName + "'");
} }
this.remoteSource = source; this.remoteSource = source;
}
if (remoteSource != null) {
this.remoteListener = new CacheRemoteListener(); this.remoteListener = new CacheRemoteListener();
this.remoteSource.subscribe(CacheEventMessage.class, remoteListener, CACHE_CHANNEL_TOPIC); this.remoteSource.subscribe(CacheEventMessage.class, remoteListener, CACHE_CHANNEL_TOPIC);
} }
@@ -142,6 +144,22 @@ public class CacheManagerService implements CacheManager, Service {
return this; return this;
} }
public void updateBroadcastable(boolean broadcastable) {
CacheSource remote = this.remoteSource;
if (this.broadcastable != broadcastable && remote != null) {
if (broadcastable) {
this.remoteListener = new CacheRemoteListener();
remote.subscribe(CacheEventMessage.class, remoteListener, CACHE_CHANNEL_TOPIC);
} else {
if (this.remoteListener != null) {
remote.unsubscribe(remoteListener, CACHE_CHANNEL_TOPIC);
this.remoteListener = null;
}
}
this.broadcastable = broadcastable;
}
}
// -------------------------------------- 本地缓存 -------------------------------------- // -------------------------------------- 本地缓存 --------------------------------------
/** /**
* 本地获取缓存数据, 过期返回null * 本地获取缓存数据, 过期返回null
@@ -178,7 +196,7 @@ public class CacheManagerService implements CacheManager, Service {
boolean nullable, boolean nullable,
Duration expire, Duration expire,
ThrowSupplier<T> supplier) { ThrowSupplier<T> supplier) {
return getSet(localSource::get, this::localSetCache, hash, key, type, nullable, expire, supplier); return getSet(this::localGetCache, this::localSetCache, hash, key, type, nullable, expire, supplier);
} }
/** /**
@@ -202,7 +220,7 @@ public class CacheManagerService implements CacheManager, Service {
Duration expire, Duration expire,
ThrowSupplier<CompletableFuture<T>> supplier) { ThrowSupplier<CompletableFuture<T>> supplier) {
return getSetAsync( return getSetAsync(
localSource::getAsync, this::localSetCacheAsync, hash, key, type, nullable, expire, supplier); this::localGetCacheAsync, this::localSetCacheAsync, hash, key, type, nullable, expire, supplier);
} }
/** /**
@@ -285,7 +303,7 @@ public class CacheManagerService implements CacheManager, Service {
boolean nullable, boolean nullable,
Duration expire, Duration expire,
ThrowSupplier<T> supplier) { ThrowSupplier<T> supplier) {
return getSet(remoteSource::get, this::remoteSetCache, hash, key, type, nullable, expire, supplier); return getSet(this::remoteGetCache, this::remoteSetCache, hash, key, type, nullable, expire, supplier);
} }
/** /**
@@ -309,7 +327,7 @@ public class CacheManagerService implements CacheManager, Service {
Duration expire, Duration expire,
ThrowSupplier<CompletableFuture<T>> supplier) { ThrowSupplier<CompletableFuture<T>> supplier) {
return getSetAsync( return getSetAsync(
remoteSource::getAsync, this::remoteSetCacheAsync, hash, key, type, nullable, expire, supplier); this::remoteGetCacheAsync, this::remoteSetCacheAsync, hash, key, type, nullable, expire, supplier);
} }
/** /**
@@ -380,7 +398,7 @@ public class CacheManagerService implements CacheManager, Service {
*/ */
@Override @Override
public <T> T bothGet(final String hash, final String key, final Type type) { public <T> T bothGet(final String hash, final String key, final Type type) {
return CacheValue.get(bothGetCache(hash, key, type)); return CacheValue.get(bothGetCache(hash, key, (Duration) null, type));
} }
/** /**
@@ -394,7 +412,7 @@ public class CacheManagerService implements CacheManager, Service {
*/ */
@Override @Override
public <T> CompletableFuture<T> bothGetAsync(final String hash, final String key, final Type type) { public <T> CompletableFuture<T> bothGetAsync(final String hash, final String key, final Type type) {
return bothGetCacheAsync(hash, key, type).thenApply(CacheValue::get); return bothGetCacheAsync(hash, key, (Duration) null, type).thenApply(CacheValue::get);
} }
/** /**
@@ -530,11 +548,11 @@ public class CacheManagerService implements CacheManager, Service {
if (localExpire != null) { if (localExpire != null) {
setCache(localSource, hash, key, type, value, localExpire); setCache(localSource, hash, key, type, value, localExpire);
} }
if (remoteSource != null && remoteExpire != null) { if (remoteExpire != null && remoteSource != null) {
setCache(remoteSource, hash, key, type, value, remoteExpire); setCache(remoteSource, hash, key, type, value, remoteExpire);
if (channelable) { }
remoteSource.publish(CACHE_CHANNEL_TOPIC, new CacheEventMessage(idFor(hash, key))); if (remoteSource != null && broadcastable) {
} remoteSource.publish(CACHE_CHANNEL_TOPIC, new CacheEventMessage(idFor(hash, key)));
} }
} }
@@ -557,18 +575,16 @@ public class CacheManagerService implements CacheManager, Service {
if (localExpire != null) { if (localExpire != null) {
setCache(localSource, hash, key, type, value, localExpire); setCache(localSource, hash, key, type, value, localExpire);
} }
CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
if (remoteSource != null && remoteExpire != null) { if (remoteSource != null && remoteExpire != null) {
return setCacheAsync(remoteSource, hash, key, type, value, remoteExpire) future = setCacheAsync(remoteSource, hash, key, type, value, remoteExpire);
.thenCompose(r -> {
return channelable
? remoteSource
.publishAsync(CACHE_CHANNEL_TOPIC, new CacheEventMessage(idFor(hash, key)))
.thenApply(n -> r)
: CompletableFuture.completedFuture(null);
});
} else {
return CompletableFuture.completedFuture(null);
} }
if (remoteSource != null && broadcastable) {
future = future.thenCompose(r -> remoteSource
.publishAsync(CACHE_CHANNEL_TOPIC, new CacheEventMessage(idFor(hash, key)))
.thenApply(n -> r));
}
return future;
} }
/** /**
@@ -585,7 +601,7 @@ public class CacheManagerService implements CacheManager, Service {
long v = localSource.del(id); long v = localSource.del(id);
if (remoteSource != null) { if (remoteSource != null) {
v = remoteSource.del(id); v = remoteSource.del(id);
if (channelable) { if (broadcastable) {
remoteSource.publish(CACHE_CHANNEL_TOPIC, new CacheEventMessage(id)); remoteSource.publish(CACHE_CHANNEL_TOPIC, new CacheEventMessage(id));
} }
} }
@@ -606,7 +622,7 @@ public class CacheManagerService implements CacheManager, Service {
long v = localSource.del(id); // 内存操作,无需异步 long v = localSource.del(id); // 内存操作,无需异步
if (remoteSource != null) { if (remoteSource != null) {
return remoteSource.delAsync(id).thenCompose(r -> { return remoteSource.delAsync(id).thenCompose(r -> {
return channelable return broadcastable
? remoteSource ? remoteSource
.publishAsync(CACHE_CHANNEL_TOPIC, new CacheEventMessage(id)) .publishAsync(CACHE_CHANNEL_TOPIC, new CacheEventMessage(id))
.thenApply(n -> r) .thenApply(n -> r)
@@ -646,12 +662,12 @@ public class CacheManagerService implements CacheManager, Service {
Objects.requireNonNull(supplier); Objects.requireNonNull(supplier);
final Type cacheType = loadCacheType(type); final Type cacheType = loadCacheType(type);
final String id = idFor(hash, key); final String id = idFor(hash, key);
CacheValue<T> cacheVal = getter.get(id, cacheType); CacheValue<T> cacheVal = getter.get(id, expire, cacheType);
if (CacheValue.isValid(cacheVal)) { if (CacheValue.isValid(cacheVal)) {
return cacheVal.getVal(); return cacheVal.getVal();
} }
Function<String, CacheValue> func = k -> { Function<String, CacheValue> func = k -> {
CacheValue<T> oldCacheVal = getter.get(id, cacheType); CacheValue<T> oldCacheVal = getter.get(id, expire, cacheType);
if (CacheValue.isValid(oldCacheVal)) { if (CacheValue.isValid(oldCacheVal)) {
return oldCacheVal; return oldCacheVal;
} }
@@ -703,7 +719,7 @@ public class CacheManagerService implements CacheManager, Service {
Objects.requireNonNull(supplier); Objects.requireNonNull(supplier);
final Type cacheType = loadCacheType(type); final Type cacheType = loadCacheType(type);
final String id = idFor(hash, key); final String id = idFor(hash, key);
CompletableFuture<CacheValue<T>> sourceFuture = getter.get(id, cacheType); CompletableFuture<CacheValue<T>> sourceFuture = getter.get(id, expire, cacheType);
return sourceFuture.thenCompose(val -> { return sourceFuture.thenCompose(val -> {
if (CacheValue.isValid(val)) { if (CacheValue.isValid(val)) {
return CompletableFuture.completedFuture(val.getVal()); return CompletableFuture.completedFuture(val.getVal());
@@ -783,8 +799,8 @@ public class CacheManagerService implements CacheManager, Service {
return setCacheAsync(source, idFor(hash, key), expire, loadCacheType(type, value), CacheValue.create(value)); return setCacheAsync(source, idFor(hash, key), expire, loadCacheType(type, value), CacheValue.create(value));
} }
protected <T> CacheValue<T> bothGetCache(final String hash, final String key, final Type type) { protected <T> CacheValue<T> bothGetCache(String hash, String key, Duration expire, Type type) {
return bothGetCache(idFor(hash, key), loadCacheType(type)); return bothGetCache(idFor(hash, key), expire, loadCacheType(type));
} }
/** /**
@@ -797,23 +813,43 @@ public class CacheManagerService implements CacheManager, Service {
* @return 数据值 * @return 数据值
*/ */
protected <T> CompletableFuture<CacheValue<T>> bothGetCacheAsync( protected <T> CompletableFuture<CacheValue<T>> bothGetCacheAsync(
final String hash, final String key, final Type type) { final String hash, final String key, Duration expire, final Type type) {
return bothGetCacheAsync(idFor(hash, key), loadCacheType(type)); return bothGetCacheAsync(idFor(hash, key), expire, loadCacheType(type));
} }
protected <T> CacheValue<T> bothGetCache(final String id, final Type cacheType) { protected <T> CacheValue<T> bothGetCache(final String id, final Duration expire, final Type cacheType) {
checkEnable(); checkEnable();
CacheValue<T> cacheVal = localSource.get(id, cacheType); CacheValue<T> cacheVal = localSource.get(id, cacheType);
if (CacheValue.isValid(cacheVal)) { if (CacheValue.isValid(cacheVal)) {
return cacheVal; return cacheVal;
} }
if (remoteSource != null) { if (remoteSource != null) {
return remoteSource.get(id, cacheType); cacheVal = remoteSource.get(id, cacheType);
if (CacheValue.isValid(cacheVal) && expire != null) {
setCache(localSource, id, expire, cacheType, cacheVal);
}
return cacheVal;
} else { } else {
return null; return null;
} }
} }
protected <T> CacheValue<T> localGetCache(String id, Duration expire, Type cacheType) {
return localSource.get(id, cacheType);
}
protected <T> CompletableFuture<CacheValue<T>> localGetCacheAsync(String id, Duration expire, Type cacheType) {
return localSource.getAsync(id, cacheType);
}
protected <T> CacheValue<T> remoteGetCache(String id, Duration expire, Type cacheType) {
return remoteSource.get(id, cacheType);
}
protected <T> CompletableFuture<CacheValue<T>> remoteGetCacheAsync(String id, Duration expire, Type cacheType) {
return remoteSource.getAsync(id, cacheType);
}
/** /**
* 远程异步获取缓存数据, 过期返回null * 远程异步获取缓存数据, 过期返回null
* *
@@ -822,14 +858,20 @@ public class CacheManagerService implements CacheManager, Service {
* @param cacheType 数据类型 * @param cacheType 数据类型
* @return 数据值 * @return 数据值
*/ */
protected <T> CompletableFuture<CacheValue<T>> bothGetCacheAsync(final String id, final Type cacheType) { protected <T> CompletableFuture<CacheValue<T>> bothGetCacheAsync(String id, Duration expire, Type cacheType) {
checkEnable(); checkEnable();
CacheValue<T> val = localSource.get(id, cacheType); // 内存操作,无需异步 CacheValue<T> val = localSource.get(id, cacheType); // 内存操作,无需异步
if (CacheValue.isValid(val)) { if (CacheValue.isValid(val)) {
return CompletableFuture.completedFuture(val); return CompletableFuture.completedFuture(val);
} }
if (remoteSource != null) { if (remoteSource != null) {
return remoteSource.getAsync(id, cacheType); CompletableFuture<CacheValue<T>> future = remoteSource.getAsync(id, cacheType);
return future.thenApply(v -> {
if (CacheValue.isValid(v) && expire != null) {
setCache(localSource, id, expire, cacheType, v);
}
return v;
});
} else { } else {
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }
@@ -905,7 +947,7 @@ public class CacheManagerService implements CacheManager, Service {
protected static interface GetterFunc<R> { protected static interface GetterFunc<R> {
public R get(String id, Type cacheType); public R get(String id, Duration expire, Type cacheType);
} }
protected static interface SetterSyncFunc { protected static interface SetterSyncFunc {

View File

@@ -5,19 +5,17 @@
*/ */
package org.redkale.net.sncp; package org.redkale.net.sncp;
import java.lang.annotation.*;
import static java.lang.annotation.ElementType.METHOD; import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.TYPE; import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME; import static java.lang.annotation.RetentionPolicy.RUNTIME;
import static org.redkale.asm.ClassWriter.COMPUTE_FRAMES;
import static org.redkale.asm.Opcodes.*;
import static org.redkale.util.Utility.isEmpty;
import java.lang.annotation.*;
import java.lang.reflect.*; import java.lang.reflect.*;
import java.nio.channels.CompletionHandler; import java.nio.channels.CompletionHandler;
import java.util.*; import java.util.*;
import org.redkale.annotation.*; import org.redkale.annotation.*;
import org.redkale.asm.*; import org.redkale.asm.*;
import static org.redkale.asm.ClassWriter.COMPUTE_FRAMES;
import static org.redkale.asm.Opcodes.*;
import org.redkale.asm.Type; import org.redkale.asm.Type;
import org.redkale.convert.Convert; import org.redkale.convert.Convert;
import org.redkale.convert.bson.BsonConvert; import org.redkale.convert.bson.BsonConvert;
@@ -33,6 +31,7 @@ import org.redkale.util.RedkaleClassLoader;
import org.redkale.util.TypeToken; import org.redkale.util.TypeToken;
import org.redkale.util.Uint128; import org.redkale.util.Uint128;
import org.redkale.util.Utility; import org.redkale.util.Utility;
import static org.redkale.util.Utility.isEmpty;
/** /**
* Service Node Communicate Protocol 生成Service的本地模式或远程模式Service-Class的工具类 * Service Node Communicate Protocol 生成Service的本地模式或远程模式Service-Class的工具类
@@ -531,13 +530,15 @@ public abstract class Sncp {
} }
newDynName += "_" + (normal ? name : hash(name)); newDynName += "_" + (normal ? name : hash(name));
} }
try { if (methodBoost == null) { // 加强动态时不能重复加载
Class clz = RedkaleClassLoader.findDynClass(newDynName.replace('/', '.')); try {
return (Class<T>) (clz == null ? loader.loadClass(newDynName.replace('/', '.')) : clz); Class clz = RedkaleClassLoader.findDynClass(newDynName.replace('/', '.'));
} catch (ClassNotFoundException e) { return (Class<T>) (clz == null ? loader.loadClass(newDynName.replace('/', '.')) : clz);
// do nothing } catch (ClassNotFoundException e) {
} catch (Throwable t) { // do nothing
t.printStackTrace(); } catch (Throwable t) {
t.printStackTrace();
}
} }
// ------------------------------------------------------------------------------ // ------------------------------------------------------------------------------
ClassWriter cw = new ClassWriter(COMPUTE_FRAMES); ClassWriter cw = new ClassWriter(COMPUTE_FRAMES);
@@ -598,11 +599,22 @@ public abstract class Sncp {
} }
cw.visitEnd(); cw.visitEnd();
byte[] bytes = cw.toByteArray(); byte[] bytes = cw.toByteArray();
Class<?> newClazz = new ClassLoader(loader) { Class<?> newClazz = null;
public final Class<?> loadClass(String name, byte[] b) { if (methodBoost != null) {
return defineClass(name, b, 0, b.length); try {
Class clz = RedkaleClassLoader.findDynClass(newDynName.replace('/', '.'));
newClazz = (clz == null ? loader.loadClass(newDynName.replace('/', '.')) : clz);
} catch (Throwable t) {
// do nothing
} }
}.loadClass(newDynName.replace('/', '.'), bytes); }
if (newClazz == null) {
newClazz = new ClassLoader(loader) {
public final Class<?> loadClass(String name, byte[] b) {
return defineClass(name, b, 0, b.length);
}
}.loadClass(newDynName.replace('/', '.'), bytes);
}
RedkaleClassLoader.putDynClass(newDynName.replace('/', '.'), bytes, newClazz); RedkaleClassLoader.putDynClass(newDynName.replace('/', '.'), bytes, newClazz);
RedkaleClassLoader.putReflectionPublicClasses(newDynName.replace('/', '.')); RedkaleClassLoader.putReflectionPublicClasses(newDynName.replace('/', '.'));
RedkaleClassLoader.putReflectionDeclaredConstructors(newClazz, newDynName.replace('/', '.')); RedkaleClassLoader.putReflectionDeclaredConstructors(newClazz, newDynName.replace('/', '.'));

View File

@@ -119,7 +119,8 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override @Override
public String toString() { public String toString() {
return getClass().getSimpleName() + "{type=memory, name='" + resourceName() + "'}"; return getClass().getSimpleName() + "{type=memory, name='" + resourceName() + "', hash="
+ Objects.hashCode(this) + "}";
} }
@Override @Override

View File

@@ -40,11 +40,11 @@ public class CacheInstance implements Service {
return "haha"; return "haha";
} }
public void updateName() { public void updateName(String val) {
cacheManager.bothSet("name", String.class, "gege", Duration.ofMillis(600), Duration.ofMillis(600)); cacheManager.bothSet("name_2", String.class, val, Duration.ofSeconds(31), Duration.ofSeconds(60));
} }
@Cached(key = "name", localExpire = "30", remoteExpire = "60") @Cached(key = "name_2", localExpire = "31", remoteExpire = "60")
public String getName2() throws RedkaleException { public String getName2() throws RedkaleException {
return "haha"; return "haha";
} }
@@ -80,6 +80,10 @@ public class CacheInstance implements Service {
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }
public CacheManager getCacheManager() {
return cacheManager;
}
public static class ParamBean { public static class ParamBean {
private String name; private String name;

View File

@@ -60,6 +60,7 @@ public class CacheInstanceTest {
public void run1() throws Exception { public void run1() throws Exception {
Class<CacheInstance> serviceClass = CacheInstance.class; Class<CacheInstance> serviceClass = CacheInstance.class;
CacheAsmMethodBoost boost = new CacheAsmMethodBoost(false, serviceClass); CacheAsmMethodBoost boost = new CacheAsmMethodBoost(false, serviceClass);
CacheAsmMethodBoost boost2 = new CacheAsmMethodBoost(false, serviceClass);
SncpRpcGroups grous = new SncpRpcGroups(); SncpRpcGroups grous = new SncpRpcGroups();
AsyncGroup iGroup = AsyncGroup.create("", Utility.newScheduledExecutor(1), 0, 0); AsyncGroup iGroup = AsyncGroup.create("", Utility.newScheduledExecutor(1), 0, 0);
SncpClient client = new SncpClient( SncpClient client = new SncpClient(
@@ -68,12 +69,24 @@ public class CacheInstanceTest {
null, "", serviceClass, boost, resourceFactory, grous, client, null, null, null); null, "", serviceClass, boost, resourceFactory, grous, client, null, null, null);
resourceFactory.inject(instance); resourceFactory.inject(instance);
CacheInstance instance2 = Sncp.createLocalService( CacheInstance instance2 = Sncp.createLocalService(
null, "", serviceClass, boost, resourceFactory2, grous, client, null, null, null); null, "", serviceClass, boost2, resourceFactory2, grous, client, null, null, null);
resourceFactory2.inject(instance2); resourceFactory2.inject(instance2);
System.out.println(instance.getName2()); System.out.println(instance.getName2());
System.out.println(instance.getClass());
Assertions.assertEquals("haha", instance.getName2()); Assertions.assertEquals("haha", instance.getName2());
Assertions.assertEquals("haha", instance2.getName2()); Assertions.assertEquals("haha", instance2.getName2());
instance.updateName(); System.out.println("准备设置 updateName");
System.out.println("instance1.manager = " + instance.getCacheManager());
System.out.println("instance2.manager = " + instance2.getCacheManager());
manager.updateBroadcastable(false);
instance.updateName("gege");
Assertions.assertEquals("gege", instance.getName2());
Assertions.assertEquals("haha", instance2.getName2());
manager.updateBroadcastable(true);
System.out.println("准备设置 updateName");
instance.updateName("gege");
System.out.println("设置结束 updateName");
Utility.sleep(10);
Assertions.assertEquals("gege", instance.getName2()); Assertions.assertEquals("gege", instance.getName2());
Assertions.assertEquals("gege", instance2.getName2()); Assertions.assertEquals("gege", instance2.getName2());
} }