From 71a7e7e7f6c0ee67664d84c4e46683a25288b94b Mon Sep 17 00:00:00 2001 From: redkale Date: Mon, 11 Dec 2023 15:18:51 +0800 Subject: [PATCH] CacheManager --- .../org/redkale/caching/CacheManager.java | 324 +++---------- .../redkale/caching/CacheManagerService.java | 457 ++++++++++++++++++ .../org/redkale/source/CacheMemorySource.java | 113 +++-- .../org/redkale/test/caching/CachingTest.java | 8 +- 4 files changed, 590 insertions(+), 312 deletions(-) create mode 100644 src/main/java/org/redkale/caching/CacheManagerService.java diff --git a/src/main/java/org/redkale/caching/CacheManager.java b/src/main/java/org/redkale/caching/CacheManager.java index 42af3612d..c64f83bc3 100644 --- a/src/main/java/org/redkale/caching/CacheManager.java +++ b/src/main/java/org/redkale/caching/CacheManager.java @@ -5,14 +5,7 @@ package org.redkale.caching; import java.lang.reflect.Type; import java.time.Duration; -import java.util.Objects; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import org.redkale.annotation.Nonnull; -import org.redkale.annotation.Nullable; -import org.redkale.source.CacheMemorySource; -import org.redkale.source.CacheSource; -import org.redkale.util.TypeToken; /** * //TODO 待实现 @@ -24,28 +17,7 @@ import org.redkale.util.TypeToken; * * @since 2.8.0 */ -public class CacheManager { - - //缓存配置项 - protected final CacheConfig config; - - //数据类型与CacheValue泛型的对应关系 - private final ConcurrentHashMap cacheValueTypes = new ConcurrentHashMap<>(); - - //本地缓存Source - protected final CacheSource localSource = new CacheMemorySource("caching"); - - //远程缓存Source - protected CacheSource remoteSource; - - protected CacheManager(@Nonnull CacheConfig config, @Nullable CacheSource remoteSource) { - this.config = Objects.requireNonNull(config); - this.remoteSource = remoteSource; - } - - public static CacheManager create(@Nonnull CacheConfig config, @Nullable CacheSource remoteSource) { - return new CacheManager(config, remoteSource); - } +public interface CacheManager { //-------------------------------------- 本地缓存 -------------------------------------- /** @@ -58,9 +30,7 @@ public class CacheManager { * * @return 数据值 */ - public T localGet(final String map, final String key, final Type type) { - return get(localSource, map, key, type); - } + public T localGet(final String map, final String key, final Type type); /** * 本地获取字符串缓存数据, 过期返回null @@ -70,8 +40,8 @@ public class CacheManager { * * @return 数据值 */ - public final String localGetString(final String map, final String key) { - return get(localSource, map, key, String.class); + default String localGetString(final String map, final String key) { + return localGet(map, key, String.class); } /** @@ -84,9 +54,7 @@ public class CacheManager { * @param value 数据值 * @param expire 过期时长,为null表示永不过期 */ - public void localSet(final String map, final String key, final Type type, final T value, Duration expire) { - set(localSource, map, key, type, value, expire); - } + public void localSet(String map, String key, Type type, T value, Duration expire); /** * 本地缓存字符串数据 @@ -96,8 +64,8 @@ public class CacheManager { * @param value 数据值 * @param expire 过期时长,为null表示永不过期 */ - public void localSetString(final String map, final String key, final String value, Duration expire) { - set(localSource, map, key, String.class, value, expire); + default void localSetString(final String map, final String key, final String value, Duration expire) { + localSet(map, key, String.class, value, expire); } /** @@ -108,9 +76,7 @@ public class CacheManager { * * @return 删除数量 */ - public long localDel(String map, String key) { - return del(localSource, map, key); - } + public long localDel(String map, String key); //-------------------------------------- 远程缓存 -------------------------------------- /** @@ -123,8 +89,18 @@ public class CacheManager { * * @return 数据值 */ - public T remoteGet(final String map, final String key, final Type type) { - return get(remoteSource, map, key, type); + public T remoteGet(final String map, final String key, final Type type); + + /** + * 远程获取字符串缓存数据, 过期返回null + * + * @param map 缓存hash + * @param key 缓存键 + * + * @return 数据值 + */ + default String remoteGetString(final String map, final String key) { + return remoteGet(map, key, String.class); } /** @@ -137,21 +113,7 @@ public class CacheManager { * * @return 数据值 */ - public CompletableFuture remoteGetAsync(final String map, final String key, final Type type) { - return getAsync(remoteSource, map, key, type); - } - - /** - * 远程获取字符串缓存数据, 过期返回null - * - * @param map 缓存hash - * @param key 缓存键 - * - * @return 数据值 - */ - public final String remoteGetString(final String map, final String key) { - return get(remoteSource, map, key, String.class); - } + public CompletableFuture remoteGetAsync(final String map, final String key, final Type type); /** * 远程异步获取字符串缓存数据, 过期返回null @@ -161,8 +123,8 @@ public class CacheManager { * * @return 数据值 */ - public final CompletableFuture remoteGetStringAsync(final String map, final String key) { - return getAsync(remoteSource, map, key, String.class); + default CompletableFuture remoteGetStringAsync(final String map, final String key) { + return remoteGetAsync(map, key, String.class); } /** @@ -175,8 +137,18 @@ public class CacheManager { * @param value 数据值 * @param expire 过期时长,为null表示永不过期 */ - public void remoteSet(final String map, final String key, final Type type, final T value, Duration expire) { - set(remoteSource, map, key, type, value, expire); + public void remoteSet(final String map, final String key, final Type type, final T value, Duration expire); + + /** + * 远程缓存字符串数据 + * + * @param map 缓存hash + * @param key 缓存键 + * @param value 数据值 + * @param expire 过期时长,为null表示永不过期 + */ + default void remoteSetString(final String map, final String key, final String value, Duration expire) { + remoteSet(map, key, String.class, value, expire); } /** @@ -189,21 +161,7 @@ public class CacheManager { * @param value 数据值 * @param expire 过期时长,为null表示永不过期 */ - public CompletableFuture remoteSetAsync(final String map, final String key, final Type type, final T value, Duration expire) { - return setAsync(remoteSource, map, key, type, value, expire); - } - - /** - * 远程缓存字符串数据 - * - * @param map 缓存hash - * @param key 缓存键 - * @param value 数据值 - * @param expire 过期时长,为null表示永不过期 - */ - public void remoteSetString(final String map, final String key, final String value, Duration expire) { - set(remoteSource, map, key, String.class, value, expire); - } + public CompletableFuture remoteSetAsync(String map, String key, Type type, T value, Duration expire); /** * 远程异步缓存字符串数据 @@ -213,8 +171,8 @@ public class CacheManager { * @param value 数据值 * @param expire 过期时长,为null表示永不过期 */ - public CompletableFuture remoteSetStringAsync(final String map, final String key, final String value, Duration expire) { - return setAsync(remoteSource, map, key, String.class, value, expire); + default CompletableFuture remoteSetStringAsync(final String map, final String key, final String value, Duration expire) { + return remoteSetAsync(map, key, String.class, value, expire); } /** @@ -225,9 +183,7 @@ public class CacheManager { * * @return 删除数量 */ - public long remoteDel(String map, String key) { - return del(remoteSource, map, key); - } + public long remoteDel(String map, String key); /** * 远程异步删除缓存数据 @@ -237,9 +193,7 @@ public class CacheManager { * * @return 删除数量 */ - public CompletableFuture remoteDelAsync(String map, String key) { - return delAsync(remoteSource, map, key); - } + public CompletableFuture remoteDelAsync(String map, String key); //-------------------------------------- both缓存 -------------------------------------- /** @@ -252,9 +206,18 @@ public class CacheManager { * * @return 数据值 */ - public T bothGet(final String map, final String key, final Type type) { - T val = get(localSource, map, key, type); - return val == null ? get(remoteSource, map, key, type) : val; + public T bothGet(final String map, final String key, final Type type); + + /** + * 远程获取字符串缓存数据, 过期返回null + * + * @param map 缓存hash + * @param key 缓存键 + * + * @return 数据值 + */ + default String bothGetString(final String map, final String key) { + return bothGet(map, key, String.class); } /** @@ -267,25 +230,7 @@ public class CacheManager { * * @return 数据值 */ - public CompletableFuture bothGetAsync(final String map, final String key, final Type type) { - T val = get(localSource, map, key, type); - if (val != null) { - return CompletableFuture.completedFuture(val); - } - return getAsync(remoteSource, map, key, type); - } - - /** - * 远程获取字符串缓存数据, 过期返回null - * - * @param map 缓存hash - * @param key 缓存键 - * - * @return 数据值 - */ - public final String bothGetString(final String map, final String key) { - return bothGet(map, key, String.class); - } + public CompletableFuture bothGetAsync(final String map, final String key, final Type type); /** * 远程异步获取字符串缓存数据, 过期返回null @@ -295,7 +240,7 @@ public class CacheManager { * * @return 数据值 */ - public final CompletableFuture bothGetStringAsync(final String map, final String key) { + default CompletableFuture bothGetStringAsync(final String map, final String key) { return bothGetAsync(map, key, String.class); } @@ -310,9 +255,19 @@ public class CacheManager { * @param localExpire 本地过期时长,为null表示永不过期 * @param remoteExpire 远程过期时长,为null表示永不过期 */ - public void bothSet(final String map, final String key, final Type type, final T value, Duration localExpire, Duration remoteExpire) { - set(localSource, map, key, type, value, localExpire); - set(remoteSource, map, key, type, value, remoteExpire); + public void bothSet(final String map, final String key, final Type type, final T value, Duration localExpire, Duration remoteExpire); + + /** + * 远程缓存字符串数据 + * + * @param map 缓存hash + * @param key 缓存键 + * @param value 数据值 + * @param localExpire 本地过期时长,为null表示永不过期 + * @param remoteExpire 远程过期时长,为null表示永不过期 + */ + default void bothSetString(final String map, final String key, final String value, Duration localExpire, Duration remoteExpire) { + bothSet(map, key, String.class, value, localExpire, remoteExpire); } /** @@ -326,23 +281,7 @@ public class CacheManager { * @param localExpire 本地过期时长,为null表示永不过期 * @param remoteExpire 远程过期时长,为null表示永不过期 */ - public CompletableFuture bothSetAsync(final String map, final String key, final Type type, final T value, Duration localExpire, Duration remoteExpire) { - set(localSource, map, key, type, value, localExpire); - return setAsync(remoteSource, map, key, type, value, remoteExpire); - } - - /** - * 远程缓存字符串数据 - * - * @param map 缓存hash - * @param key 缓存键 - * @param value 数据值 - * @param localExpire 本地过期时长,为null表示永不过期 - * @param remoteExpire 远程过期时长,为null表示永不过期 - */ - public void bothSetString(final String map, final String key, final String value, Duration localExpire, Duration remoteExpire) { - bothSet(map, key, String.class, value, localExpire, remoteExpire); - } + public CompletableFuture bothSetAsync(String map, String key, Type type, T value, Duration localExpire, Duration remoteExpire); /** * 远程异步缓存字符串数据 @@ -353,7 +292,7 @@ public class CacheManager { * @param localExpire 本地过期时长,为null表示永不过期 * @param remoteExpire 远程过期时长,为null表示永不过期 */ - public CompletableFuture bothSetStringAsync(final String map, final String key, final String value, Duration localExpire, Duration remoteExpire) { + default CompletableFuture bothSetStringAsync(String map, String key, String value, Duration localExpire, Duration remoteExpire) { return bothSetAsync(map, key, String.class, value, localExpire, remoteExpire); } @@ -365,10 +304,7 @@ public class CacheManager { * * @return 删除数量 */ - public long bothDel(String map, String key) { - del(localSource, map, key); - return del(remoteSource, map, key); - } + public long bothDel(String map, String key); /** * 远程异步删除缓存数据 @@ -378,124 +314,6 @@ public class CacheManager { * * @return 删除数量 */ - public CompletableFuture bothDelAsync(String map, String key) { - del(localSource, map, key); - return delAsync(remoteSource, map, key); - } + public CompletableFuture bothDelAsync(String map, String key); - //-------------------------------------- 内部方法 -------------------------------------- - /** - * 获取缓存数据, 过期返回null - * - * @param 泛型 - * @param source 缓存源 - * @param map 缓存hash - * @param key 缓存键 - * @param type 数据类型 - * - * @return 数据值 - */ - protected T get(final CacheSource source, final String map, final String key, final Type type) { - CacheValue val = source.hget(map, key, loadCacheType(type)); - return val != null && !val.isExpired() ? val.getValue() : null; - } - - /** - * 获取缓存数据, 过期返回null - * - * @param 泛型 - * @param source 缓存源 - * @param map 缓存hash - * @param key 缓存键 - * @param type 数据类型 - * - * @return 数据值 - */ - protected CompletableFuture getAsync(final CacheSource source, final String map, final String key, final Type type) { - return source.hgetAsync(map, key, loadCacheType(type)).thenApply(v -> { - CacheValue val = (CacheValue) v; - return val != null && !val.isExpired() ? (T) val.getValue() : null; - }); - } - - /** - * 缓存数据 - * - * @param 泛型 - * @param source 缓存源 - * @param map 缓存hash - * @param key 缓存键 - * @param type 数据类型 - * @param value 数据值 - * @param expire 过期时长,为null表示永不过期 - */ - protected void set(final CacheSource source, final String map, final String key, final Type type, final T value, Duration expire) { - Type t = loadCacheType(type, value); - source.hset(map, key, t, CacheValue.create(value, expire)); - } - - /** - * 缓存数据 - * - * @param 泛型 - * @param source 缓存源 - * @param map 缓存hash - * @param key 缓存键 - * @param type 数据类型 - * @param value 数据值 - * @param expire 过期时长,为null表示永不过期 - */ - protected CompletableFuture setAsync(final CacheSource source, final String map, final String key, final Type type, final T value, Duration expire) { - Type t = loadCacheType(type, value); - return source.hsetAsync(map, key, t, CacheValue.create(value, expire)); - } - - /** - * 删除缓存数据 - * - * @param source 缓存源 - * @param map 缓存hash - * @param key 缓存键 - * - * @return 删除数量 - */ - protected long del(final CacheSource source, String map, String key) { - return source.hdel(map, key); - } - - /** - * 删除缓存数据 - * - * @param source 缓存源 - * @param map 缓存hash - * @param key 缓存键 - * - * @return 删除数量 - */ - protected CompletableFuture delAsync(final CacheSource source, String map, String key) { - return source.hdelAsync(map, key); - } - - /** - * 创建数据类型创建对应CacheValue泛型 - * - * @param type 数据类型,为null则取value的类型 - * @param value 数据值 - * - * @return CacheValue泛型 - */ - protected Type loadCacheType(Type type, final Object value) { - return loadCacheType(type == null ? value.getClass() : type); - } - - /** - * 创建数据类型创建对应CacheValue泛型 - * - * @param type 数据类型 - * - * @return CacheValue泛型 - */ - protected Type loadCacheType(Type type) { - return cacheValueTypes.computeIfAbsent(type, t -> TypeToken.createParameterizedType(null, CacheValue.class, type)); - } } diff --git a/src/main/java/org/redkale/caching/CacheManagerService.java b/src/main/java/org/redkale/caching/CacheManagerService.java new file mode 100644 index 000000000..873db5e21 --- /dev/null +++ b/src/main/java/org/redkale/caching/CacheManagerService.java @@ -0,0 +1,457 @@ +/* + * + */ +package org.redkale.caching; + +import java.lang.reflect.Type; +import java.time.Duration; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import org.redkale.annotation.AutoLoad; +import org.redkale.annotation.Component; +import org.redkale.annotation.Nonnull; +import org.redkale.annotation.Nullable; +import org.redkale.annotation.ResourceType; +import org.redkale.service.Local; +import org.redkale.service.Service; +import org.redkale.source.CacheMemorySource; +import org.redkale.source.CacheSource; +import org.redkale.util.AnyValue; +import org.redkale.util.TypeToken; + +/** + * + * @author zhangjx + */ +@Local +@Component +@AutoLoad(false) +@ResourceType(CacheManager.class) +public class CacheManagerService implements CacheManager, Service { + + //缓存配置项 + protected final CacheConfig config; + + //数据类型与CacheValue泛型的对应关系 + private final ConcurrentHashMap cacheValueTypes = new ConcurrentHashMap<>(); + + //本地缓存Source + protected final CacheMemorySource localSource = new CacheMemorySource("caching"); + + //远程缓存Source + protected CacheSource remoteSource; + + protected CacheManagerService(@Nonnull CacheConfig config, @Nullable CacheSource remoteSource) { + this.config = Objects.requireNonNull(config); + this.remoteSource = remoteSource; + } + + public static CacheManagerService create(@Nonnull CacheConfig config, @Nullable CacheSource remoteSource) { + return new CacheManagerService(config, remoteSource); + } + + @Override + public void init(AnyValue conf) { + this.localSource.init(conf); + } + + @Override + public void destroy(AnyValue conf) { + this.localSource.destroy(conf); + } + + //-------------------------------------- 本地缓存 -------------------------------------- + /** + * 本地获取缓存数据, 过期返回null + * + * @param 泛型 + * @param map 缓存hash + * @param key 缓存键 + * @param type 数据类型 + * + * @return 数据值 + */ + @Override + public T localGet(final String map, final String key, final Type type) { + return get(localSource, map, key, type); + } + + /** + * 本地缓存数据 + * + * @param 泛型 + * @param map 缓存hash + * @param key 缓存键 + * @param type 数据类型 + * @param value 数据值 + * @param expire 过期时长,为null表示永不过期 + */ + @Override + public void localSet(String map, String key, Type type, T value, Duration expire) { + set(localSource, map, key, type, value, expire); + } + + /** + * 本地删除缓存数据 + * + * @param map 缓存hash + * @param key 缓存键 + * + * @return 删除数量 + */ + @Override + public long localDel(String map, String key) { + return del(localSource, map, key); + } + + //-------------------------------------- 远程缓存 -------------------------------------- + /** + * 远程获取缓存数据, 过期返回null + * + * @param 泛型 + * @param map 缓存hash + * @param key 缓存键 + * @param type 数据类型 + * + * @return 数据值 + */ + @Override + public T remoteGet(final String map, final String key, final Type type) { + return get(remoteSource, map, key, type); + } + + /** + * 远程异步获取缓存数据, 过期返回null + * + * @param 泛型 + * @param map 缓存hash + * @param key 缓存键 + * @param type 数据类型 + * + * @return 数据值 + */ + @Override + public CompletableFuture remoteGetAsync(final String map, final String key, final Type type) { + return getAsync(remoteSource, map, key, type); + } + + /** + * 远程缓存数据 + * + * @param 泛型 + * @param map 缓存hash + * @param key 缓存键 + * @param type 数据类型 + * @param value 数据值 + * @param expire 过期时长,为null表示永不过期 + */ + public void remoteSet(final String map, final String key, final Type type, final T value, Duration expire) { + set(remoteSource, map, key, type, value, expire); + } + + /** + * 远程异步缓存数据 + * + * @param 泛型 + * @param map 缓存hash + * @param key 缓存键 + * @param type 数据类型 + * @param value 数据值 + * @param expire 过期时长,为null表示永不过期 + */ + public CompletableFuture remoteSetAsync(String map, String key, Type type, T value, Duration expire) { + return setAsync(remoteSource, map, key, type, value, expire); + } + + /** + * 远程删除缓存数据 + * + * @param map 缓存hash + * @param key 缓存键 + * + * @return 删除数量 + */ + public long remoteDel(String map, String key) { + return del(remoteSource, map, key); + } + + /** + * 远程异步删除缓存数据 + * + * @param map 缓存hash + * @param key 缓存键 + * + * @return 删除数量 + */ + public CompletableFuture remoteDelAsync(String map, String key) { + return delAsync(remoteSource, map, key); + } + + //-------------------------------------- both缓存 -------------------------------------- + /** + * 远程获取缓存数据, 过期返回null + * + * @param 泛型 + * @param map 缓存hash + * @param key 缓存键 + * @param type 数据类型 + * + * @return 数据值 + */ + public T bothGet(final String map, final String key, final Type type) { + T val = get(localSource, map, key, type); + return val == null ? get(remoteSource, map, key, type) : val; + } + + /** + * 远程异步获取缓存数据, 过期返回null + * + * @param 泛型 + * @param map 缓存hash + * @param key 缓存键 + * @param type 数据类型 + * + * @return 数据值 + */ + public CompletableFuture bothGetAsync(final String map, final String key, final Type type) { + T val = get(localSource, map, key, type); + if (val != null) { + return CompletableFuture.completedFuture(val); + } + return getAsync(remoteSource, map, key, type); + } + + /** + * 远程获取字符串缓存数据, 过期返回null + * + * @param map 缓存hash + * @param key 缓存键 + * + * @return 数据值 + */ + public final String bothGetString(final String map, final String key) { + return bothGet(map, key, String.class); + } + + /** + * 远程异步获取字符串缓存数据, 过期返回null + * + * @param map 缓存hash + * @param key 缓存键 + * + * @return 数据值 + */ + public final CompletableFuture bothGetStringAsync(final String map, final String key) { + return bothGetAsync(map, key, String.class); + } + + /** + * 远程缓存数据 + * + * @param 泛型 + * @param map 缓存hash + * @param key 缓存键 + * @param type 数据类型 + * @param value 数据值 + * @param localExpire 本地过期时长,为null表示永不过期 + * @param remoteExpire 远程过期时长,为null表示永不过期 + */ + public void bothSet(final String map, final String key, final Type type, final T value, Duration localExpire, Duration remoteExpire) { + set(localSource, map, key, type, value, localExpire); + set(remoteSource, map, key, type, value, remoteExpire); + } + + /** + * 远程异步缓存数据 + * + * @param 泛型 + * @param map 缓存hash + * @param key 缓存键 + * @param type 数据类型 + * @param value 数据值 + * @param localExpire 本地过期时长,为null表示永不过期 + * @param remoteExpire 远程过期时长,为null表示永不过期 + */ + public CompletableFuture bothSetAsync(String map, String key, Type type, T value, Duration localExpire, Duration remoteExpire) { + set(localSource, map, key, type, value, localExpire); + return setAsync(remoteSource, map, key, type, value, remoteExpire); + } + + /** + * 远程缓存字符串数据 + * + * @param map 缓存hash + * @param key 缓存键 + * @param value 数据值 + * @param localExpire 本地过期时长,为null表示永不过期 + * @param remoteExpire 远程过期时长,为null表示永不过期 + */ + public void bothSetString(final String map, final String key, final String value, Duration localExpire, Duration remoteExpire) { + bothSet(map, key, String.class, value, localExpire, remoteExpire); + } + + /** + * 远程异步缓存字符串数据 + * + * @param map 缓存hash + * @param key 缓存键 + * @param value 数据值 + * @param localExpire 本地过期时长,为null表示永不过期 + * @param remoteExpire 远程过期时长,为null表示永不过期 + */ + public CompletableFuture bothSetStringAsync(String map, String key, String value, Duration localExpire, Duration remoteExpire) { + return bothSetAsync(map, key, String.class, value, localExpire, remoteExpire); + } + + /** + * 远程删除缓存数据 + * + * @param map 缓存hash + * @param key 缓存键 + * + * @return 删除数量 + */ + public long bothDel(String map, String key) { + del(localSource, map, key); + return del(remoteSource, map, key); + } + + /** + * 远程异步删除缓存数据 + * + * @param map 缓存hash + * @param key 缓存键 + * + * @return 删除数量 + */ + public CompletableFuture bothDelAsync(String map, String key) { + del(localSource, map, key); + return delAsync(remoteSource, map, key); + } + + //-------------------------------------- 内部方法 -------------------------------------- + /** + * 获取缓存数据, 过期返回null + * + * @param 泛型 + * @param source 缓存源 + * @param map 缓存hash + * @param key 缓存键 + * @param type 数据类型 + * + * @return 数据值 + */ + protected T get(final CacheSource source, final String map, final String key, final Type type) { + CacheValue val = source.hget(map, key, loadCacheType(type)); + return val != null && !val.isExpired() ? val.getValue() : null; + } + + /** + * 获取缓存数据, 过期返回null + * + * @param 泛型 + * @param source 缓存源 + * @param map 缓存hash + * @param key 缓存键 + * @param type 数据类型 + * + * @return 数据值 + */ + protected CompletableFuture getAsync(final CacheSource source, final String map, final String key, final Type type) { + if (source == null) { + return CompletableFuture.failedFuture(new NullPointerException(CacheManager.class.getSimpleName() + ".source is null")); + } + return source.hgetAsync(map, key, loadCacheType(type)).thenApply(v -> { + CacheValue val = (CacheValue) v; + return val != null && !val.isExpired() ? (T) val.getValue() : null; + }); + } + + /** + * 缓存数据 + * + * @param 泛型 + * @param source 缓存源 + * @param map 缓存hash + * @param key 缓存键 + * @param type 数据类型 + * @param value 数据值 + * @param expire 过期时长,为null表示永不过期 + */ + protected void set(CacheSource source, String map, String key, Type type, T value, Duration expire) { + Type t = loadCacheType(type, value); + source.hset(map, key, t, CacheValue.create(value, expire)); + } + + /** + * 缓存数据 + * + * @param 泛型 + * @param source 缓存源 + * @param map 缓存hash + * @param key 缓存键 + * @param type 数据类型 + * @param value 数据值 + * @param expire 过期时长,为null表示永不过期 + */ + protected CompletableFuture setAsync(CacheSource source, String map, String key, Type type, T value, Duration expire) { + if (source == null) { + return CompletableFuture.failedFuture(new NullPointerException(CacheManager.class.getSimpleName() + ".source is null")); + } + Type t = loadCacheType(type, value); + return source.hsetAsync(map, key, t, CacheValue.create(value, expire)); + } + + /** + * 删除缓存数据 + * + * @param source 缓存源 + * @param map 缓存hash + * @param key 缓存键 + * + * @return 删除数量 + */ + protected long del(final CacheSource source, String map, String key) { + return source.hdel(map, key); + } + + /** + * 删除缓存数据 + * + * @param source 缓存源 + * @param map 缓存hash + * @param key 缓存键 + * + * @return 删除数量 + */ + protected CompletableFuture delAsync(final CacheSource source, String map, String key) { + if (source == null) { + return CompletableFuture.failedFuture(new NullPointerException(CacheManager.class.getSimpleName() + ".source is null")); + } + return source.hdelAsync(map, key); + } + + /** + * 创建数据类型创建对应CacheValue泛型 + * + * @param type 数据类型,为null则取value的类型 + * @param value 数据值 + * + * @return CacheValue泛型 + */ + protected Type loadCacheType(Type type, final Object value) { + return loadCacheType(type == null ? value.getClass() : type); + } + + /** + * 创建数据类型创建对应CacheValue泛型 + * + * @param type 数据类型 + * + * @return CacheValue泛型 + */ + protected Type loadCacheType(Type type) { + return cacheValueTypes.computeIfAbsent(type, t -> TypeToken.createParameterizedType(null, CacheValue.class, type)); + } +} diff --git a/src/main/java/org/redkale/source/CacheMemorySource.java b/src/main/java/org/redkale/source/CacheMemorySource.java index f0dce1ba8..f28ca7f5a 100644 --- a/src/main/java/org/redkale/source/CacheMemorySource.java +++ b/src/main/java/org/redkale/source/CacheMemorySource.java @@ -344,7 +344,7 @@ public final class CacheMemorySource extends AbstractCacheSource { if (entry == null) { entry = new CacheEntry(CacheEntryType.OBJECT, key); container.put(key, entry); - entry.setObjectValue(convert, type, value); + entry.setObjectValue(convert == null ? this.convert : convert, type, value); entry.expireSeconds(expireSeconds); entry.lastAccessed = System.currentTimeMillis(); return true; @@ -365,7 +365,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public T getSet(String key, Convert convert, Type type, T value) { CacheEntry entry = find(key, CacheEntryType.OBJECT); - T old = entry == null ? null : (T) entry.getObjectValue(convert, type); + T old = entry == null ? null : (T) entry.getObjectValue(convert == null ? this.convert : convert, type); set0(key, 0, convert, type, value); return old; } @@ -387,7 +387,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } finally { containerLock.unlock(); } - return entry.getObjectValue(null, type); + return entry.getObjectValue(convert, type); } @Override @@ -411,7 +411,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } entry.lock(); try { - entry.setObjectValue(convert, type, value); + entry.setObjectValue(convert == null ? this.convert : convert, type, value); entry.expireSeconds(expireSeconds); entry.lastAccessed = System.currentTimeMillis(); } finally { @@ -577,7 +577,7 @@ public final class CacheMemorySource extends AbstractCacheSource { entry.lock(); try { if (entry.cacheType != CacheEntryType.ATOMIC) { - entry.objectValue = new AtomicLong(Long.parseLong(entry.getObjectValue(null, String.class))); + entry.objectValue = new AtomicLong(Long.parseLong(entry.getObjectValue(convert, String.class))); entry.cacheType = CacheEntryType.ATOMIC; } return ((AtomicLong) entry.objectValue).addAndGet(num); @@ -610,7 +610,7 @@ public final class CacheMemorySource extends AbstractCacheSource { entry.lock(); try { if (entry.cacheType != CacheEntryType.DOUBLE) { - entry.objectValue = new AtomicLong(Long.parseLong(entry.getObjectValue(null, String.class))); + entry.objectValue = new AtomicLong(Long.parseLong(entry.getObjectValue(convert, String.class))); entry.cacheType = CacheEntryType.DOUBLE; } Long v = ((AtomicLong) entry.objectValue).addAndGet(Double.doubleToLongBits(num)); @@ -698,24 +698,25 @@ public final class CacheMemorySource extends AbstractCacheSource { if (expireSeconds > 0) { entry.expireSeconds(expireSeconds); } + final Convert c = convert == null ? this.convert : convert; // OBJECT, ATOMIC, DOUBLE, SSET, ZSET, LIST, MAP; switch (entry.cacheType) { case ATOMIC: - return CacheEntry.serialToObj(convert, type, (AtomicLong) entry.objectValue); + return CacheEntry.serialToObj(c, type, (AtomicLong) entry.objectValue); case DOUBLE: - return CacheEntry.serialToObj(convert, type, Double.longBitsToDouble(((AtomicLong) entry.objectValue).longValue())); + return CacheEntry.serialToObj(c, type, Double.longBitsToDouble(((AtomicLong) entry.objectValue).longValue())); case SSET: - return (T) entry.ssetValue.stream().map(v -> CacheEntry.serialToObj(convert, type, v)).collect(Collectors.toSet()); + return (T) entry.ssetValue.stream().map(v -> CacheEntry.serialToObj(c, type, v)).collect(Collectors.toSet()); case ZSET: return (T) entry.zsetValue.stream().map(v -> new CacheScoredValue(v)).collect(Collectors.toSet()); case LIST: - return (T) entry.listValue.stream().map(v -> CacheEntry.serialToObj(convert, type, v)).collect(Collectors.toList()); + return (T) entry.listValue.stream().map(v -> CacheEntry.serialToObj(c, type, v)).collect(Collectors.toList()); case MAP: LinkedHashMap map = new LinkedHashMap(); - entry.mapValue.forEach((k, v) -> map.put(k, CacheEntry.serialToObj(convert, type, v))); + entry.mapValue.forEach((k, v) -> map.put(k, CacheEntry.serialToObj(c, type, v))); return (T) map; default: - return entry.getObjectValue(convert, type); + return entry.getObjectValue(c, type); } } @@ -803,7 +804,7 @@ public final class CacheMemorySource extends AbstractCacheSource { Map map = entry.mapValue; Serializable val = (Serializable) map.computeIfAbsent(field, f -> new AtomicLong()); if (!(val instanceof AtomicLong)) { - val = CacheEntry.objToSerial(null, AtomicLong.class, val); + val = CacheEntry.objToSerial(convert, AtomicLong.class, val); map.put(field, val); } return ((AtomicLong) val).addAndGet(num); @@ -889,7 +890,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } entry.lock(); try { - boolean rs = entry.setMapValueIfAbsent(field, convert, type, value) == null; + boolean rs = entry.setMapValueIfAbsent(field, convert == null ? this.convert : convert, type, value) == null; entry.lastAccessed = System.currentTimeMillis(); return rs; } finally { @@ -932,7 +933,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } List rs = new ArrayList<>(fields.length); for (String field : fields) { - rs.add(entry.getMapValue(field, null, type)); + rs.add(entry.getMapValue(field, convert, type)); } return rs; } @@ -949,7 +950,7 @@ public final class CacheMemorySource extends AbstractCacheSource { return new LinkedHashMap(); } else { Map map = new LinkedHashMap(); - entry.mapValue.forEach((k, v) -> map.put(k, CacheEntry.serialToObj(null, type, v))); + entry.mapValue.forEach((k, v) -> map.put(k, CacheEntry.serialToObj(convert, type, v))); return map; } } @@ -965,7 +966,7 @@ public final class CacheMemorySource extends AbstractCacheSource { if (entry == null) { return new ArrayList(); } else { - Stream stream = entry.mapValue.values().stream().map(v -> CacheEntry.serialToObj(null, type, v)); + Stream stream = entry.mapValue.values().stream().map(v -> CacheEntry.serialToObj(convert, type, v)); return new ArrayList(stream.collect(Collectors.toList())); } } @@ -987,12 +988,12 @@ public final class CacheMemorySource extends AbstractCacheSource { if (Utility.isEmpty(pattern)) { Set> set = entry.mapValue.entrySet(); return set.stream() - .collect(Collectors.toMap(Map.Entry::getKey, en -> CacheEntry.serialToObj(null, type, en.getValue()))); + .collect(Collectors.toMap(Map.Entry::getKey, en -> CacheEntry.serialToObj(convert, type, en.getValue()))); } else { Predicate regx = Pattern.compile(pattern.replace("*", ".*")).asPredicate(); Set> set = entry.mapValue.entrySet(); return set.stream().filter(en -> regx.test(en.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, en -> CacheEntry.serialToObj(null, type, en.getValue()))); + .collect(Collectors.toMap(Map.Entry::getKey, en -> CacheEntry.serialToObj(convert, type, en.getValue()))); } } @@ -1028,7 +1029,7 @@ public final class CacheMemorySource extends AbstractCacheSource { if (entry == null) { return 0L; } - String obj = entry.getMapValue(field, null, String.class); + String obj = entry.getMapValue(field, convert, String.class); return obj == null ? 0L : (long) obj.length(); } @@ -1056,7 +1057,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } entry.lock(); try { - entry.setMapValue(field, convert, type, value); + entry.setMapValue(field, convert == null ? this.convert : convert, type, value); entry.lastAccessed = System.currentTimeMillis(); } finally { entry.unlock(); @@ -1110,7 +1111,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } List list = new ArrayList(entry.listValue); int pos = index >= 0 ? index : list.size() + index; - return pos >= list.size() ? null : CacheEntry.serialToObj(null, componentType, list.get(pos)); + return pos >= list.size() ? null : CacheEntry.serialToObj(convert, componentType, list.get(pos)); } @Override @@ -1144,7 +1145,7 @@ public final class CacheMemorySource extends AbstractCacheSource { return 0L; } entry.lock(); - Serializable val = CacheEntry.objToSerial(null, componentType, value); + Serializable val = CacheEntry.objToSerial(convert, componentType, value); try { List list = new ArrayList<>(entry.listValue); int pos = list.indexOf(pivot); @@ -1197,7 +1198,7 @@ public final class CacheMemorySource extends AbstractCacheSource { entry.lock(); try { for (T val : values) { - entry.listValue.addFirst(CacheEntry.objToSerial(null, componentType, val)); + entry.listValue.addFirst(CacheEntry.objToSerial(convert, componentType, val)); } } finally { entry.unlock(); @@ -1219,7 +1220,7 @@ public final class CacheMemorySource extends AbstractCacheSource { try { ConcurrentLinkedDeque list = entry.listValue; for (T val : values) { - list.addFirst(CacheEntry.objToSerial(null, componentType, val)); + list.addFirst(CacheEntry.objToSerial(convert, componentType, val)); } } finally { entry.unlock(); @@ -1239,7 +1240,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } entry.lock(); try { - return CacheEntry.serialToObj(null, componentType, entry.listValue.pollFirst()); + return CacheEntry.serialToObj(convert, componentType, entry.listValue.pollFirst()); } finally { entry.unlock(); } @@ -1300,7 +1301,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } entry.lock(); try { - return CacheEntry.serialToObj(null, componentType, entry.listValue.pollLast()); + return CacheEntry.serialToObj(convert, componentType, entry.listValue.pollLast()); } finally { entry.unlock(); } @@ -1321,7 +1322,7 @@ public final class CacheMemorySource extends AbstractCacheSource { try { ConcurrentLinkedDeque list = entry.listValue; for (T val : values) { - list.add(CacheEntry.objToSerial(null, componentType, val)); + list.add(CacheEntry.objToSerial(convert, componentType, val)); } } finally { entry.unlock(); @@ -1352,7 +1353,7 @@ public final class CacheMemorySource extends AbstractCacheSource { try { ConcurrentLinkedDeque list = entry.listValue; for (T val : values) { - list.add(CacheEntry.objToSerial(null, componentType, val)); + list.add(CacheEntry.objToSerial(convert, componentType, val)); } } finally { entry.unlock(); @@ -1396,15 +1397,15 @@ public final class CacheMemorySource extends AbstractCacheSource { for (int i = 0; i < Math.abs(count); i++) { int index = ThreadLocalRandom.current().nextInt(vals.size()); Serializable val = vals.get(index); - list.add(CacheEntry.serialToObj(null, componentType, val)); + list.add(CacheEntry.serialToObj(convert, componentType, val)); } } else { //不可以重复 if (count >= vals.size()) { return vals.stream() - .map(val -> (T) CacheEntry.serialToObj(null, componentType, val)).collect(Collectors.toList()); + .map(val -> (T) CacheEntry.serialToObj(convert, componentType, val)).collect(Collectors.toList()); } return vals.subList(0, count).stream() - .map(val -> (T) CacheEntry.serialToObj(null, componentType, val)).collect(Collectors.toList()); + .map(val -> (T) CacheEntry.serialToObj(convert, componentType, val)).collect(Collectors.toList()); } return list; } @@ -1423,7 +1424,7 @@ public final class CacheMemorySource extends AbstractCacheSource { boolean rs = false; entry.lock(); try { - Serializable val = CacheEntry.objToSerial(null, componentType, member); + Serializable val = CacheEntry.objToSerial(convert, componentType, member); rs = entry.ssetValue.remove(val); } finally { entry.unlock(); @@ -1444,7 +1445,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } entry2.lock(); try { - entry2.addSsetValue(null, componentType, member); + entry2.addSsetValue(convert, componentType, member); } finally { entry2.unlock(); } @@ -1460,7 +1461,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public Set sdiff(final String key, final Type componentType, final String... key2s) { return sdiff0(key, key2s).stream() - .map(v -> (T) CacheEntry.serialToObj(null, componentType, v)) + .map(v -> (T) CacheEntry.serialToObj(convert, componentType, v)) .collect(Collectors.toSet()); } @@ -1519,7 +1520,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public Set sinter(final String key, final Type componentType, final String... key2s) { return sinter0(key, key2s).stream() - .map(v -> (T) CacheEntry.serialToObj(null, componentType, v)) + .map(v -> (T) CacheEntry.serialToObj(convert, componentType, v)) .collect(Collectors.toSet()); } @@ -1587,7 +1588,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public Set sunion(final String key, final Type componentType, final String... key2s) { return sunion0(key, key2s).stream() - .map(v -> (T) CacheEntry.serialToObj(null, componentType, v)) + .map(v -> (T) CacheEntry.serialToObj(convert, componentType, v)) .collect(Collectors.toSet()); } @@ -1650,7 +1651,7 @@ public final class CacheMemorySource extends AbstractCacheSource { return new LinkedHashSet<>(); } return entry.ssetValue.stream() - .map(v -> (T) CacheEntry.serialToObj(null, componentType, v)) + .map(v -> (T) CacheEntry.serialToObj(convert, componentType, v)) .collect(Collectors.toSet()); } @@ -1666,7 +1667,7 @@ public final class CacheMemorySource extends AbstractCacheSource { CacheEntry entry = find(key, CacheEntryType.SSET); if (entry != null) { map.put(key, entry.ssetValue.stream() - .map(v -> (T) CacheEntry.serialToObj(null, componentType, v)) + .map(v -> (T) CacheEntry.serialToObj(convert, componentType, v)) .collect(Collectors.toSet())); } } @@ -1718,7 +1719,7 @@ public final class CacheMemorySource extends AbstractCacheSource { entry.lock(); try { for (T val : values) { - entry.addSsetValue(null, componentType, val); + entry.addSsetValue(convert, componentType, val); } } finally { entry.unlock(); @@ -1744,7 +1745,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public boolean sismember(final String key, final Type type, final T value) { CacheEntry entry = find(key, CacheEntryType.SSET); - return entry != null && entry.ssetValue.contains(CacheEntry.objToSerial(null, type, value)); + return entry != null && entry.ssetValue.contains(CacheEntry.objToSerial(convert, type, value)); } @Override @@ -1771,7 +1772,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } if (del != null) { cset.remove(del); - return CacheEntry.serialToObj(null, componentType, del); + return CacheEntry.serialToObj(convert, componentType, del); } return null; } finally { @@ -1803,7 +1804,7 @@ public final class CacheMemorySource extends AbstractCacheSource { while (it.hasNext()) { Serializable item = it.next(); rms.add(item); - list.add(CacheEntry.serialToObj(null, componentType, item)); + list.add(CacheEntry.serialToObj(convert, componentType, item)); if (++index >= count) { break; } @@ -1835,7 +1836,7 @@ public final class CacheMemorySource extends AbstractCacheSource { Iterator it = cset.iterator(); Set list = new LinkedHashSet<>(); while (it.hasNext()) { - list.add(CacheEntry.serialToObj(null, componentType, it.next())); + list.add(CacheEntry.serialToObj(convert, componentType, it.next())); } return list; } finally { @@ -1856,7 +1857,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } long count = 0; for (T val : values) { - count += entry.ssetValue.remove(CacheEntry.objToSerial(null, type, val)) ? 1 : 0; + count += entry.ssetValue.remove(CacheEntry.objToSerial(convert, type, val)) ? 1 : 0; } return count; } @@ -2248,7 +2249,7 @@ public final class CacheMemorySource extends AbstractCacheSource { OBJECT, ATOMIC, DOUBLE, SSET, ZSET, LIST, MAP; } - //值类型只能是: String、byte[]、AtomicLong + //Serializable的具体数据类型只能是: String、byte[]、AtomicLong public static final class CacheEntry { volatile long lastAccessed; //最后刷新时间 @@ -2303,36 +2304,34 @@ public final class CacheMemorySource extends AbstractCacheSource { } //value类型只能是byte[]/String/AtomicLong - public static T serialToObj(Convert convert, @Nonnull Type type, Serializable value) { + public static T serialToObj(@Nonnull Convert convert, @Nonnull Type type, Serializable value) { if (value == null) { return null; } - Convert c = convert == null ? JsonConvert.root() : convert; if (value.getClass() == byte[].class) { - return (T) c.convertFrom(type, (byte[]) value); + return (T) convert.convertFrom(type, (byte[]) value); } else { //String/AtomicLong - if (c instanceof TextConvert) { - return (T) ((TextConvert) c).convertFrom(type, value.toString()); + if (convert instanceof TextConvert) { + return (T) ((TextConvert) convert).convertFrom(type, value.toString()); } else { - return (T) c.convertFrom(type, value.toString().getBytes(StandardCharsets.UTF_8)); + return (T) convert.convertFrom(type, value.toString().getBytes(StandardCharsets.UTF_8)); } } } //返回类型只能是byte[]/String/AtomicLong - public static Serializable objToSerial(Convert convert, Type type, Object value) { + public static Serializable objToSerial(@Nonnull Convert convert, Type type, Object value) { if (value == null) { return null; } if (value instanceof String || value instanceof byte[]) { return (Serializable) value; } - Convert c = convert == null ? JsonConvert.root() : convert; Type t = type == null ? value.getClass() : type; - if (c instanceof TextConvert) { - return ((TextConvert) c).convertTo(t, value); + if (convert instanceof TextConvert) { + return ((TextConvert) convert).convertTo(t, value); } else { - return c.convertToBytes(t, value); + return convert.convertToBytes(t, value); } } diff --git a/src/test/java/org/redkale/test/caching/CachingTest.java b/src/test/java/org/redkale/test/caching/CachingTest.java index d677e3071..e1ab0ea8a 100644 --- a/src/test/java/org/redkale/test/caching/CachingTest.java +++ b/src/test/java/org/redkale/test/caching/CachingTest.java @@ -7,7 +7,7 @@ import java.time.Duration; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.redkale.caching.CacheConfig; -import org.redkale.caching.CacheManager; +import org.redkale.caching.CacheManagerService; import org.redkale.convert.json.JsonConvert; import org.redkale.source.CacheMemorySource; import org.redkale.util.Utility; @@ -25,7 +25,10 @@ public class CachingTest { @Test public void run() throws Exception { - CacheManager cache = CacheManager.create(new CacheConfig(), new CacheMemorySource("remote")); + CacheMemorySource remoteSource = new CacheMemorySource("remote"); + remoteSource.init(null); + CacheManagerService cache = CacheManagerService.create(new CacheConfig(), remoteSource); + cache.init(null); Duration expire = Duration.ofMillis(490); cache.localSetString("user", "name:haha", "myha", expire); Assertions.assertEquals(cache.localGetString("user", "name:haha"), "myha"); @@ -41,6 +44,7 @@ public class CachingTest { Assertions.assertEquals(cache.localGet("user", bean.getName(), CachingBean.class).toString(), json); bean.setRemark(bean.getRemark() + "-新备注"); Assertions.assertEquals(cache.localGet("user", bean.getName(), CachingBean.class).toString(), json); + cache.destroy(null); } public static class CachingBean {