CacheMemorySource优化

This commit is contained in:
redkale
2023-12-11 14:45:35 +08:00
parent 8e51f99c7a
commit bd088ab9ec
3 changed files with 420 additions and 146 deletions

View File

@@ -241,6 +241,148 @@ public class CacheManager {
return delAsync(remoteSource, map, key);
}
//-------------------------------------- both缓存 --------------------------------------
/**
* 远程获取缓存数据, 过期返回null
*
* @param <T> 泛型
* @param map 缓存hash
* @param key 缓存键
* @param type 数据类型
*
* @return 数据值
*/
public <T> T bothGet(final String map, final String key, final Type type) {
T val = get(localSource, map, key, type);
return val == null ? get(remoteSource, map, key, type) : val;
}
/**
* 远程异步获取缓存数据, 过期返回null
*
* @param <T> 泛型
* @param map 缓存hash
* @param key 缓存键
* @param type 数据类型
*
* @return 数据值
*/
public <T> CompletableFuture<T> bothGetAsync(final String map, final String key, final Type type) {
T val = get(localSource, map, key, type);
if (val != null) {
return CompletableFuture.completedFuture(val);
}
return getAsync(remoteSource, map, key, type);
}
/**
* 远程获取字符串缓存数据, 过期返回null
*
* @param map 缓存hash
* @param key 缓存键
*
* @return 数据值
*/
public final String bothGetString(final String map, final String key) {
return bothGet(map, key, String.class);
}
/**
* 远程异步获取字符串缓存数据, 过期返回null
*
* @param map 缓存hash
* @param key 缓存键
*
* @return 数据值
*/
public final CompletableFuture<String> bothGetStringAsync(final String map, final String key) {
return bothGetAsync(map, key, String.class);
}
/**
* 远程缓存数据
*
* @param <T> 泛型
* @param map 缓存hash
* @param key 缓存键
* @param type 数据类型
* @param value 数据值
* @param localExpire 本地过期时长为null表示永不过期
* @param remoteExpire 远程过期时长为null表示永不过期
*/
public <T> void bothSet(final String map, final String key, final Type type, final T value, Duration localExpire, Duration remoteExpire) {
set(localSource, map, key, type, value, localExpire);
set(remoteSource, map, key, type, value, remoteExpire);
}
/**
* 远程异步缓存数据
*
* @param <T> 泛型
* @param map 缓存hash
* @param key 缓存键
* @param type 数据类型
* @param value 数据值
* @param localExpire 本地过期时长为null表示永不过期
* @param remoteExpire 远程过期时长为null表示永不过期
*/
public <T> CompletableFuture<Void> bothSetAsync(final String map, final String key, final Type type, final T value, Duration localExpire, Duration remoteExpire) {
set(localSource, map, key, type, value, localExpire);
return setAsync(remoteSource, map, key, type, value, remoteExpire);
}
/**
* 远程缓存字符串数据
*
* @param map 缓存hash
* @param key 缓存键
* @param value 数据值
* @param localExpire 本地过期时长为null表示永不过期
* @param remoteExpire 远程过期时长为null表示永不过期
*/
public void bothSetString(final String map, final String key, final String value, Duration localExpire, Duration remoteExpire) {
bothSet(map, key, String.class, value, localExpire, remoteExpire);
}
/**
* 远程异步缓存字符串数据
*
* @param map 缓存hash
* @param key 缓存键
* @param value 数据值
* @param localExpire 本地过期时长为null表示永不过期
* @param remoteExpire 远程过期时长为null表示永不过期
*/
public CompletableFuture<Void> bothSetStringAsync(final String map, final String key, final String value, Duration localExpire, Duration remoteExpire) {
return bothSetAsync(map, key, String.class, value, localExpire, remoteExpire);
}
/**
* 远程删除缓存数据
*
* @param map 缓存hash
* @param key 缓存键
*
* @return 删除数量
*/
public long bothDel(String map, String key) {
del(localSource, map, key);
return del(remoteSource, map, key);
}
/**
* 远程异步删除缓存数据
*
* @param map 缓存hash
* @param key 缓存键
*
* @return 删除数量
*/
public CompletableFuture<Long> bothDelAsync(String map, String key) {
del(localSource, map, key);
return delAsync(remoteSource, map, key);
}
//-------------------------------------- 内部方法 --------------------------------------
/**
* 获取缓存数据, 过期返回null

View File

@@ -7,6 +7,7 @@ package org.redkale.source;
import java.io.Serializable;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
@@ -24,11 +25,9 @@ import org.redkale.convert.*;
import org.redkale.convert.json.*;
import org.redkale.service.Local;
import org.redkale.util.*;
import static org.redkale.util.Utility.convertValue;
import static org.redkale.util.Utility.isEmpty;
/**
* CacheSource的默认实现--内存缓存, 此实现只可用于调试,不可用于生产环境
* CacheSource的默认实现--内存缓存
* 注意: url 需要指定为 memory:cachesource
*
* <p>
@@ -222,7 +221,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
//------------------------ 订阅发布 SUB/PUB ------------------------
@Override
public CompletableFuture<List<String>> pubsubChannelsAsync(@Nullable String pattern) {
Predicate<String> predicate = isEmpty(pattern) ? t -> true : Pattern.compile(pattern).asPredicate();
Predicate<String> predicate = Utility.isEmpty(pattern) ? t -> true : Pattern.compile(pattern).asPredicate();
return CompletableFuture.completedFuture(pubsubListeners.keySet().stream().filter(predicate).collect(Collectors.toList()));
}
@@ -296,7 +295,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
for (int i = 0; i < keyVals.length; i += 2) {
String key = keyVals[i].toString();
Object val = keyVals[i + 1];
set0(key.toString(), 0, null, val);
set0(key.toString(), 0, null, null, val);
}
}
@@ -307,7 +306,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public void mset(Map map) {
map.forEach((key, val) -> set0(key.toString(), 0, null, val));
map.forEach((key, val) -> set0(key.toString(), 0, null, null, val));
}
@Override
@@ -317,7 +316,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public <T> void set(String key, Convert convert, Type type, T value) {
set0(key, 0, type, value);
set0(key, 0, convert, type, value);
}
@Override
@@ -345,7 +344,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
if (entry == null) {
entry = new CacheEntry(CacheEntryType.OBJECT, key);
container.put(key, entry);
entry.objectValue = value;
entry.setObjectValue(convert, type, value);
entry.expireSeconds(expireSeconds);
entry.lastAccessed = System.currentTimeMillis();
return true;
@@ -366,9 +365,9 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public <T> T getSet(String key, Convert convert, Type type, T value) {
CacheEntry entry = find(key, CacheEntryType.OBJECT);
T old = entry == null ? null : (T) entry.objectValue;
set0(key, 0, type, value);
return convertValue(type, old);
T old = entry == null ? null : (T) entry.getObjectValue(convert, type);
set0(key, 0, convert, type, value);
return old;
}
@Override
@@ -388,7 +387,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
} finally {
containerLock.unlock();
}
return convertValue(type, entry.objectValue);
return entry.getObjectValue(null, type);
}
@Override
@@ -396,7 +395,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
return supplyFuture(() -> getDel(key, type));
}
private void set0(String key, int expireSeconds, Type type, Object value) {
private void set0(String key, int expireSeconds, Convert convert, Type type, Object value) {
CacheEntry entry = find(key, CacheEntryType.OBJECT);
if (entry == null) {
containerLock.lock();
@@ -412,7 +411,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
entry.lock();
try {
entry.objectValue = convertValue(type, value);
entry.setObjectValue(convert, type, value);
entry.expireSeconds(expireSeconds);
entry.lastAccessed = System.currentTimeMillis();
} finally {
@@ -422,7 +421,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public <T> void setex(String key, int expireSeconds, Convert convert, Type type, T value) {
set0(key, expireSeconds, type, value);
set0(key, expireSeconds, convert, type, value);
}
@Override
@@ -578,7 +577,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
entry.lock();
try {
if (entry.cacheType != CacheEntryType.ATOMIC) {
entry.objectValue = new AtomicLong(Long.parseLong(entry.objectValue.toString()));
entry.objectValue = new AtomicLong(Long.parseLong(entry.getObjectValue(null, String.class)));
entry.cacheType = CacheEntryType.ATOMIC;
}
return ((AtomicLong) entry.objectValue).addAndGet(num);
@@ -611,7 +610,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
entry.lock();
try {
if (entry.cacheType != CacheEntryType.DOUBLE) {
entry.objectValue = new AtomicLong(Long.parseLong(entry.objectValue.toString()));
entry.objectValue = new AtomicLong(Long.parseLong(entry.getObjectValue(null, String.class)));
entry.cacheType = CacheEntryType.DOUBLE;
}
Long v = ((AtomicLong) entry.objectValue).addAndGet(Double.doubleToLongBits(num));
@@ -650,7 +649,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
public <T> List<T> mget(final Type componentType, final String... keys) {
List<T> list = new ArrayList<>();
for (String key : keys) {
list.add(get0(key, 0, componentType));
list.add(get0(key, 0, null, componentType));
}
return list;
}
@@ -673,7 +672,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public <T> T get(final String key, final Type type) {
return get0(key, 0, type);
return get0(key, 0, null, type);
}
@Override
@@ -683,7 +682,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public <T> T getex(final String key, final int expireSeconds, final Type type) {
return get0(key, expireSeconds, type);
return get0(key, expireSeconds, null, type);
}
@Override
@@ -691,7 +690,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
return supplyFuture(() -> getex(key, expireSeconds, type));
}
private <T> T get0(final String key, final int expireSeconds, final Type type) {
private <T> T get0(final String key, final int expireSeconds, final Convert convert, final Type type) {
CacheEntry entry = find(key);
if (entry == null) {
return null;
@@ -702,23 +701,21 @@ public final class CacheMemorySource extends AbstractCacheSource {
// OBJECT, ATOMIC, DOUBLE, SSET, ZSET, LIST, MAP;
switch (entry.cacheType) {
case ATOMIC:
return convertValue(type, (AtomicLong) entry.objectValue);
return CacheEntry.serialToObj(convert, type, (AtomicLong) entry.objectValue);
case DOUBLE:
return convertValue(type, Double.longBitsToDouble(((AtomicLong) entry.objectValue).longValue()));
return CacheEntry.serialToObj(convert, type, Double.longBitsToDouble(((AtomicLong) entry.objectValue).longValue()));
case SSET:
return (T) new LinkedHashSet(entry.setValue);
return (T) entry.ssetValue.stream().map(v -> CacheEntry.serialToObj(convert, type, v)).collect(Collectors.toSet());
case ZSET:
return (T) new LinkedHashSet(entry.setValue);
return (T) entry.zsetValue.stream().map(v -> new CacheScoredValue(v)).collect(Collectors.toSet());
case LIST:
return (T) new ArrayList(entry.listValue);
return (T) entry.listValue.stream().map(v -> CacheEntry.serialToObj(convert, type, v)).collect(Collectors.toList());
case MAP:
return (T) new LinkedHashMap<>(entry.mapValue);
LinkedHashMap<String, Object> map = new LinkedHashMap();
entry.mapValue.forEach((k, v) -> map.put(k, CacheEntry.serialToObj(convert, type, v)));
return (T) map;
default:
Object obj = entry.objectValue;
if (obj != null && obj.getClass() != type) {
return (T) JsonConvert.root().convertFrom(type, JsonConvert.root().convertToBytes(obj));
}
return (T) obj;
return entry.getObjectValue(convert, type);
}
}
@@ -806,7 +803,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
Map map = entry.mapValue;
Serializable val = (Serializable) map.computeIfAbsent(field, f -> new AtomicLong());
if (!(val instanceof AtomicLong)) {
val = new AtomicLong(((Number) val).longValue());
val = CacheEntry.objToSerial(null, AtomicLong.class, val);
map.put(field, val);
}
return ((AtomicLong) val).addAndGet(num);
@@ -864,7 +861,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
//需要给CacheFactory使用
@Override
public <T> void hset(final String key, final String field, final Convert convert, final Type type, final T value) {
hset0(key, field, type, value);
hset0(key, field, convert, type, value);
}
@Override
@@ -892,7 +889,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
entry.lock();
try {
boolean rs = entry.mapValue.putIfAbsent(field, convertValue(type, value)) == null;
boolean rs = entry.setMapValueIfAbsent(field, convert, type, value) == null;
entry.lastAccessed = System.currentTimeMillis();
return rs;
} finally {
@@ -908,7 +905,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public void hmset(final String key, final Serializable... values) {
for (int i = 0; i < values.length; i += 2) {
hset0(key, (String) values[i], null, values[i + 1]);
hset0(key, (String) values[i], null, null, values[i + 1]);
}
}
@@ -919,7 +916,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public void hmset(final String key, final Map map) {
map.forEach((k, v) -> hset0(key, (String) k, null, v));
map.forEach((k, v) -> hset0(key, (String) k, null, null, v));
}
@Override
@@ -933,10 +930,9 @@ public final class CacheMemorySource extends AbstractCacheSource {
if (entry == null) {
return null;
}
Map map = entry.mapValue;
List<T> rs = new ArrayList<>(fields.length);
for (String field : fields) {
rs.add(convertValue(type, map.get(field)));
rs.add(entry.getMapValue(field, null, type));
}
return rs;
}
@@ -953,7 +949,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
return new LinkedHashMap();
} else {
Map map = new LinkedHashMap();
entry.mapValue.forEach((k, v) -> map.put(k, convertValue(type, v)));
entry.mapValue.forEach((k, v) -> map.put(k, CacheEntry.serialToObj(null, type, v)));
return map;
}
}
@@ -969,7 +965,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
if (entry == null) {
return new ArrayList();
} else {
Stream<T> stream = entry.mapValue.values().stream().map(v -> convertValue(type, v));
Stream<T> stream = entry.mapValue.values().stream().map(v -> CacheEntry.serialToObj(null, type, v));
return new ArrayList(stream.collect(Collectors.toList()));
}
}
@@ -989,12 +985,14 @@ public final class CacheMemorySource extends AbstractCacheSource {
return new HashMap();
}
if (Utility.isEmpty(pattern)) {
return new LinkedHashMap(entry.mapValue);
Set<Map.Entry<String, Serializable>> set = entry.mapValue.entrySet();
return set.stream()
.collect(Collectors.toMap(Map.Entry::getKey, en -> CacheEntry.serialToObj(null, type, en.getValue())));
} else {
Predicate<String> regx = Pattern.compile(pattern.replace("*", ".*")).asPredicate();
Set<Map.Entry<String, Serializable>> set = entry.mapValue.entrySet();
return set.stream().filter(en -> regx.test(en.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, en -> convertValue(type, en.getValue())));
.collect(Collectors.toMap(Map.Entry::getKey, en -> CacheEntry.serialToObj(null, type, en.getValue())));
}
}
@@ -1013,8 +1011,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
if (entry == null) {
return null;
}
Object obj = entry.mapValue.get(field);
return obj == null ? null : convertValue(type, obj);
return entry.getMapValue(field, convert, type);
}
@Override
@@ -1031,8 +1028,8 @@ public final class CacheMemorySource extends AbstractCacheSource {
if (entry == null) {
return 0L;
}
Object obj = entry.mapValue.get(field);
return obj == null ? 0L : (long) obj.toString().length();
String obj = entry.getMapValue(field, null, String.class);
return obj == null ? 0L : (long) obj.length();
}
@Override
@@ -1040,7 +1037,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
return supplyFuture(() -> hstrlen(key, field));
}
private void hset0(String key, String field, Type type, Object value) {
private void hset0(String key, String field, Convert convert, Type type, Object value) {
if (value == null) {
return;
}
@@ -1059,7 +1056,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
entry.lock();
try {
entry.mapValue.put(field, convertValue(type, value));
entry.setMapValue(field, convert, type, value);
entry.lastAccessed = System.currentTimeMillis();
} finally {
entry.unlock();
@@ -1111,9 +1108,9 @@ public final class CacheMemorySource extends AbstractCacheSource {
if (entry == null) {
return null;
}
List list = new ArrayList(entry.listValue);
List<Serializable> list = new ArrayList(entry.listValue);
int pos = index >= 0 ? index : list.size() + index;
return pos >= list.size() ? null : convertValue(componentType, list.get(pos));
return pos >= list.size() ? null : CacheEntry.serialToObj(null, componentType, list.get(pos));
}
@Override
@@ -1147,29 +1144,30 @@ public final class CacheMemorySource extends AbstractCacheSource {
return 0L;
}
entry.lock();
Serializable val = CacheEntry.objToSerial(null, componentType, value);
try {
List<T> list = new ArrayList<>(entry.listValue);
List<Serializable> list = new ArrayList<>(entry.listValue);
int pos = list.indexOf(pivot);
if (pos < 0) {
return -1L;
}
List<T> newList = new ArrayList<>();
List<Serializable> newList = new ArrayList<>();
if (before) {
if (pos == 0) {
newList.add(value);
newList.add(val);
newList.addAll(list);
} else {
newList.addAll(list.subList(0, pos));
newList.add(value);
newList.add(val);
newList.addAll(list.subList(pos, list.size()));
}
} else {
if (pos == list.size() - 1) {
newList.addAll(list);
newList.add(value);
newList.add(val);
} else {
newList.addAll(list.subList(0, pos + 1));
newList.add(value);
newList.add(val);
newList.addAll(list.subList(pos + 1, list.size()));
}
}
@@ -1199,7 +1197,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
entry.lock();
try {
for (T val : values) {
entry.listValue.addFirst(val);
entry.listValue.addFirst(CacheEntry.objToSerial(null, componentType, val));
}
} finally {
entry.unlock();
@@ -1221,7 +1219,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
try {
ConcurrentLinkedDeque list = entry.listValue;
for (T val : values) {
list.addFirst(val);
list.addFirst(CacheEntry.objToSerial(null, componentType, val));
}
} finally {
entry.unlock();
@@ -1241,7 +1239,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
entry.lock();
try {
return convertValue(componentType, entry.listValue.pollFirst());
return CacheEntry.serialToObj(null, componentType, entry.listValue.pollFirst());
} finally {
entry.unlock();
}
@@ -1302,7 +1300,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
entry.lock();
try {
return convertValue(componentType, entry.listValue.pollLast());
return CacheEntry.serialToObj(null, componentType, entry.listValue.pollLast());
} finally {
entry.unlock();
}
@@ -1321,8 +1319,10 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
entry.lock();
try {
ConcurrentLinkedDeque list = entry.listValue;
list.addAll(List.of(values));
ConcurrentLinkedDeque<Serializable> list = entry.listValue;
for (T val : values) {
list.add(CacheEntry.objToSerial(null, componentType, val));
}
} finally {
entry.unlock();
}
@@ -1350,8 +1350,10 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
entry.lock();
try {
ConcurrentLinkedDeque list = entry.listValue;
list.addAll(List.of(values));
ConcurrentLinkedDeque<Serializable> list = entry.listValue;
for (T val : values) {
list.add(CacheEntry.objToSerial(null, componentType, val));
}
} finally {
entry.unlock();
}
@@ -1389,18 +1391,20 @@ public final class CacheMemorySource extends AbstractCacheSource {
if (entry == null) {
return list;
}
List<T> vals = new ArrayList<>(entry.setValue);
List<Serializable> vals = new ArrayList<>(entry.ssetValue);
if (count < 0) { //可以重复
for (int i = 0; i < Math.abs(count); i++) {
int index = ThreadLocalRandom.current().nextInt(vals.size());
T val = vals.get(index);
list.add(convertValue(componentType, val));
Serializable val = vals.get(index);
list.add(CacheEntry.serialToObj(null, componentType, val));
}
} else { //不可以重复
if (count >= vals.size()) {
return vals;
return vals.stream()
.map(val -> (T) CacheEntry.serialToObj(null, componentType, val)).collect(Collectors.toList());
}
return vals.subList(0, count);
return vals.subList(0, count).stream()
.map(val -> (T) CacheEntry.serialToObj(null, componentType, val)).collect(Collectors.toList());
}
return list;
}
@@ -1419,7 +1423,8 @@ public final class CacheMemorySource extends AbstractCacheSource {
boolean rs = false;
entry.lock();
try {
rs = entry.setValue.remove(member);
Serializable val = CacheEntry.objToSerial(null, componentType, member);
rs = entry.ssetValue.remove(val);
} finally {
entry.unlock();
}
@@ -1439,7 +1444,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
entry2.lock();
try {
entry2.setValue.add(member);
entry2.addSsetValue(null, componentType, member);
} finally {
entry2.unlock();
}
@@ -1454,7 +1459,9 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public <T> Set<T> sdiff(final String key, final Type componentType, final String... key2s) {
return sdiff0(key, key2s);
return sdiff0(key, key2s).stream()
.map(v -> (T) CacheEntry.serialToObj(null, componentType, v))
.collect(Collectors.toSet());
}
@Override
@@ -1464,7 +1471,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public long sdiffstore(final String key, final String srcKey, final String... srcKey2s) {
Set rs = sdiff0(srcKey, srcKey2s);
Set<Serializable> rs = sdiff0(srcKey, srcKey2s);
CacheEntry entry = find(key, CacheEntryType.SSET);
if (entry == null) {
containerLock.lock();
@@ -1480,8 +1487,8 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
entry.lock();
try {
entry.setValue.clear();
entry.setValue.addAll(rs);
entry.ssetValue.clear();
entry.ssetValue.addAll(rs);
} finally {
entry.unlock();
}
@@ -1493,17 +1500,17 @@ public final class CacheMemorySource extends AbstractCacheSource {
return supplyFuture(() -> sdiffstore(key, srcKey, srcKey2s));
}
private <T> Set<T> sdiff0(final String key, final String... key2s) {
Set<T> rs = new HashSet<>();
private Set<Serializable> sdiff0(final String key, final String... key2s) {
Set<Serializable> rs = new HashSet<>();
CacheEntry entry = find(key, CacheEntryType.SSET);
if (entry == null) {
return rs;
}
rs.addAll(entry.setValue);
rs.addAll(entry.ssetValue);
for (String k : key2s) {
CacheEntry en2 = find(k, CacheEntryType.SSET);
if (en2 != null) {
en2.setValue.forEach(rs::remove);
en2.ssetValue.forEach(rs::remove);
}
}
return rs;
@@ -1511,7 +1518,9 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public <T> Set<T> sinter(final String key, final Type componentType, final String... key2s) {
return sinter0(key, key2s);
return sinter0(key, key2s).stream()
.map(v -> (T) CacheEntry.serialToObj(null, componentType, v))
.collect(Collectors.toSet());
}
@Override
@@ -1521,7 +1530,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public long sinterstore(final String key, final String srcKey, final String... srcKey2s) {
Set rs = sinter0(srcKey, srcKey2s);
Set<Serializable> rs = sinter0(srcKey, srcKey2s);
CacheEntry entry = find(key, CacheEntryType.SSET);
if (entry == null) {
containerLock.lock();
@@ -1537,8 +1546,8 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
entry.lock();
try {
entry.setValue.clear();
entry.setValue.addAll(rs);
entry.ssetValue.clear();
entry.ssetValue.addAll(rs);
} finally {
entry.unlock();
}
@@ -1550,23 +1559,23 @@ public final class CacheMemorySource extends AbstractCacheSource {
return supplyFuture(() -> sinterstore(key, srcKey, srcKey2s));
}
private <T> Set<T> sinter0(final String key, final String... key2s) {
Set<T> rs = new HashSet<>();
private Set<Serializable> sinter0(final String key, final String... key2s) {
Set<Serializable> rs = new HashSet<>();
CacheEntry entry = find(key, CacheEntryType.SSET);
if (entry == null) {
return rs;
}
rs.addAll(entry.setValue);
rs.addAll(entry.ssetValue);
for (String k : key2s) {
CacheEntry en2 = find(k, CacheEntryType.SSET);
if (en2 != null) {
Set<T> removes = new HashSet<>();
for (T v : rs) {
if (!en2.setValue.contains(v)) {
removes.add(v);
Set<Serializable> rms = new HashSet<>();
for (Serializable v : rs) {
if (!en2.ssetValue.contains(v)) {
rms.add(v);
}
}
rs.removeAll(removes);
rs.removeAll(rms);
} else {
rs.clear();
return rs;
@@ -1577,7 +1586,9 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public <T> Set<T> sunion(final String key, final Type componentType, final String... key2s) {
return sunion0(key, key2s);
return sunion0(key, key2s).stream()
.map(v -> (T) CacheEntry.serialToObj(null, componentType, v))
.collect(Collectors.toSet());
}
@Override
@@ -1587,7 +1598,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public long sunionstore(final String key, final String srcKey, final String... srcKey2s) {
Set rs = sunion0(srcKey, srcKey2s);
Set<Serializable> rs = sunion0(srcKey, srcKey2s);
CacheEntry entry = find(key, CacheEntryType.SSET);
if (entry == null) {
@@ -1604,8 +1615,8 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
entry.lock();
try {
entry.setValue.clear();
entry.setValue.addAll(rs);
entry.ssetValue.clear();
entry.ssetValue.addAll(rs);
} finally {
entry.unlock();
}
@@ -1617,16 +1628,16 @@ public final class CacheMemorySource extends AbstractCacheSource {
return supplyFuture(() -> sunionstore(key, srcKey, srcKey2s));
}
private <T> Set<T> sunion0(final String key, final String... key2s) {
Set<T> rs = new HashSet<>();
private Set<Serializable> sunion0(final String key, final String... key2s) {
Set<Serializable> rs = new HashSet<>();
CacheEntry entry = find(key, CacheEntryType.SSET);
if (entry != null) {
rs.addAll(entry.setValue);
rs.addAll(entry.ssetValue);
}
for (String k : key2s) {
CacheEntry en2 = find(k, CacheEntryType.SSET);
if (en2 != null) {
rs.addAll(en2.setValue);
rs.addAll(en2.ssetValue);
}
}
return rs;
@@ -1638,7 +1649,9 @@ public final class CacheMemorySource extends AbstractCacheSource {
if (entry == null) {
return new LinkedHashSet<>();
}
return new LinkedHashSet<>(entry.setValue);
return entry.ssetValue.stream()
.map(v -> (T) CacheEntry.serialToObj(null, componentType, v))
.collect(Collectors.toSet());
}
@Override
@@ -1652,7 +1665,9 @@ public final class CacheMemorySource extends AbstractCacheSource {
for (String key : keys) {
CacheEntry entry = find(key, CacheEntryType.SSET);
if (entry != null) {
map.put(key, new LinkedHashSet<>(entry.setValue));
map.put(key, entry.ssetValue.stream()
.map(v -> (T) CacheEntry.serialToObj(null, componentType, v))
.collect(Collectors.toSet()));
}
}
return map;
@@ -1673,7 +1688,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
return rs;
}
Set set = entry.setValue;
Set<Serializable> set = entry.ssetValue;
for (String member : members) {
rs.add(set.contains(member));
}
@@ -1702,8 +1717,9 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
entry.lock();
try {
Set set = entry.setValue;
set.addAll(List.of(values));
for (T val : values) {
entry.addSsetValue(null, componentType, val);
}
} finally {
entry.unlock();
}
@@ -1717,7 +1733,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public long scard(final String key) {
CacheEntry entry = find(key, CacheEntryType.SSET);
return entry == null ? 0L : (long) entry.setValue.size();
return entry == null ? 0L : (long) entry.ssetValue.size();
}
@Override
@@ -1728,7 +1744,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public <T> boolean sismember(final String key, final Type type, final T value) {
CacheEntry entry = find(key, CacheEntryType.SSET);
return entry != null && entry.setValue.contains(value);
return entry != null && entry.ssetValue.contains(CacheEntry.objToSerial(null, type, value));
}
@Override
@@ -1744,18 +1760,18 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
entry.lock();
try {
final Set cset = entry.setValue;
final Set<Serializable> cset = entry.ssetValue;
if (cset.isEmpty()) {
return null;
}
Iterator it = cset.iterator();
Object del = null;
Iterator<Serializable> it = cset.iterator();
Serializable del = null;
if (it.hasNext()) {
del = it.next();
}
if (del != null) {
cset.remove(del);
return (T) del;
return CacheEntry.serialToObj(null, componentType, del);
}
return null;
} finally {
@@ -1776,20 +1792,23 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
entry.lock();
try {
final Set cset = entry.setValue;
final Set<Serializable> cset = entry.ssetValue;
if (cset.isEmpty()) {
return new LinkedHashSet<>();
}
Iterator it = cset.iterator();
Iterator<Serializable> it = cset.iterator();
Set<T> list = new LinkedHashSet<>();
Set<Serializable> rms = new LinkedHashSet<>();
int index = 0;
while (it.hasNext()) {
list.add(convertValue(componentType, it.next()));
Serializable item = it.next();
rms.add(item);
list.add(CacheEntry.serialToObj(null, componentType, item));
if (++index >= count) {
break;
}
}
cset.removeAll(list);
cset.removeAll(rms);
return list;
} finally {
entry.unlock();
@@ -1809,14 +1828,14 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
entry.lock();
try {
final Set cset = entry.setValue;
final Set<Serializable> cset = entry.ssetValue;
if (cset.isEmpty()) {
return new LinkedHashSet<>();
}
Iterator it = cset.iterator();
Iterator<Serializable> it = cset.iterator();
Set<T> list = new LinkedHashSet<>();
while (it.hasNext()) {
list.add((T) convertValue(componentType, it.next()));
list.add(CacheEntry.serialToObj(null, componentType, it.next()));
}
return list;
} finally {
@@ -1835,7 +1854,11 @@ public final class CacheMemorySource extends AbstractCacheSource {
if (entry == null) {
return 0L;
}
return entry.setValue.removeAll(Arrays.asList(values)) ? 1L : 0L;
long count = 0;
for (T val : values) {
count += entry.ssetValue.remove(CacheEntry.objToSerial(null, type, val)) ? 1 : 0;
}
return count;
}
@Override
@@ -1846,7 +1869,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
//------------------------ 有序集合 Sorted Set ------------------------
@Override
public void zadd(String key, CacheScoredValue... values) {
List<Object> list = new ArrayList<>();
List<CacheScoredValue> list = new ArrayList<>();
for (CacheScoredValue v : values) {
list.add(new CacheScoredValue(v));
}
@@ -1865,7 +1888,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
entry.lock();
try {
entry.setValue.addAll(list);
entry.zsetValue.addAll(list);
} finally {
entry.unlock();
}
@@ -1893,7 +1916,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
entry.lock();
try {
Set<CacheScoredValue> sets = entry.setValue;
Set<CacheScoredValue> sets = entry.zsetValue;
CacheScoredValue old = sets.stream().filter(v -> Objects.equals(v.getValue(), value.getValue())).findAny().orElse(null);
if (old == null) {
sets.add(new CacheScoredValue(value.getScore().doubleValue(), value.getValue()));
@@ -1931,7 +1954,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
if (entry == null) {
return 0L;
}
return entry.setValue.size();
return entry.zsetValue.size();
}
@Override
@@ -1945,7 +1968,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
if (entry == null) {
return null;
}
List<CacheScoredValue> list = new ArrayList<>(entry.setValue);
List<CacheScoredValue> list = new ArrayList<>(entry.zsetValue);
Collections.sort(list);
long c = 0;
for (CacheScoredValue v : list) {
@@ -1968,7 +1991,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
if (entry == null) {
return null;
}
List<CacheScoredValue> list = new ArrayList<>(entry.setValue);
List<CacheScoredValue> list = new ArrayList<>(entry.zsetValue);
Collections.sort(list, Collections.reverseOrder());
long c = 0;
for (CacheScoredValue v : list) {
@@ -1992,7 +2015,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
return new ArrayList<>();
}
List<String> list = new ArrayList<>();
Set<CacheScoredValue> sets = entry.setValue;
Set<CacheScoredValue> sets = entry.zsetValue;
long c = 0;
for (CacheScoredValue v : sets) {
if (c >= start && (stop < 0 || c <= stop)) {
@@ -2014,7 +2037,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
if (entry == null) {
return new ArrayList();
}
Set<CacheScoredValue> sets = entry.setValue;
Set<CacheScoredValue> sets = entry.zsetValue;
if (Utility.isEmpty(pattern)) {
return sets.stream().collect(Collectors.toList());
} else {
@@ -2034,7 +2057,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
if (entry == null) {
return 0L;
}
Set<CacheScoredValue> sets = entry.setValue;
Set<CacheScoredValue> sets = entry.zsetValue;
long c = 0;
Set<String> keys = Set.of(members);
Iterator<CacheScoredValue> it = sets.iterator();
@@ -2066,11 +2089,9 @@ public final class CacheMemorySource extends AbstractCacheSource {
return list;
}
Set<String> keys = Set.of(members);
Set<CacheScoredValue> sets = entry.setValue;
Set<CacheScoredValue> sets = entry.zsetValue;
Map<String, T> map = new HashMap<>();
sets.stream().filter(v -> keys.contains(v.getValue())).forEach(v -> {
map.put(v.getValue(), formatScore(scoreType, v.getScore()));
});
sets.stream().filter(v -> keys.contains(v.getValue())).forEach(v -> map.put(v.getValue(), formatScore(scoreType, v.getScore())));
for (String m : members) {
list.add(map.get(m));
}
@@ -2105,8 +2126,12 @@ public final class CacheMemorySource extends AbstractCacheSource {
if (entry == null) {
return null;
}
Set<CacheScoredValue> sets = entry.setValue;
return formatScore(scoreType, sets.stream().filter(v -> Objects.equals(member, v.getValue())).findAny().map(v -> v.getScore()).orElse(null));
Set<CacheScoredValue> sets = entry.zsetValue;
return formatScore(scoreType, sets.stream()
.filter(v -> Objects.equals(member, v.getValue()))
.findAny()
.map(v -> v.getScore())
.orElse(null));
}
@Override
@@ -2147,7 +2172,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public List<String> keys(String pattern) {
List<String> rs = new ArrayList<>();
Predicate<String> filter = isEmpty(pattern) ? x -> true : Pattern.compile(pattern).asPredicate();
Predicate<String> filter = Utility.isEmpty(pattern) ? x -> true : Pattern.compile(pattern).asPredicate();
container.forEach((k, v) -> {
if (filter.test(k) && !v.isExpired()) {
rs.add(k);
@@ -2178,7 +2203,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public List<String> keysStartsWith(String startsWith) {
List<String> rs = new ArrayList<>();
Predicate<String> filter = isEmpty(startsWith) ? x -> true : x -> x.startsWith(startsWith);
Predicate<String> filter = Utility.isEmpty(startsWith) ? x -> true : x -> x.startsWith(startsWith);
container.forEach((k, v) -> {
if (filter.test(k) && !v.isExpired()) {
rs.add(k);
@@ -2223,17 +2248,21 @@ public final class CacheMemorySource extends AbstractCacheSource {
OBJECT, ATOMIC, DOUBLE, SSET, ZSET, LIST, MAP;
}
//值类型只能是: String、byte[]、AtomicLong
public static final class CacheEntry {
volatile long lastAccessed; //最后刷新时间
Object objectValue;
//CacheEntryType为ATOMIC、DOUBLE时类型为AtomicLong
private Serializable objectValue;
Set setValue;
private CopyOnWriteArraySet<Serializable> ssetValue;
ConcurrentLinkedDeque listValue;
private ConcurrentSkipListSet<CacheScoredValue> zsetValue;
ConcurrentHashMap mapValue;
private ConcurrentLinkedDeque<Serializable> listValue;
private ConcurrentHashMap<String, Serializable> mapValue;
private CacheEntryType cacheType;
@@ -2248,9 +2277,9 @@ public final class CacheMemorySource extends AbstractCacheSource {
this.cacheType = cacheType;
this.key = key;
if (cacheType == CacheEntryType.SSET) {
this.setValue = new CopyOnWriteArraySet();
this.ssetValue = new CopyOnWriteArraySet();
} else if (cacheType == CacheEntryType.ZSET) {
this.setValue = new ConcurrentSkipListSet();
this.zsetValue = new ConcurrentSkipListSet();
} else if (cacheType == CacheEntryType.LIST) {
this.listValue = new ConcurrentLinkedDeque();
} else if (cacheType == CacheEntryType.MAP) {
@@ -2273,6 +2302,65 @@ public final class CacheMemorySource extends AbstractCacheSource {
return expireMills > 0 && (lastAccessed + expireMills) < System.currentTimeMillis();
}
//value类型只能是byte[]/String/AtomicLong
public static <T> T serialToObj(Convert convert, @Nonnull Type type, Serializable value) {
if (value == null) {
return null;
}
Convert c = convert == null ? JsonConvert.root() : convert;
if (value.getClass() == byte[].class) {
return (T) c.convertFrom(type, (byte[]) value);
} else { //String/AtomicLong
if (c instanceof TextConvert) {
return (T) ((TextConvert) c).convertFrom(type, value.toString());
} else {
return (T) c.convertFrom(type, value.toString().getBytes(StandardCharsets.UTF_8));
}
}
}
//返回类型只能是byte[]/String/AtomicLong
public static Serializable objToSerial(Convert convert, Type type, Object value) {
if (value == null) {
return null;
}
if (value instanceof String || value instanceof byte[]) {
return (Serializable) value;
}
Convert c = convert == null ? JsonConvert.root() : convert;
Type t = type == null ? value.getClass() : type;
if (c instanceof TextConvert) {
return ((TextConvert) c).convertTo(t, value);
} else {
return c.convertToBytes(t, value);
}
}
public <T> T getObjectValue(Convert convert, Type type) {
return serialToObj(convert, type, this.objectValue);
}
public void setObjectValue(Convert convert, Type type, Object value) {
this.objectValue = objToSerial(convert, type, value);
}
public <T> T getMapValue(String field, Convert convert, Type type) {
Serializable val = this.mapValue.get(field);
return val == null ? null : serialToObj(convert, type, val);
}
public void setMapValue(String field, Convert convert, Type type, Object value) {
this.mapValue.put(field, objToSerial(convert, type, value));
}
public Object setMapValueIfAbsent(String field, Convert convert, Type type, Object value) {
return this.mapValue.putIfAbsent(field, objToSerial(convert, type, value));
}
public void addSsetValue(Convert convert, Type type, Object value) {
this.ssetValue.add(objToSerial(convert, type, value));
}
public void lock() {
lock.lock();
}
@@ -2301,8 +2389,12 @@ public final class CacheMemorySource extends AbstractCacheSource {
return objectValue;
}
public Set getSetValue() {
return setValue;
public Set getSsetValue() {
return ssetValue;
}
public Set getZsetValue() {
return zsetValue;
}
public ConcurrentLinkedDeque getListValue() {

View File

@@ -8,6 +8,7 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.redkale.caching.CacheConfig;
import org.redkale.caching.CacheManager;
import org.redkale.convert.json.JsonConvert;
import org.redkale.source.CacheMemorySource;
import org.redkale.util.Utility;
@@ -25,8 +26,47 @@ public class CachingTest {
@Test
public void run() throws Exception {
CacheManager cache = CacheManager.create(new CacheConfig(), new CacheMemorySource("remote"));
cache.localSetString("user", "name:haha", "haha", Duration.ofMillis(500));
Utility.sleep(501);
Duration expire = Duration.ofMillis(490);
cache.localSetString("user", "name:haha", "myha", expire);
Assertions.assertEquals(cache.localGetString("user", "name:haha"), "myha");
Utility.sleep(500);
Assertions.assertTrue(cache.localGetString("user", "name:haha") == null);
CachingBean bean = new CachingBean();
bean.setName("tom");
bean.setRemark("这是名字备注");
String json = bean.toString();
cache.localSet("user", bean.getName(), CachingBean.class, bean, expire);
Assertions.assertEquals(cache.localGet("user", bean.getName(), CachingBean.class).toString(), json);
bean.setRemark(bean.getRemark() + "-新备注");
Assertions.assertEquals(cache.localGet("user", bean.getName(), CachingBean.class).toString(), json);
}
public static class CachingBean {
private String name;
private String remark;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getRemark() {
return remark;
}
public void setRemark(String remark) {
this.remark = remark;
}
public String toString() {
return JsonConvert.root().convertTo(this);
}
}
}