CacheManager

This commit is contained in:
redkale
2023-12-11 15:18:51 +08:00
parent bd088ab9ec
commit 71a7e7e7f6
4 changed files with 590 additions and 312 deletions

View File

@@ -5,14 +5,7 @@ package org.redkale.caching;
import java.lang.reflect.Type;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.redkale.annotation.Nonnull;
import org.redkale.annotation.Nullable;
import org.redkale.source.CacheMemorySource;
import org.redkale.source.CacheSource;
import org.redkale.util.TypeToken;
/**
* //TODO 待实现
@@ -24,28 +17,7 @@ import org.redkale.util.TypeToken;
*
* @since 2.8.0
*/
public class CacheManager {
//缓存配置项
protected final CacheConfig config;
//数据类型与CacheValue泛型的对应关系
private final ConcurrentHashMap<Type, Type> cacheValueTypes = new ConcurrentHashMap<>();
//本地缓存Source
protected final CacheSource localSource = new CacheMemorySource("caching");
//远程缓存Source
protected CacheSource remoteSource;
protected CacheManager(@Nonnull CacheConfig config, @Nullable CacheSource remoteSource) {
this.config = Objects.requireNonNull(config);
this.remoteSource = remoteSource;
}
public static CacheManager create(@Nonnull CacheConfig config, @Nullable CacheSource remoteSource) {
return new CacheManager(config, remoteSource);
}
public interface CacheManager {
//-------------------------------------- 本地缓存 --------------------------------------
/**
@@ -58,9 +30,7 @@ public class CacheManager {
*
* @return 数据值
*/
public <T> T localGet(final String map, final String key, final Type type) {
return get(localSource, map, key, type);
}
public <T> T localGet(final String map, final String key, final Type type);
/**
* 本地获取字符串缓存数据, 过期返回null
@@ -70,8 +40,8 @@ public class CacheManager {
*
* @return 数据值
*/
public final String localGetString(final String map, final String key) {
return get(localSource, map, key, String.class);
default String localGetString(final String map, final String key) {
return localGet(map, key, String.class);
}
/**
@@ -84,9 +54,7 @@ public class CacheManager {
* @param value 数据值
* @param expire 过期时长为null表示永不过期
*/
public <T> void localSet(final String map, final String key, final Type type, final T value, Duration expire) {
set(localSource, map, key, type, value, expire);
}
public <T> void localSet(String map, String key, Type type, T value, Duration expire);
/**
* 本地缓存字符串数据
@@ -96,8 +64,8 @@ public class CacheManager {
* @param value 数据值
* @param expire 过期时长为null表示永不过期
*/
public void localSetString(final String map, final String key, final String value, Duration expire) {
set(localSource, map, key, String.class, value, expire);
default void localSetString(final String map, final String key, final String value, Duration expire) {
localSet(map, key, String.class, value, expire);
}
/**
@@ -108,9 +76,7 @@ public class CacheManager {
*
* @return 删除数量
*/
public long localDel(String map, String key) {
return del(localSource, map, key);
}
public long localDel(String map, String key);
//-------------------------------------- 远程缓存 --------------------------------------
/**
@@ -123,8 +89,18 @@ public class CacheManager {
*
* @return 数据值
*/
public <T> T remoteGet(final String map, final String key, final Type type) {
return get(remoteSource, map, key, type);
public <T> T remoteGet(final String map, final String key, final Type type);
/**
* 远程获取字符串缓存数据, 过期返回null
*
* @param map 缓存hash
* @param key 缓存键
*
* @return 数据值
*/
default String remoteGetString(final String map, final String key) {
return remoteGet(map, key, String.class);
}
/**
@@ -137,21 +113,7 @@ public class CacheManager {
*
* @return 数据值
*/
public <T> CompletableFuture<T> remoteGetAsync(final String map, final String key, final Type type) {
return getAsync(remoteSource, map, key, type);
}
/**
* 远程获取字符串缓存数据, 过期返回null
*
* @param map 缓存hash
* @param key 缓存键
*
* @return 数据值
*/
public final String remoteGetString(final String map, final String key) {
return get(remoteSource, map, key, String.class);
}
public <T> CompletableFuture<T> remoteGetAsync(final String map, final String key, final Type type);
/**
* 远程异步获取字符串缓存数据, 过期返回null
@@ -161,8 +123,8 @@ public class CacheManager {
*
* @return 数据值
*/
public final CompletableFuture<String> remoteGetStringAsync(final String map, final String key) {
return getAsync(remoteSource, map, key, String.class);
default CompletableFuture<String> remoteGetStringAsync(final String map, final String key) {
return remoteGetAsync(map, key, String.class);
}
/**
@@ -175,8 +137,18 @@ public class CacheManager {
* @param value 数据值
* @param expire 过期时长为null表示永不过期
*/
public <T> void remoteSet(final String map, final String key, final Type type, final T value, Duration expire) {
set(remoteSource, map, key, type, value, expire);
public <T> void remoteSet(final String map, final String key, final Type type, final T value, Duration expire);
/**
* 远程缓存字符串数据
*
* @param map 缓存hash
* @param key 缓存键
* @param value 数据值
* @param expire 过期时长为null表示永不过期
*/
default void remoteSetString(final String map, final String key, final String value, Duration expire) {
remoteSet(map, key, String.class, value, expire);
}
/**
@@ -189,21 +161,7 @@ public class CacheManager {
* @param value 数据值
* @param expire 过期时长为null表示永不过期
*/
public <T> CompletableFuture<Void> remoteSetAsync(final String map, final String key, final Type type, final T value, Duration expire) {
return setAsync(remoteSource, map, key, type, value, expire);
}
/**
* 远程缓存字符串数据
*
* @param map 缓存hash
* @param key 缓存键
* @param value 数据值
* @param expire 过期时长为null表示永不过期
*/
public void remoteSetString(final String map, final String key, final String value, Duration expire) {
set(remoteSource, map, key, String.class, value, expire);
}
public <T> CompletableFuture<Void> remoteSetAsync(String map, String key, Type type, T value, Duration expire);
/**
* 远程异步缓存字符串数据
@@ -213,8 +171,8 @@ public class CacheManager {
* @param value 数据值
* @param expire 过期时长为null表示永不过期
*/
public CompletableFuture<Void> remoteSetStringAsync(final String map, final String key, final String value, Duration expire) {
return setAsync(remoteSource, map, key, String.class, value, expire);
default CompletableFuture<Void> remoteSetStringAsync(final String map, final String key, final String value, Duration expire) {
return remoteSetAsync(map, key, String.class, value, expire);
}
/**
@@ -225,9 +183,7 @@ public class CacheManager {
*
* @return 删除数量
*/
public long remoteDel(String map, String key) {
return del(remoteSource, map, key);
}
public long remoteDel(String map, String key);
/**
* 远程异步删除缓存数据
@@ -237,9 +193,7 @@ public class CacheManager {
*
* @return 删除数量
*/
public CompletableFuture<Long> remoteDelAsync(String map, String key) {
return delAsync(remoteSource, map, key);
}
public CompletableFuture<Long> remoteDelAsync(String map, String key);
//-------------------------------------- both缓存 --------------------------------------
/**
@@ -252,9 +206,18 @@ public class CacheManager {
*
* @return 数据值
*/
public <T> T bothGet(final String map, final String key, final Type type) {
T val = get(localSource, map, key, type);
return val == null ? get(remoteSource, map, key, type) : val;
public <T> T bothGet(final String map, final String key, final Type type);
/**
* 远程获取字符串缓存数据, 过期返回null
*
* @param map 缓存hash
* @param key 缓存键
*
* @return 数据值
*/
default String bothGetString(final String map, final String key) {
return bothGet(map, key, String.class);
}
/**
@@ -267,25 +230,7 @@ public class CacheManager {
*
* @return 数据值
*/
public <T> CompletableFuture<T> bothGetAsync(final String map, final String key, final Type type) {
T val = get(localSource, map, key, type);
if (val != null) {
return CompletableFuture.completedFuture(val);
}
return getAsync(remoteSource, map, key, type);
}
/**
* 远程获取字符串缓存数据, 过期返回null
*
* @param map 缓存hash
* @param key 缓存键
*
* @return 数据值
*/
public final String bothGetString(final String map, final String key) {
return bothGet(map, key, String.class);
}
public <T> CompletableFuture<T> bothGetAsync(final String map, final String key, final Type type);
/**
* 远程异步获取字符串缓存数据, 过期返回null
@@ -295,7 +240,7 @@ public class CacheManager {
*
* @return 数据值
*/
public final CompletableFuture<String> bothGetStringAsync(final String map, final String key) {
default CompletableFuture<String> bothGetStringAsync(final String map, final String key) {
return bothGetAsync(map, key, String.class);
}
@@ -310,9 +255,19 @@ public class CacheManager {
* @param localExpire 本地过期时长为null表示永不过期
* @param remoteExpire 远程过期时长为null表示永不过期
*/
public <T> void bothSet(final String map, final String key, final Type type, final T value, Duration localExpire, Duration remoteExpire) {
set(localSource, map, key, type, value, localExpire);
set(remoteSource, map, key, type, value, remoteExpire);
public <T> void bothSet(final String map, final String key, final Type type, final T value, Duration localExpire, Duration remoteExpire);
/**
* 远程缓存字符串数据
*
* @param map 缓存hash
* @param key 缓存键
* @param value 数据值
* @param localExpire 本地过期时长为null表示永不过期
* @param remoteExpire 远程过期时长为null表示永不过期
*/
default void bothSetString(final String map, final String key, final String value, Duration localExpire, Duration remoteExpire) {
bothSet(map, key, String.class, value, localExpire, remoteExpire);
}
/**
@@ -326,23 +281,7 @@ public class CacheManager {
* @param localExpire 本地过期时长为null表示永不过期
* @param remoteExpire 远程过期时长为null表示永不过期
*/
public <T> CompletableFuture<Void> bothSetAsync(final String map, final String key, final Type type, final T value, Duration localExpire, Duration remoteExpire) {
set(localSource, map, key, type, value, localExpire);
return setAsync(remoteSource, map, key, type, value, remoteExpire);
}
/**
* 远程缓存字符串数据
*
* @param map 缓存hash
* @param key 缓存键
* @param value 数据值
* @param localExpire 本地过期时长为null表示永不过期
* @param remoteExpire 远程过期时长为null表示永不过期
*/
public void bothSetString(final String map, final String key, final String value, Duration localExpire, Duration remoteExpire) {
bothSet(map, key, String.class, value, localExpire, remoteExpire);
}
public <T> CompletableFuture<Void> bothSetAsync(String map, String key, Type type, T value, Duration localExpire, Duration remoteExpire);
/**
* 远程异步缓存字符串数据
@@ -353,7 +292,7 @@ public class CacheManager {
* @param localExpire 本地过期时长为null表示永不过期
* @param remoteExpire 远程过期时长为null表示永不过期
*/
public CompletableFuture<Void> bothSetStringAsync(final String map, final String key, final String value, Duration localExpire, Duration remoteExpire) {
default CompletableFuture<Void> bothSetStringAsync(String map, String key, String value, Duration localExpire, Duration remoteExpire) {
return bothSetAsync(map, key, String.class, value, localExpire, remoteExpire);
}
@@ -365,10 +304,7 @@ public class CacheManager {
*
* @return 删除数量
*/
public long bothDel(String map, String key) {
del(localSource, map, key);
return del(remoteSource, map, key);
}
public long bothDel(String map, String key);
/**
* 远程异步删除缓存数据
@@ -378,124 +314,6 @@ public class CacheManager {
*
* @return 删除数量
*/
public CompletableFuture<Long> bothDelAsync(String map, String key) {
del(localSource, map, key);
return delAsync(remoteSource, map, key);
}
public CompletableFuture<Long> bothDelAsync(String map, String key);
//-------------------------------------- 内部方法 --------------------------------------
/**
* 获取缓存数据, 过期返回null
*
* @param <T> 泛型
* @param source 缓存源
* @param map 缓存hash
* @param key 缓存键
* @param type 数据类型
*
* @return 数据值
*/
protected <T> T get(final CacheSource source, final String map, final String key, final Type type) {
CacheValue<T> val = source.hget(map, key, loadCacheType(type));
return val != null && !val.isExpired() ? val.getValue() : null;
}
/**
* 获取缓存数据, 过期返回null
*
* @param <T> 泛型
* @param source 缓存源
* @param map 缓存hash
* @param key 缓存键
* @param type 数据类型
*
* @return 数据值
*/
protected <T> CompletableFuture<T> getAsync(final CacheSource source, final String map, final String key, final Type type) {
return source.hgetAsync(map, key, loadCacheType(type)).thenApply(v -> {
CacheValue<T> val = (CacheValue) v;
return val != null && !val.isExpired() ? (T) val.getValue() : null;
});
}
/**
* 缓存数据
*
* @param <T> 泛型
* @param source 缓存源
* @param map 缓存hash
* @param key 缓存键
* @param type 数据类型
* @param value 数据值
* @param expire 过期时长为null表示永不过期
*/
protected <T> void set(final CacheSource source, final String map, final String key, final Type type, final T value, Duration expire) {
Type t = loadCacheType(type, value);
source.hset(map, key, t, CacheValue.create(value, expire));
}
/**
* 缓存数据
*
* @param <T> 泛型
* @param source 缓存源
* @param map 缓存hash
* @param key 缓存键
* @param type 数据类型
* @param value 数据值
* @param expire 过期时长为null表示永不过期
*/
protected <T> CompletableFuture<Void> setAsync(final CacheSource source, final String map, final String key, final Type type, final T value, Duration expire) {
Type t = loadCacheType(type, value);
return source.hsetAsync(map, key, t, CacheValue.create(value, expire));
}
/**
* 删除缓存数据
*
* @param source 缓存源
* @param map 缓存hash
* @param key 缓存键
*
* @return 删除数量
*/
protected long del(final CacheSource source, String map, String key) {
return source.hdel(map, key);
}
/**
* 删除缓存数据
*
* @param source 缓存源
* @param map 缓存hash
* @param key 缓存键
*
* @return 删除数量
*/
protected CompletableFuture<Long> delAsync(final CacheSource source, String map, String key) {
return source.hdelAsync(map, key);
}
/**
* 创建数据类型创建对应CacheValue泛型
*
* @param type 数据类型为null则取value的类型
* @param value 数据值
*
* @return CacheValue泛型
*/
protected Type loadCacheType(Type type, final Object value) {
return loadCacheType(type == null ? value.getClass() : type);
}
/**
* 创建数据类型创建对应CacheValue泛型
*
* @param type 数据类型
*
* @return CacheValue泛型
*/
protected Type loadCacheType(Type type) {
return cacheValueTypes.computeIfAbsent(type, t -> TypeToken.createParameterizedType(null, CacheValue.class, type));
}
}

View File

@@ -0,0 +1,457 @@
/*
*
*/
package org.redkale.caching;
import java.lang.reflect.Type;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.redkale.annotation.AutoLoad;
import org.redkale.annotation.Component;
import org.redkale.annotation.Nonnull;
import org.redkale.annotation.Nullable;
import org.redkale.annotation.ResourceType;
import org.redkale.service.Local;
import org.redkale.service.Service;
import org.redkale.source.CacheMemorySource;
import org.redkale.source.CacheSource;
import org.redkale.util.AnyValue;
import org.redkale.util.TypeToken;
/**
*
* @author zhangjx
*/
@Local
@Component
@AutoLoad(false)
@ResourceType(CacheManager.class)
public class CacheManagerService implements CacheManager, Service {
//缓存配置项
protected final CacheConfig config;
//数据类型与CacheValue泛型的对应关系
private final ConcurrentHashMap<Type, Type> cacheValueTypes = new ConcurrentHashMap<>();
//本地缓存Source
protected final CacheMemorySource localSource = new CacheMemorySource("caching");
//远程缓存Source
protected CacheSource remoteSource;
protected CacheManagerService(@Nonnull CacheConfig config, @Nullable CacheSource remoteSource) {
this.config = Objects.requireNonNull(config);
this.remoteSource = remoteSource;
}
public static CacheManagerService create(@Nonnull CacheConfig config, @Nullable CacheSource remoteSource) {
return new CacheManagerService(config, remoteSource);
}
@Override
public void init(AnyValue conf) {
this.localSource.init(conf);
}
@Override
public void destroy(AnyValue conf) {
this.localSource.destroy(conf);
}
//-------------------------------------- 本地缓存 --------------------------------------
/**
* 本地获取缓存数据, 过期返回null
*
* @param <T> 泛型
* @param map 缓存hash
* @param key 缓存键
* @param type 数据类型
*
* @return 数据值
*/
@Override
public <T> T localGet(final String map, final String key, final Type type) {
return get(localSource, map, key, type);
}
/**
* 本地缓存数据
*
* @param <T> 泛型
* @param map 缓存hash
* @param key 缓存键
* @param type 数据类型
* @param value 数据值
* @param expire 过期时长为null表示永不过期
*/
@Override
public <T> void localSet(String map, String key, Type type, T value, Duration expire) {
set(localSource, map, key, type, value, expire);
}
/**
* 本地删除缓存数据
*
* @param map 缓存hash
* @param key 缓存键
*
* @return 删除数量
*/
@Override
public long localDel(String map, String key) {
return del(localSource, map, key);
}
//-------------------------------------- 远程缓存 --------------------------------------
/**
* 远程获取缓存数据, 过期返回null
*
* @param <T> 泛型
* @param map 缓存hash
* @param key 缓存键
* @param type 数据类型
*
* @return 数据值
*/
@Override
public <T> T remoteGet(final String map, final String key, final Type type) {
return get(remoteSource, map, key, type);
}
/**
* 远程异步获取缓存数据, 过期返回null
*
* @param <T> 泛型
* @param map 缓存hash
* @param key 缓存键
* @param type 数据类型
*
* @return 数据值
*/
@Override
public <T> CompletableFuture<T> remoteGetAsync(final String map, final String key, final Type type) {
return getAsync(remoteSource, map, key, type);
}
/**
* 远程缓存数据
*
* @param <T> 泛型
* @param map 缓存hash
* @param key 缓存键
* @param type 数据类型
* @param value 数据值
* @param expire 过期时长为null表示永不过期
*/
public <T> void remoteSet(final String map, final String key, final Type type, final T value, Duration expire) {
set(remoteSource, map, key, type, value, expire);
}
/**
* 远程异步缓存数据
*
* @param <T> 泛型
* @param map 缓存hash
* @param key 缓存键
* @param type 数据类型
* @param value 数据值
* @param expire 过期时长为null表示永不过期
*/
public <T> CompletableFuture<Void> remoteSetAsync(String map, String key, Type type, T value, Duration expire) {
return setAsync(remoteSource, map, key, type, value, expire);
}
/**
* 远程删除缓存数据
*
* @param map 缓存hash
* @param key 缓存键
*
* @return 删除数量
*/
public long remoteDel(String map, String key) {
return del(remoteSource, map, key);
}
/**
* 远程异步删除缓存数据
*
* @param map 缓存hash
* @param key 缓存键
*
* @return 删除数量
*/
public CompletableFuture<Long> remoteDelAsync(String map, String key) {
return delAsync(remoteSource, map, key);
}
//-------------------------------------- both缓存 --------------------------------------
/**
* 远程获取缓存数据, 过期返回null
*
* @param <T> 泛型
* @param map 缓存hash
* @param key 缓存键
* @param type 数据类型
*
* @return 数据值
*/
public <T> T bothGet(final String map, final String key, final Type type) {
T val = get(localSource, map, key, type);
return val == null ? get(remoteSource, map, key, type) : val;
}
/**
* 远程异步获取缓存数据, 过期返回null
*
* @param <T> 泛型
* @param map 缓存hash
* @param key 缓存键
* @param type 数据类型
*
* @return 数据值
*/
public <T> CompletableFuture<T> bothGetAsync(final String map, final String key, final Type type) {
T val = get(localSource, map, key, type);
if (val != null) {
return CompletableFuture.completedFuture(val);
}
return getAsync(remoteSource, map, key, type);
}
/**
* 远程获取字符串缓存数据, 过期返回null
*
* @param map 缓存hash
* @param key 缓存键
*
* @return 数据值
*/
public final String bothGetString(final String map, final String key) {
return bothGet(map, key, String.class);
}
/**
* 远程异步获取字符串缓存数据, 过期返回null
*
* @param map 缓存hash
* @param key 缓存键
*
* @return 数据值
*/
public final CompletableFuture<String> bothGetStringAsync(final String map, final String key) {
return bothGetAsync(map, key, String.class);
}
/**
* 远程缓存数据
*
* @param <T> 泛型
* @param map 缓存hash
* @param key 缓存键
* @param type 数据类型
* @param value 数据值
* @param localExpire 本地过期时长为null表示永不过期
* @param remoteExpire 远程过期时长为null表示永不过期
*/
public <T> void bothSet(final String map, final String key, final Type type, final T value, Duration localExpire, Duration remoteExpire) {
set(localSource, map, key, type, value, localExpire);
set(remoteSource, map, key, type, value, remoteExpire);
}
/**
* 远程异步缓存数据
*
* @param <T> 泛型
* @param map 缓存hash
* @param key 缓存键
* @param type 数据类型
* @param value 数据值
* @param localExpire 本地过期时长为null表示永不过期
* @param remoteExpire 远程过期时长为null表示永不过期
*/
public <T> CompletableFuture<Void> bothSetAsync(String map, String key, Type type, T value, Duration localExpire, Duration remoteExpire) {
set(localSource, map, key, type, value, localExpire);
return setAsync(remoteSource, map, key, type, value, remoteExpire);
}
/**
* 远程缓存字符串数据
*
* @param map 缓存hash
* @param key 缓存键
* @param value 数据值
* @param localExpire 本地过期时长为null表示永不过期
* @param remoteExpire 远程过期时长为null表示永不过期
*/
public void bothSetString(final String map, final String key, final String value, Duration localExpire, Duration remoteExpire) {
bothSet(map, key, String.class, value, localExpire, remoteExpire);
}
/**
* 远程异步缓存字符串数据
*
* @param map 缓存hash
* @param key 缓存键
* @param value 数据值
* @param localExpire 本地过期时长为null表示永不过期
* @param remoteExpire 远程过期时长为null表示永不过期
*/
public CompletableFuture<Void> bothSetStringAsync(String map, String key, String value, Duration localExpire, Duration remoteExpire) {
return bothSetAsync(map, key, String.class, value, localExpire, remoteExpire);
}
/**
* 远程删除缓存数据
*
* @param map 缓存hash
* @param key 缓存键
*
* @return 删除数量
*/
public long bothDel(String map, String key) {
del(localSource, map, key);
return del(remoteSource, map, key);
}
/**
* 远程异步删除缓存数据
*
* @param map 缓存hash
* @param key 缓存键
*
* @return 删除数量
*/
public CompletableFuture<Long> bothDelAsync(String map, String key) {
del(localSource, map, key);
return delAsync(remoteSource, map, key);
}
//-------------------------------------- 内部方法 --------------------------------------
/**
* 获取缓存数据, 过期返回null
*
* @param <T> 泛型
* @param source 缓存源
* @param map 缓存hash
* @param key 缓存键
* @param type 数据类型
*
* @return 数据值
*/
protected <T> T get(final CacheSource source, final String map, final String key, final Type type) {
CacheValue<T> val = source.hget(map, key, loadCacheType(type));
return val != null && !val.isExpired() ? val.getValue() : null;
}
/**
* 获取缓存数据, 过期返回null
*
* @param <T> 泛型
* @param source 缓存源
* @param map 缓存hash
* @param key 缓存键
* @param type 数据类型
*
* @return 数据值
*/
protected <T> CompletableFuture<T> getAsync(final CacheSource source, final String map, final String key, final Type type) {
if (source == null) {
return CompletableFuture.failedFuture(new NullPointerException(CacheManager.class.getSimpleName() + ".source is null"));
}
return source.hgetAsync(map, key, loadCacheType(type)).thenApply(v -> {
CacheValue<T> val = (CacheValue) v;
return val != null && !val.isExpired() ? (T) val.getValue() : null;
});
}
/**
* 缓存数据
*
* @param <T> 泛型
* @param source 缓存源
* @param map 缓存hash
* @param key 缓存键
* @param type 数据类型
* @param value 数据值
* @param expire 过期时长为null表示永不过期
*/
protected <T> void set(CacheSource source, String map, String key, Type type, T value, Duration expire) {
Type t = loadCacheType(type, value);
source.hset(map, key, t, CacheValue.create(value, expire));
}
/**
* 缓存数据
*
* @param <T> 泛型
* @param source 缓存源
* @param map 缓存hash
* @param key 缓存键
* @param type 数据类型
* @param value 数据值
* @param expire 过期时长为null表示永不过期
*/
protected <T> CompletableFuture<Void> setAsync(CacheSource source, String map, String key, Type type, T value, Duration expire) {
if (source == null) {
return CompletableFuture.failedFuture(new NullPointerException(CacheManager.class.getSimpleName() + ".source is null"));
}
Type t = loadCacheType(type, value);
return source.hsetAsync(map, key, t, CacheValue.create(value, expire));
}
/**
* 删除缓存数据
*
* @param source 缓存源
* @param map 缓存hash
* @param key 缓存键
*
* @return 删除数量
*/
protected long del(final CacheSource source, String map, String key) {
return source.hdel(map, key);
}
/**
* 删除缓存数据
*
* @param source 缓存源
* @param map 缓存hash
* @param key 缓存键
*
* @return 删除数量
*/
protected CompletableFuture<Long> delAsync(final CacheSource source, String map, String key) {
if (source == null) {
return CompletableFuture.failedFuture(new NullPointerException(CacheManager.class.getSimpleName() + ".source is null"));
}
return source.hdelAsync(map, key);
}
/**
* 创建数据类型创建对应CacheValue泛型
*
* @param type 数据类型为null则取value的类型
* @param value 数据值
*
* @return CacheValue泛型
*/
protected Type loadCacheType(Type type, final Object value) {
return loadCacheType(type == null ? value.getClass() : type);
}
/**
* 创建数据类型创建对应CacheValue泛型
*
* @param type 数据类型
*
* @return CacheValue泛型
*/
protected Type loadCacheType(Type type) {
return cacheValueTypes.computeIfAbsent(type, t -> TypeToken.createParameterizedType(null, CacheValue.class, type));
}
}

View File

@@ -344,7 +344,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
if (entry == null) {
entry = new CacheEntry(CacheEntryType.OBJECT, key);
container.put(key, entry);
entry.setObjectValue(convert, type, value);
entry.setObjectValue(convert == null ? this.convert : convert, type, value);
entry.expireSeconds(expireSeconds);
entry.lastAccessed = System.currentTimeMillis();
return true;
@@ -365,7 +365,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public <T> T getSet(String key, Convert convert, Type type, T value) {
CacheEntry entry = find(key, CacheEntryType.OBJECT);
T old = entry == null ? null : (T) entry.getObjectValue(convert, type);
T old = entry == null ? null : (T) entry.getObjectValue(convert == null ? this.convert : convert, type);
set0(key, 0, convert, type, value);
return old;
}
@@ -387,7 +387,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
} finally {
containerLock.unlock();
}
return entry.getObjectValue(null, type);
return entry.getObjectValue(convert, type);
}
@Override
@@ -411,7 +411,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
entry.lock();
try {
entry.setObjectValue(convert, type, value);
entry.setObjectValue(convert == null ? this.convert : convert, type, value);
entry.expireSeconds(expireSeconds);
entry.lastAccessed = System.currentTimeMillis();
} finally {
@@ -577,7 +577,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
entry.lock();
try {
if (entry.cacheType != CacheEntryType.ATOMIC) {
entry.objectValue = new AtomicLong(Long.parseLong(entry.getObjectValue(null, String.class)));
entry.objectValue = new AtomicLong(Long.parseLong(entry.getObjectValue(convert, String.class)));
entry.cacheType = CacheEntryType.ATOMIC;
}
return ((AtomicLong) entry.objectValue).addAndGet(num);
@@ -610,7 +610,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
entry.lock();
try {
if (entry.cacheType != CacheEntryType.DOUBLE) {
entry.objectValue = new AtomicLong(Long.parseLong(entry.getObjectValue(null, String.class)));
entry.objectValue = new AtomicLong(Long.parseLong(entry.getObjectValue(convert, String.class)));
entry.cacheType = CacheEntryType.DOUBLE;
}
Long v = ((AtomicLong) entry.objectValue).addAndGet(Double.doubleToLongBits(num));
@@ -698,24 +698,25 @@ public final class CacheMemorySource extends AbstractCacheSource {
if (expireSeconds > 0) {
entry.expireSeconds(expireSeconds);
}
final Convert c = convert == null ? this.convert : convert;
// OBJECT, ATOMIC, DOUBLE, SSET, ZSET, LIST, MAP;
switch (entry.cacheType) {
case ATOMIC:
return CacheEntry.serialToObj(convert, type, (AtomicLong) entry.objectValue);
return CacheEntry.serialToObj(c, type, (AtomicLong) entry.objectValue);
case DOUBLE:
return CacheEntry.serialToObj(convert, type, Double.longBitsToDouble(((AtomicLong) entry.objectValue).longValue()));
return CacheEntry.serialToObj(c, type, Double.longBitsToDouble(((AtomicLong) entry.objectValue).longValue()));
case SSET:
return (T) entry.ssetValue.stream().map(v -> CacheEntry.serialToObj(convert, type, v)).collect(Collectors.toSet());
return (T) entry.ssetValue.stream().map(v -> CacheEntry.serialToObj(c, type, v)).collect(Collectors.toSet());
case ZSET:
return (T) entry.zsetValue.stream().map(v -> new CacheScoredValue(v)).collect(Collectors.toSet());
case LIST:
return (T) entry.listValue.stream().map(v -> CacheEntry.serialToObj(convert, type, v)).collect(Collectors.toList());
return (T) entry.listValue.stream().map(v -> CacheEntry.serialToObj(c, type, v)).collect(Collectors.toList());
case MAP:
LinkedHashMap<String, Object> map = new LinkedHashMap();
entry.mapValue.forEach((k, v) -> map.put(k, CacheEntry.serialToObj(convert, type, v)));
entry.mapValue.forEach((k, v) -> map.put(k, CacheEntry.serialToObj(c, type, v)));
return (T) map;
default:
return entry.getObjectValue(convert, type);
return entry.getObjectValue(c, type);
}
}
@@ -803,7 +804,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
Map map = entry.mapValue;
Serializable val = (Serializable) map.computeIfAbsent(field, f -> new AtomicLong());
if (!(val instanceof AtomicLong)) {
val = CacheEntry.objToSerial(null, AtomicLong.class, val);
val = CacheEntry.objToSerial(convert, AtomicLong.class, val);
map.put(field, val);
}
return ((AtomicLong) val).addAndGet(num);
@@ -889,7 +890,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
entry.lock();
try {
boolean rs = entry.setMapValueIfAbsent(field, convert, type, value) == null;
boolean rs = entry.setMapValueIfAbsent(field, convert == null ? this.convert : convert, type, value) == null;
entry.lastAccessed = System.currentTimeMillis();
return rs;
} finally {
@@ -932,7 +933,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
List<T> rs = new ArrayList<>(fields.length);
for (String field : fields) {
rs.add(entry.getMapValue(field, null, type));
rs.add(entry.getMapValue(field, convert, type));
}
return rs;
}
@@ -949,7 +950,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
return new LinkedHashMap();
} else {
Map map = new LinkedHashMap();
entry.mapValue.forEach((k, v) -> map.put(k, CacheEntry.serialToObj(null, type, v)));
entry.mapValue.forEach((k, v) -> map.put(k, CacheEntry.serialToObj(convert, type, v)));
return map;
}
}
@@ -965,7 +966,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
if (entry == null) {
return new ArrayList();
} else {
Stream<T> stream = entry.mapValue.values().stream().map(v -> CacheEntry.serialToObj(null, type, v));
Stream<T> stream = entry.mapValue.values().stream().map(v -> CacheEntry.serialToObj(convert, type, v));
return new ArrayList(stream.collect(Collectors.toList()));
}
}
@@ -987,12 +988,12 @@ public final class CacheMemorySource extends AbstractCacheSource {
if (Utility.isEmpty(pattern)) {
Set<Map.Entry<String, Serializable>> set = entry.mapValue.entrySet();
return set.stream()
.collect(Collectors.toMap(Map.Entry::getKey, en -> CacheEntry.serialToObj(null, type, en.getValue())));
.collect(Collectors.toMap(Map.Entry::getKey, en -> CacheEntry.serialToObj(convert, type, en.getValue())));
} else {
Predicate<String> regx = Pattern.compile(pattern.replace("*", ".*")).asPredicate();
Set<Map.Entry<String, Serializable>> set = entry.mapValue.entrySet();
return set.stream().filter(en -> regx.test(en.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, en -> CacheEntry.serialToObj(null, type, en.getValue())));
.collect(Collectors.toMap(Map.Entry::getKey, en -> CacheEntry.serialToObj(convert, type, en.getValue())));
}
}
@@ -1028,7 +1029,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
if (entry == null) {
return 0L;
}
String obj = entry.getMapValue(field, null, String.class);
String obj = entry.getMapValue(field, convert, String.class);
return obj == null ? 0L : (long) obj.length();
}
@@ -1056,7 +1057,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
entry.lock();
try {
entry.setMapValue(field, convert, type, value);
entry.setMapValue(field, convert == null ? this.convert : convert, type, value);
entry.lastAccessed = System.currentTimeMillis();
} finally {
entry.unlock();
@@ -1110,7 +1111,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
List<Serializable> list = new ArrayList(entry.listValue);
int pos = index >= 0 ? index : list.size() + index;
return pos >= list.size() ? null : CacheEntry.serialToObj(null, componentType, list.get(pos));
return pos >= list.size() ? null : CacheEntry.serialToObj(convert, componentType, list.get(pos));
}
@Override
@@ -1144,7 +1145,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
return 0L;
}
entry.lock();
Serializable val = CacheEntry.objToSerial(null, componentType, value);
Serializable val = CacheEntry.objToSerial(convert, componentType, value);
try {
List<Serializable> list = new ArrayList<>(entry.listValue);
int pos = list.indexOf(pivot);
@@ -1197,7 +1198,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
entry.lock();
try {
for (T val : values) {
entry.listValue.addFirst(CacheEntry.objToSerial(null, componentType, val));
entry.listValue.addFirst(CacheEntry.objToSerial(convert, componentType, val));
}
} finally {
entry.unlock();
@@ -1219,7 +1220,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
try {
ConcurrentLinkedDeque list = entry.listValue;
for (T val : values) {
list.addFirst(CacheEntry.objToSerial(null, componentType, val));
list.addFirst(CacheEntry.objToSerial(convert, componentType, val));
}
} finally {
entry.unlock();
@@ -1239,7 +1240,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
entry.lock();
try {
return CacheEntry.serialToObj(null, componentType, entry.listValue.pollFirst());
return CacheEntry.serialToObj(convert, componentType, entry.listValue.pollFirst());
} finally {
entry.unlock();
}
@@ -1300,7 +1301,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
entry.lock();
try {
return CacheEntry.serialToObj(null, componentType, entry.listValue.pollLast());
return CacheEntry.serialToObj(convert, componentType, entry.listValue.pollLast());
} finally {
entry.unlock();
}
@@ -1321,7 +1322,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
try {
ConcurrentLinkedDeque<Serializable> list = entry.listValue;
for (T val : values) {
list.add(CacheEntry.objToSerial(null, componentType, val));
list.add(CacheEntry.objToSerial(convert, componentType, val));
}
} finally {
entry.unlock();
@@ -1352,7 +1353,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
try {
ConcurrentLinkedDeque<Serializable> list = entry.listValue;
for (T val : values) {
list.add(CacheEntry.objToSerial(null, componentType, val));
list.add(CacheEntry.objToSerial(convert, componentType, val));
}
} finally {
entry.unlock();
@@ -1396,15 +1397,15 @@ public final class CacheMemorySource extends AbstractCacheSource {
for (int i = 0; i < Math.abs(count); i++) {
int index = ThreadLocalRandom.current().nextInt(vals.size());
Serializable val = vals.get(index);
list.add(CacheEntry.serialToObj(null, componentType, val));
list.add(CacheEntry.serialToObj(convert, componentType, val));
}
} else { //不可以重复
if (count >= vals.size()) {
return vals.stream()
.map(val -> (T) CacheEntry.serialToObj(null, componentType, val)).collect(Collectors.toList());
.map(val -> (T) CacheEntry.serialToObj(convert, componentType, val)).collect(Collectors.toList());
}
return vals.subList(0, count).stream()
.map(val -> (T) CacheEntry.serialToObj(null, componentType, val)).collect(Collectors.toList());
.map(val -> (T) CacheEntry.serialToObj(convert, componentType, val)).collect(Collectors.toList());
}
return list;
}
@@ -1423,7 +1424,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
boolean rs = false;
entry.lock();
try {
Serializable val = CacheEntry.objToSerial(null, componentType, member);
Serializable val = CacheEntry.objToSerial(convert, componentType, member);
rs = entry.ssetValue.remove(val);
} finally {
entry.unlock();
@@ -1444,7 +1445,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
entry2.lock();
try {
entry2.addSsetValue(null, componentType, member);
entry2.addSsetValue(convert, componentType, member);
} finally {
entry2.unlock();
}
@@ -1460,7 +1461,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public <T> Set<T> sdiff(final String key, final Type componentType, final String... key2s) {
return sdiff0(key, key2s).stream()
.map(v -> (T) CacheEntry.serialToObj(null, componentType, v))
.map(v -> (T) CacheEntry.serialToObj(convert, componentType, v))
.collect(Collectors.toSet());
}
@@ -1519,7 +1520,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public <T> Set<T> sinter(final String key, final Type componentType, final String... key2s) {
return sinter0(key, key2s).stream()
.map(v -> (T) CacheEntry.serialToObj(null, componentType, v))
.map(v -> (T) CacheEntry.serialToObj(convert, componentType, v))
.collect(Collectors.toSet());
}
@@ -1587,7 +1588,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public <T> Set<T> sunion(final String key, final Type componentType, final String... key2s) {
return sunion0(key, key2s).stream()
.map(v -> (T) CacheEntry.serialToObj(null, componentType, v))
.map(v -> (T) CacheEntry.serialToObj(convert, componentType, v))
.collect(Collectors.toSet());
}
@@ -1650,7 +1651,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
return new LinkedHashSet<>();
}
return entry.ssetValue.stream()
.map(v -> (T) CacheEntry.serialToObj(null, componentType, v))
.map(v -> (T) CacheEntry.serialToObj(convert, componentType, v))
.collect(Collectors.toSet());
}
@@ -1666,7 +1667,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
CacheEntry entry = find(key, CacheEntryType.SSET);
if (entry != null) {
map.put(key, entry.ssetValue.stream()
.map(v -> (T) CacheEntry.serialToObj(null, componentType, v))
.map(v -> (T) CacheEntry.serialToObj(convert, componentType, v))
.collect(Collectors.toSet()));
}
}
@@ -1718,7 +1719,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
entry.lock();
try {
for (T val : values) {
entry.addSsetValue(null, componentType, val);
entry.addSsetValue(convert, componentType, val);
}
} finally {
entry.unlock();
@@ -1744,7 +1745,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public <T> boolean sismember(final String key, final Type type, final T value) {
CacheEntry entry = find(key, CacheEntryType.SSET);
return entry != null && entry.ssetValue.contains(CacheEntry.objToSerial(null, type, value));
return entry != null && entry.ssetValue.contains(CacheEntry.objToSerial(convert, type, value));
}
@Override
@@ -1771,7 +1772,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
if (del != null) {
cset.remove(del);
return CacheEntry.serialToObj(null, componentType, del);
return CacheEntry.serialToObj(convert, componentType, del);
}
return null;
} finally {
@@ -1803,7 +1804,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
while (it.hasNext()) {
Serializable item = it.next();
rms.add(item);
list.add(CacheEntry.serialToObj(null, componentType, item));
list.add(CacheEntry.serialToObj(convert, componentType, item));
if (++index >= count) {
break;
}
@@ -1835,7 +1836,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
Iterator<Serializable> it = cset.iterator();
Set<T> list = new LinkedHashSet<>();
while (it.hasNext()) {
list.add(CacheEntry.serialToObj(null, componentType, it.next()));
list.add(CacheEntry.serialToObj(convert, componentType, it.next()));
}
return list;
} finally {
@@ -1856,7 +1857,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
long count = 0;
for (T val : values) {
count += entry.ssetValue.remove(CacheEntry.objToSerial(null, type, val)) ? 1 : 0;
count += entry.ssetValue.remove(CacheEntry.objToSerial(convert, type, val)) ? 1 : 0;
}
return count;
}
@@ -2248,7 +2249,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
OBJECT, ATOMIC, DOUBLE, SSET, ZSET, LIST, MAP;
}
//类型只能是: String、byte[]、AtomicLong
//Serializable的具体数据类型只能是: String、byte[]、AtomicLong
public static final class CacheEntry {
volatile long lastAccessed; //最后刷新时间
@@ -2303,36 +2304,34 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
//value类型只能是byte[]/String/AtomicLong
public static <T> T serialToObj(Convert convert, @Nonnull Type type, Serializable value) {
public static <T> T serialToObj(@Nonnull Convert convert, @Nonnull Type type, Serializable value) {
if (value == null) {
return null;
}
Convert c = convert == null ? JsonConvert.root() : convert;
if (value.getClass() == byte[].class) {
return (T) c.convertFrom(type, (byte[]) value);
return (T) convert.convertFrom(type, (byte[]) value);
} else { //String/AtomicLong
if (c instanceof TextConvert) {
return (T) ((TextConvert) c).convertFrom(type, value.toString());
if (convert instanceof TextConvert) {
return (T) ((TextConvert) convert).convertFrom(type, value.toString());
} else {
return (T) c.convertFrom(type, value.toString().getBytes(StandardCharsets.UTF_8));
return (T) convert.convertFrom(type, value.toString().getBytes(StandardCharsets.UTF_8));
}
}
}
//返回类型只能是byte[]/String/AtomicLong
public static Serializable objToSerial(Convert convert, Type type, Object value) {
public static Serializable objToSerial(@Nonnull Convert convert, Type type, Object value) {
if (value == null) {
return null;
}
if (value instanceof String || value instanceof byte[]) {
return (Serializable) value;
}
Convert c = convert == null ? JsonConvert.root() : convert;
Type t = type == null ? value.getClass() : type;
if (c instanceof TextConvert) {
return ((TextConvert) c).convertTo(t, value);
if (convert instanceof TextConvert) {
return ((TextConvert) convert).convertTo(t, value);
} else {
return c.convertToBytes(t, value);
return convert.convertToBytes(t, value);
}
}

View File

@@ -7,7 +7,7 @@ import java.time.Duration;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.redkale.caching.CacheConfig;
import org.redkale.caching.CacheManager;
import org.redkale.caching.CacheManagerService;
import org.redkale.convert.json.JsonConvert;
import org.redkale.source.CacheMemorySource;
import org.redkale.util.Utility;
@@ -25,7 +25,10 @@ public class CachingTest {
@Test
public void run() throws Exception {
CacheManager cache = CacheManager.create(new CacheConfig(), new CacheMemorySource("remote"));
CacheMemorySource remoteSource = new CacheMemorySource("remote");
remoteSource.init(null);
CacheManagerService cache = CacheManagerService.create(new CacheConfig(), remoteSource);
cache.init(null);
Duration expire = Duration.ofMillis(490);
cache.localSetString("user", "name:haha", "myha", expire);
Assertions.assertEquals(cache.localGetString("user", "name:haha"), "myha");
@@ -41,6 +44,7 @@ public class CachingTest {
Assertions.assertEquals(cache.localGet("user", bean.getName(), CachingBean.class).toString(), json);
bean.setRemark(bean.getRemark() + "-新备注");
Assertions.assertEquals(cache.localGet("user", bean.getName(), CachingBean.class).toString(), json);
cache.destroy(null);
}
public static class CachingBean {