From bd088ab9eca72cc49f46f3441fafc76d49b31a80 Mon Sep 17 00:00:00 2001 From: redkale Date: Mon, 11 Dec 2023 14:45:35 +0800 Subject: [PATCH] =?UTF-8?q?CacheMemorySource=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/redkale/caching/CacheManager.java | 142 +++++++ .../org/redkale/source/CacheMemorySource.java | 380 +++++++++++------- .../org/redkale/test/caching/CachingTest.java | 44 +- 3 files changed, 420 insertions(+), 146 deletions(-) diff --git a/src/main/java/org/redkale/caching/CacheManager.java b/src/main/java/org/redkale/caching/CacheManager.java index 542c5548a..42af3612d 100644 --- a/src/main/java/org/redkale/caching/CacheManager.java +++ b/src/main/java/org/redkale/caching/CacheManager.java @@ -241,6 +241,148 @@ public class CacheManager { return delAsync(remoteSource, map, key); } + //-------------------------------------- both缓存 -------------------------------------- + /** + * 远程获取缓存数据, 过期返回null + * + * @param 泛型 + * @param map 缓存hash + * @param key 缓存键 + * @param type 数据类型 + * + * @return 数据值 + */ + public T bothGet(final String map, final String key, final Type type) { + T val = get(localSource, map, key, type); + return val == null ? get(remoteSource, map, key, type) : val; + } + + /** + * 远程异步获取缓存数据, 过期返回null + * + * @param 泛型 + * @param map 缓存hash + * @param key 缓存键 + * @param type 数据类型 + * + * @return 数据值 + */ + public CompletableFuture bothGetAsync(final String map, final String key, final Type type) { + T val = get(localSource, map, key, type); + if (val != null) { + return CompletableFuture.completedFuture(val); + } + return getAsync(remoteSource, map, key, type); + } + + /** + * 远程获取字符串缓存数据, 过期返回null + * + * @param map 缓存hash + * @param key 缓存键 + * + * @return 数据值 + */ + public final String bothGetString(final String map, final String key) { + return bothGet(map, key, String.class); + } + + /** + * 远程异步获取字符串缓存数据, 过期返回null + * + * @param map 缓存hash + * @param key 缓存键 + * + * @return 数据值 + */ + public final CompletableFuture bothGetStringAsync(final String map, final String key) { + return bothGetAsync(map, key, String.class); + } + + /** + * 远程缓存数据 + * + * @param 泛型 + * @param map 缓存hash + * @param key 缓存键 + * @param type 数据类型 + * @param value 数据值 + * @param localExpire 本地过期时长,为null表示永不过期 + * @param remoteExpire 远程过期时长,为null表示永不过期 + */ + public void bothSet(final String map, final String key, final Type type, final T value, Duration localExpire, Duration remoteExpire) { + set(localSource, map, key, type, value, localExpire); + set(remoteSource, map, key, type, value, remoteExpire); + } + + /** + * 远程异步缓存数据 + * + * @param 泛型 + * @param map 缓存hash + * @param key 缓存键 + * @param type 数据类型 + * @param value 数据值 + * @param localExpire 本地过期时长,为null表示永不过期 + * @param remoteExpire 远程过期时长,为null表示永不过期 + */ + public CompletableFuture bothSetAsync(final String map, final String key, final Type type, final T value, Duration localExpire, Duration remoteExpire) { + set(localSource, map, key, type, value, localExpire); + return setAsync(remoteSource, map, key, type, value, remoteExpire); + } + + /** + * 远程缓存字符串数据 + * + * @param map 缓存hash + * @param key 缓存键 + * @param value 数据值 + * @param localExpire 本地过期时长,为null表示永不过期 + * @param remoteExpire 远程过期时长,为null表示永不过期 + */ + public void bothSetString(final String map, final String key, final String value, Duration localExpire, Duration remoteExpire) { + bothSet(map, key, String.class, value, localExpire, remoteExpire); + } + + /** + * 远程异步缓存字符串数据 + * + * @param map 缓存hash + * @param key 缓存键 + * @param value 数据值 + * @param localExpire 本地过期时长,为null表示永不过期 + * @param remoteExpire 远程过期时长,为null表示永不过期 + */ + public CompletableFuture bothSetStringAsync(final String map, final String key, final String value, Duration localExpire, Duration remoteExpire) { + return bothSetAsync(map, key, String.class, value, localExpire, remoteExpire); + } + + /** + * 远程删除缓存数据 + * + * @param map 缓存hash + * @param key 缓存键 + * + * @return 删除数量 + */ + public long bothDel(String map, String key) { + del(localSource, map, key); + return del(remoteSource, map, key); + } + + /** + * 远程异步删除缓存数据 + * + * @param map 缓存hash + * @param key 缓存键 + * + * @return 删除数量 + */ + public CompletableFuture bothDelAsync(String map, String key) { + del(localSource, map, key); + return delAsync(remoteSource, map, key); + } + //-------------------------------------- 内部方法 -------------------------------------- /** * 获取缓存数据, 过期返回null diff --git a/src/main/java/org/redkale/source/CacheMemorySource.java b/src/main/java/org/redkale/source/CacheMemorySource.java index 71a599663..f0dce1ba8 100644 --- a/src/main/java/org/redkale/source/CacheMemorySource.java +++ b/src/main/java/org/redkale/source/CacheMemorySource.java @@ -7,6 +7,7 @@ package org.redkale.source; import java.io.Serializable; import java.lang.reflect.Type; +import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -24,11 +25,9 @@ import org.redkale.convert.*; import org.redkale.convert.json.*; import org.redkale.service.Local; import org.redkale.util.*; -import static org.redkale.util.Utility.convertValue; -import static org.redkale.util.Utility.isEmpty; /** - * CacheSource的默认实现--内存缓存, 此实现只可用于调试,不可用于生产环境 + * CacheSource的默认实现--内存缓存 * 注意: url 需要指定为 memory:cachesource * *

@@ -222,7 +221,7 @@ public final class CacheMemorySource extends AbstractCacheSource { //------------------------ 订阅发布 SUB/PUB ------------------------ @Override public CompletableFuture> pubsubChannelsAsync(@Nullable String pattern) { - Predicate predicate = isEmpty(pattern) ? t -> true : Pattern.compile(pattern).asPredicate(); + Predicate predicate = Utility.isEmpty(pattern) ? t -> true : Pattern.compile(pattern).asPredicate(); return CompletableFuture.completedFuture(pubsubListeners.keySet().stream().filter(predicate).collect(Collectors.toList())); } @@ -296,7 +295,7 @@ public final class CacheMemorySource extends AbstractCacheSource { for (int i = 0; i < keyVals.length; i += 2) { String key = keyVals[i].toString(); Object val = keyVals[i + 1]; - set0(key.toString(), 0, null, val); + set0(key.toString(), 0, null, null, val); } } @@ -307,7 +306,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public void mset(Map map) { - map.forEach((key, val) -> set0(key.toString(), 0, null, val)); + map.forEach((key, val) -> set0(key.toString(), 0, null, null, val)); } @Override @@ -317,7 +316,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public void set(String key, Convert convert, Type type, T value) { - set0(key, 0, type, value); + set0(key, 0, convert, type, value); } @Override @@ -345,7 +344,7 @@ public final class CacheMemorySource extends AbstractCacheSource { if (entry == null) { entry = new CacheEntry(CacheEntryType.OBJECT, key); container.put(key, entry); - entry.objectValue = value; + entry.setObjectValue(convert, type, value); entry.expireSeconds(expireSeconds); entry.lastAccessed = System.currentTimeMillis(); return true; @@ -366,9 +365,9 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public T getSet(String key, Convert convert, Type type, T value) { CacheEntry entry = find(key, CacheEntryType.OBJECT); - T old = entry == null ? null : (T) entry.objectValue; - set0(key, 0, type, value); - return convertValue(type, old); + T old = entry == null ? null : (T) entry.getObjectValue(convert, type); + set0(key, 0, convert, type, value); + return old; } @Override @@ -388,7 +387,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } finally { containerLock.unlock(); } - return convertValue(type, entry.objectValue); + return entry.getObjectValue(null, type); } @Override @@ -396,7 +395,7 @@ public final class CacheMemorySource extends AbstractCacheSource { return supplyFuture(() -> getDel(key, type)); } - private void set0(String key, int expireSeconds, Type type, Object value) { + private void set0(String key, int expireSeconds, Convert convert, Type type, Object value) { CacheEntry entry = find(key, CacheEntryType.OBJECT); if (entry == null) { containerLock.lock(); @@ -412,7 +411,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } entry.lock(); try { - entry.objectValue = convertValue(type, value); + entry.setObjectValue(convert, type, value); entry.expireSeconds(expireSeconds); entry.lastAccessed = System.currentTimeMillis(); } finally { @@ -422,7 +421,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public void setex(String key, int expireSeconds, Convert convert, Type type, T value) { - set0(key, expireSeconds, type, value); + set0(key, expireSeconds, convert, type, value); } @Override @@ -578,7 +577,7 @@ public final class CacheMemorySource extends AbstractCacheSource { entry.lock(); try { if (entry.cacheType != CacheEntryType.ATOMIC) { - entry.objectValue = new AtomicLong(Long.parseLong(entry.objectValue.toString())); + entry.objectValue = new AtomicLong(Long.parseLong(entry.getObjectValue(null, String.class))); entry.cacheType = CacheEntryType.ATOMIC; } return ((AtomicLong) entry.objectValue).addAndGet(num); @@ -611,7 +610,7 @@ public final class CacheMemorySource extends AbstractCacheSource { entry.lock(); try { if (entry.cacheType != CacheEntryType.DOUBLE) { - entry.objectValue = new AtomicLong(Long.parseLong(entry.objectValue.toString())); + entry.objectValue = new AtomicLong(Long.parseLong(entry.getObjectValue(null, String.class))); entry.cacheType = CacheEntryType.DOUBLE; } Long v = ((AtomicLong) entry.objectValue).addAndGet(Double.doubleToLongBits(num)); @@ -650,7 +649,7 @@ public final class CacheMemorySource extends AbstractCacheSource { public List mget(final Type componentType, final String... keys) { List list = new ArrayList<>(); for (String key : keys) { - list.add(get0(key, 0, componentType)); + list.add(get0(key, 0, null, componentType)); } return list; } @@ -673,7 +672,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public T get(final String key, final Type type) { - return get0(key, 0, type); + return get0(key, 0, null, type); } @Override @@ -683,7 +682,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public T getex(final String key, final int expireSeconds, final Type type) { - return get0(key, expireSeconds, type); + return get0(key, expireSeconds, null, type); } @Override @@ -691,7 +690,7 @@ public final class CacheMemorySource extends AbstractCacheSource { return supplyFuture(() -> getex(key, expireSeconds, type)); } - private T get0(final String key, final int expireSeconds, final Type type) { + private T get0(final String key, final int expireSeconds, final Convert convert, final Type type) { CacheEntry entry = find(key); if (entry == null) { return null; @@ -702,23 +701,21 @@ public final class CacheMemorySource extends AbstractCacheSource { // OBJECT, ATOMIC, DOUBLE, SSET, ZSET, LIST, MAP; switch (entry.cacheType) { case ATOMIC: - return convertValue(type, (AtomicLong) entry.objectValue); + return CacheEntry.serialToObj(convert, type, (AtomicLong) entry.objectValue); case DOUBLE: - return convertValue(type, Double.longBitsToDouble(((AtomicLong) entry.objectValue).longValue())); + return CacheEntry.serialToObj(convert, type, Double.longBitsToDouble(((AtomicLong) entry.objectValue).longValue())); case SSET: - return (T) new LinkedHashSet(entry.setValue); + return (T) entry.ssetValue.stream().map(v -> CacheEntry.serialToObj(convert, type, v)).collect(Collectors.toSet()); case ZSET: - return (T) new LinkedHashSet(entry.setValue); + return (T) entry.zsetValue.stream().map(v -> new CacheScoredValue(v)).collect(Collectors.toSet()); case LIST: - return (T) new ArrayList(entry.listValue); + return (T) entry.listValue.stream().map(v -> CacheEntry.serialToObj(convert, type, v)).collect(Collectors.toList()); case MAP: - return (T) new LinkedHashMap<>(entry.mapValue); + LinkedHashMap map = new LinkedHashMap(); + entry.mapValue.forEach((k, v) -> map.put(k, CacheEntry.serialToObj(convert, type, v))); + return (T) map; default: - Object obj = entry.objectValue; - if (obj != null && obj.getClass() != type) { - return (T) JsonConvert.root().convertFrom(type, JsonConvert.root().convertToBytes(obj)); - } - return (T) obj; + return entry.getObjectValue(convert, type); } } @@ -806,7 +803,7 @@ public final class CacheMemorySource extends AbstractCacheSource { Map map = entry.mapValue; Serializable val = (Serializable) map.computeIfAbsent(field, f -> new AtomicLong()); if (!(val instanceof AtomicLong)) { - val = new AtomicLong(((Number) val).longValue()); + val = CacheEntry.objToSerial(null, AtomicLong.class, val); map.put(field, val); } return ((AtomicLong) val).addAndGet(num); @@ -864,7 +861,7 @@ public final class CacheMemorySource extends AbstractCacheSource { //需要给CacheFactory使用 @Override public void hset(final String key, final String field, final Convert convert, final Type type, final T value) { - hset0(key, field, type, value); + hset0(key, field, convert, type, value); } @Override @@ -892,7 +889,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } entry.lock(); try { - boolean rs = entry.mapValue.putIfAbsent(field, convertValue(type, value)) == null; + boolean rs = entry.setMapValueIfAbsent(field, convert, type, value) == null; entry.lastAccessed = System.currentTimeMillis(); return rs; } finally { @@ -908,7 +905,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public void hmset(final String key, final Serializable... values) { for (int i = 0; i < values.length; i += 2) { - hset0(key, (String) values[i], null, values[i + 1]); + hset0(key, (String) values[i], null, null, values[i + 1]); } } @@ -919,7 +916,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public void hmset(final String key, final Map map) { - map.forEach((k, v) -> hset0(key, (String) k, null, v)); + map.forEach((k, v) -> hset0(key, (String) k, null, null, v)); } @Override @@ -933,10 +930,9 @@ public final class CacheMemorySource extends AbstractCacheSource { if (entry == null) { return null; } - Map map = entry.mapValue; List rs = new ArrayList<>(fields.length); for (String field : fields) { - rs.add(convertValue(type, map.get(field))); + rs.add(entry.getMapValue(field, null, type)); } return rs; } @@ -953,7 +949,7 @@ public final class CacheMemorySource extends AbstractCacheSource { return new LinkedHashMap(); } else { Map map = new LinkedHashMap(); - entry.mapValue.forEach((k, v) -> map.put(k, convertValue(type, v))); + entry.mapValue.forEach((k, v) -> map.put(k, CacheEntry.serialToObj(null, type, v))); return map; } } @@ -969,7 +965,7 @@ public final class CacheMemorySource extends AbstractCacheSource { if (entry == null) { return new ArrayList(); } else { - Stream stream = entry.mapValue.values().stream().map(v -> convertValue(type, v)); + Stream stream = entry.mapValue.values().stream().map(v -> CacheEntry.serialToObj(null, type, v)); return new ArrayList(stream.collect(Collectors.toList())); } } @@ -989,12 +985,14 @@ public final class CacheMemorySource extends AbstractCacheSource { return new HashMap(); } if (Utility.isEmpty(pattern)) { - return new LinkedHashMap(entry.mapValue); + Set> set = entry.mapValue.entrySet(); + return set.stream() + .collect(Collectors.toMap(Map.Entry::getKey, en -> CacheEntry.serialToObj(null, type, en.getValue()))); } else { Predicate regx = Pattern.compile(pattern.replace("*", ".*")).asPredicate(); Set> set = entry.mapValue.entrySet(); return set.stream().filter(en -> regx.test(en.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, en -> convertValue(type, en.getValue()))); + .collect(Collectors.toMap(Map.Entry::getKey, en -> CacheEntry.serialToObj(null, type, en.getValue()))); } } @@ -1013,8 +1011,7 @@ public final class CacheMemorySource extends AbstractCacheSource { if (entry == null) { return null; } - Object obj = entry.mapValue.get(field); - return obj == null ? null : convertValue(type, obj); + return entry.getMapValue(field, convert, type); } @Override @@ -1031,8 +1028,8 @@ public final class CacheMemorySource extends AbstractCacheSource { if (entry == null) { return 0L; } - Object obj = entry.mapValue.get(field); - return obj == null ? 0L : (long) obj.toString().length(); + String obj = entry.getMapValue(field, null, String.class); + return obj == null ? 0L : (long) obj.length(); } @Override @@ -1040,7 +1037,7 @@ public final class CacheMemorySource extends AbstractCacheSource { return supplyFuture(() -> hstrlen(key, field)); } - private void hset0(String key, String field, Type type, Object value) { + private void hset0(String key, String field, Convert convert, Type type, Object value) { if (value == null) { return; } @@ -1059,7 +1056,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } entry.lock(); try { - entry.mapValue.put(field, convertValue(type, value)); + entry.setMapValue(field, convert, type, value); entry.lastAccessed = System.currentTimeMillis(); } finally { entry.unlock(); @@ -1111,9 +1108,9 @@ public final class CacheMemorySource extends AbstractCacheSource { if (entry == null) { return null; } - List list = new ArrayList(entry.listValue); + List list = new ArrayList(entry.listValue); int pos = index >= 0 ? index : list.size() + index; - return pos >= list.size() ? null : convertValue(componentType, list.get(pos)); + return pos >= list.size() ? null : CacheEntry.serialToObj(null, componentType, list.get(pos)); } @Override @@ -1147,29 +1144,30 @@ public final class CacheMemorySource extends AbstractCacheSource { return 0L; } entry.lock(); + Serializable val = CacheEntry.objToSerial(null, componentType, value); try { - List list = new ArrayList<>(entry.listValue); + List list = new ArrayList<>(entry.listValue); int pos = list.indexOf(pivot); if (pos < 0) { return -1L; } - List newList = new ArrayList<>(); + List newList = new ArrayList<>(); if (before) { if (pos == 0) { - newList.add(value); + newList.add(val); newList.addAll(list); } else { newList.addAll(list.subList(0, pos)); - newList.add(value); + newList.add(val); newList.addAll(list.subList(pos, list.size())); } } else { if (pos == list.size() - 1) { newList.addAll(list); - newList.add(value); + newList.add(val); } else { newList.addAll(list.subList(0, pos + 1)); - newList.add(value); + newList.add(val); newList.addAll(list.subList(pos + 1, list.size())); } } @@ -1199,7 +1197,7 @@ public final class CacheMemorySource extends AbstractCacheSource { entry.lock(); try { for (T val : values) { - entry.listValue.addFirst(val); + entry.listValue.addFirst(CacheEntry.objToSerial(null, componentType, val)); } } finally { entry.unlock(); @@ -1221,7 +1219,7 @@ public final class CacheMemorySource extends AbstractCacheSource { try { ConcurrentLinkedDeque list = entry.listValue; for (T val : values) { - list.addFirst(val); + list.addFirst(CacheEntry.objToSerial(null, componentType, val)); } } finally { entry.unlock(); @@ -1241,7 +1239,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } entry.lock(); try { - return convertValue(componentType, entry.listValue.pollFirst()); + return CacheEntry.serialToObj(null, componentType, entry.listValue.pollFirst()); } finally { entry.unlock(); } @@ -1302,7 +1300,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } entry.lock(); try { - return convertValue(componentType, entry.listValue.pollLast()); + return CacheEntry.serialToObj(null, componentType, entry.listValue.pollLast()); } finally { entry.unlock(); } @@ -1321,8 +1319,10 @@ public final class CacheMemorySource extends AbstractCacheSource { } entry.lock(); try { - ConcurrentLinkedDeque list = entry.listValue; - list.addAll(List.of(values)); + ConcurrentLinkedDeque list = entry.listValue; + for (T val : values) { + list.add(CacheEntry.objToSerial(null, componentType, val)); + } } finally { entry.unlock(); } @@ -1350,8 +1350,10 @@ public final class CacheMemorySource extends AbstractCacheSource { } entry.lock(); try { - ConcurrentLinkedDeque list = entry.listValue; - list.addAll(List.of(values)); + ConcurrentLinkedDeque list = entry.listValue; + for (T val : values) { + list.add(CacheEntry.objToSerial(null, componentType, val)); + } } finally { entry.unlock(); } @@ -1389,18 +1391,20 @@ public final class CacheMemorySource extends AbstractCacheSource { if (entry == null) { return list; } - List vals = new ArrayList<>(entry.setValue); + List vals = new ArrayList<>(entry.ssetValue); if (count < 0) { //可以重复 for (int i = 0; i < Math.abs(count); i++) { int index = ThreadLocalRandom.current().nextInt(vals.size()); - T val = vals.get(index); - list.add(convertValue(componentType, val)); + Serializable val = vals.get(index); + list.add(CacheEntry.serialToObj(null, componentType, val)); } } else { //不可以重复 if (count >= vals.size()) { - return vals; + return vals.stream() + .map(val -> (T) CacheEntry.serialToObj(null, componentType, val)).collect(Collectors.toList()); } - return vals.subList(0, count); + return vals.subList(0, count).stream() + .map(val -> (T) CacheEntry.serialToObj(null, componentType, val)).collect(Collectors.toList()); } return list; } @@ -1419,7 +1423,8 @@ public final class CacheMemorySource extends AbstractCacheSource { boolean rs = false; entry.lock(); try { - rs = entry.setValue.remove(member); + Serializable val = CacheEntry.objToSerial(null, componentType, member); + rs = entry.ssetValue.remove(val); } finally { entry.unlock(); } @@ -1439,7 +1444,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } entry2.lock(); try { - entry2.setValue.add(member); + entry2.addSsetValue(null, componentType, member); } finally { entry2.unlock(); } @@ -1454,7 +1459,9 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public Set sdiff(final String key, final Type componentType, final String... key2s) { - return sdiff0(key, key2s); + return sdiff0(key, key2s).stream() + .map(v -> (T) CacheEntry.serialToObj(null, componentType, v)) + .collect(Collectors.toSet()); } @Override @@ -1464,7 +1471,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public long sdiffstore(final String key, final String srcKey, final String... srcKey2s) { - Set rs = sdiff0(srcKey, srcKey2s); + Set rs = sdiff0(srcKey, srcKey2s); CacheEntry entry = find(key, CacheEntryType.SSET); if (entry == null) { containerLock.lock(); @@ -1480,8 +1487,8 @@ public final class CacheMemorySource extends AbstractCacheSource { } entry.lock(); try { - entry.setValue.clear(); - entry.setValue.addAll(rs); + entry.ssetValue.clear(); + entry.ssetValue.addAll(rs); } finally { entry.unlock(); } @@ -1493,17 +1500,17 @@ public final class CacheMemorySource extends AbstractCacheSource { return supplyFuture(() -> sdiffstore(key, srcKey, srcKey2s)); } - private Set sdiff0(final String key, final String... key2s) { - Set rs = new HashSet<>(); + private Set sdiff0(final String key, final String... key2s) { + Set rs = new HashSet<>(); CacheEntry entry = find(key, CacheEntryType.SSET); if (entry == null) { return rs; } - rs.addAll(entry.setValue); + rs.addAll(entry.ssetValue); for (String k : key2s) { CacheEntry en2 = find(k, CacheEntryType.SSET); if (en2 != null) { - en2.setValue.forEach(rs::remove); + en2.ssetValue.forEach(rs::remove); } } return rs; @@ -1511,7 +1518,9 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public Set sinter(final String key, final Type componentType, final String... key2s) { - return sinter0(key, key2s); + return sinter0(key, key2s).stream() + .map(v -> (T) CacheEntry.serialToObj(null, componentType, v)) + .collect(Collectors.toSet()); } @Override @@ -1521,7 +1530,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public long sinterstore(final String key, final String srcKey, final String... srcKey2s) { - Set rs = sinter0(srcKey, srcKey2s); + Set rs = sinter0(srcKey, srcKey2s); CacheEntry entry = find(key, CacheEntryType.SSET); if (entry == null) { containerLock.lock(); @@ -1537,8 +1546,8 @@ public final class CacheMemorySource extends AbstractCacheSource { } entry.lock(); try { - entry.setValue.clear(); - entry.setValue.addAll(rs); + entry.ssetValue.clear(); + entry.ssetValue.addAll(rs); } finally { entry.unlock(); } @@ -1550,23 +1559,23 @@ public final class CacheMemorySource extends AbstractCacheSource { return supplyFuture(() -> sinterstore(key, srcKey, srcKey2s)); } - private Set sinter0(final String key, final String... key2s) { - Set rs = new HashSet<>(); + private Set sinter0(final String key, final String... key2s) { + Set rs = new HashSet<>(); CacheEntry entry = find(key, CacheEntryType.SSET); if (entry == null) { return rs; } - rs.addAll(entry.setValue); + rs.addAll(entry.ssetValue); for (String k : key2s) { CacheEntry en2 = find(k, CacheEntryType.SSET); if (en2 != null) { - Set removes = new HashSet<>(); - for (T v : rs) { - if (!en2.setValue.contains(v)) { - removes.add(v); + Set rms = new HashSet<>(); + for (Serializable v : rs) { + if (!en2.ssetValue.contains(v)) { + rms.add(v); } } - rs.removeAll(removes); + rs.removeAll(rms); } else { rs.clear(); return rs; @@ -1577,7 +1586,9 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public Set sunion(final String key, final Type componentType, final String... key2s) { - return sunion0(key, key2s); + return sunion0(key, key2s).stream() + .map(v -> (T) CacheEntry.serialToObj(null, componentType, v)) + .collect(Collectors.toSet()); } @Override @@ -1587,7 +1598,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public long sunionstore(final String key, final String srcKey, final String... srcKey2s) { - Set rs = sunion0(srcKey, srcKey2s); + Set rs = sunion0(srcKey, srcKey2s); CacheEntry entry = find(key, CacheEntryType.SSET); if (entry == null) { @@ -1604,8 +1615,8 @@ public final class CacheMemorySource extends AbstractCacheSource { } entry.lock(); try { - entry.setValue.clear(); - entry.setValue.addAll(rs); + entry.ssetValue.clear(); + entry.ssetValue.addAll(rs); } finally { entry.unlock(); } @@ -1617,16 +1628,16 @@ public final class CacheMemorySource extends AbstractCacheSource { return supplyFuture(() -> sunionstore(key, srcKey, srcKey2s)); } - private Set sunion0(final String key, final String... key2s) { - Set rs = new HashSet<>(); + private Set sunion0(final String key, final String... key2s) { + Set rs = new HashSet<>(); CacheEntry entry = find(key, CacheEntryType.SSET); if (entry != null) { - rs.addAll(entry.setValue); + rs.addAll(entry.ssetValue); } for (String k : key2s) { CacheEntry en2 = find(k, CacheEntryType.SSET); if (en2 != null) { - rs.addAll(en2.setValue); + rs.addAll(en2.ssetValue); } } return rs; @@ -1638,7 +1649,9 @@ public final class CacheMemorySource extends AbstractCacheSource { if (entry == null) { return new LinkedHashSet<>(); } - return new LinkedHashSet<>(entry.setValue); + return entry.ssetValue.stream() + .map(v -> (T) CacheEntry.serialToObj(null, componentType, v)) + .collect(Collectors.toSet()); } @Override @@ -1652,7 +1665,9 @@ public final class CacheMemorySource extends AbstractCacheSource { for (String key : keys) { CacheEntry entry = find(key, CacheEntryType.SSET); if (entry != null) { - map.put(key, new LinkedHashSet<>(entry.setValue)); + map.put(key, entry.ssetValue.stream() + .map(v -> (T) CacheEntry.serialToObj(null, componentType, v)) + .collect(Collectors.toSet())); } } return map; @@ -1673,7 +1688,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } return rs; } - Set set = entry.setValue; + Set set = entry.ssetValue; for (String member : members) { rs.add(set.contains(member)); } @@ -1702,8 +1717,9 @@ public final class CacheMemorySource extends AbstractCacheSource { } entry.lock(); try { - Set set = entry.setValue; - set.addAll(List.of(values)); + for (T val : values) { + entry.addSsetValue(null, componentType, val); + } } finally { entry.unlock(); } @@ -1717,7 +1733,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public long scard(final String key) { CacheEntry entry = find(key, CacheEntryType.SSET); - return entry == null ? 0L : (long) entry.setValue.size(); + return entry == null ? 0L : (long) entry.ssetValue.size(); } @Override @@ -1728,7 +1744,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public boolean sismember(final String key, final Type type, final T value) { CacheEntry entry = find(key, CacheEntryType.SSET); - return entry != null && entry.setValue.contains(value); + return entry != null && entry.ssetValue.contains(CacheEntry.objToSerial(null, type, value)); } @Override @@ -1744,18 +1760,18 @@ public final class CacheMemorySource extends AbstractCacheSource { } entry.lock(); try { - final Set cset = entry.setValue; + final Set cset = entry.ssetValue; if (cset.isEmpty()) { return null; } - Iterator it = cset.iterator(); - Object del = null; + Iterator it = cset.iterator(); + Serializable del = null; if (it.hasNext()) { del = it.next(); } if (del != null) { cset.remove(del); - return (T) del; + return CacheEntry.serialToObj(null, componentType, del); } return null; } finally { @@ -1776,20 +1792,23 @@ public final class CacheMemorySource extends AbstractCacheSource { } entry.lock(); try { - final Set cset = entry.setValue; + final Set cset = entry.ssetValue; if (cset.isEmpty()) { return new LinkedHashSet<>(); } - Iterator it = cset.iterator(); + Iterator it = cset.iterator(); Set list = new LinkedHashSet<>(); + Set rms = new LinkedHashSet<>(); int index = 0; while (it.hasNext()) { - list.add(convertValue(componentType, it.next())); + Serializable item = it.next(); + rms.add(item); + list.add(CacheEntry.serialToObj(null, componentType, item)); if (++index >= count) { break; } } - cset.removeAll(list); + cset.removeAll(rms); return list; } finally { entry.unlock(); @@ -1809,14 +1828,14 @@ public final class CacheMemorySource extends AbstractCacheSource { } entry.lock(); try { - final Set cset = entry.setValue; + final Set cset = entry.ssetValue; if (cset.isEmpty()) { return new LinkedHashSet<>(); } - Iterator it = cset.iterator(); + Iterator it = cset.iterator(); Set list = new LinkedHashSet<>(); while (it.hasNext()) { - list.add((T) convertValue(componentType, it.next())); + list.add(CacheEntry.serialToObj(null, componentType, it.next())); } return list; } finally { @@ -1835,7 +1854,11 @@ public final class CacheMemorySource extends AbstractCacheSource { if (entry == null) { return 0L; } - return entry.setValue.removeAll(Arrays.asList(values)) ? 1L : 0L; + long count = 0; + for (T val : values) { + count += entry.ssetValue.remove(CacheEntry.objToSerial(null, type, val)) ? 1 : 0; + } + return count; } @Override @@ -1846,7 +1869,7 @@ public final class CacheMemorySource extends AbstractCacheSource { //------------------------ 有序集合 Sorted Set ------------------------ @Override public void zadd(String key, CacheScoredValue... values) { - List list = new ArrayList<>(); + List list = new ArrayList<>(); for (CacheScoredValue v : values) { list.add(new CacheScoredValue(v)); } @@ -1865,7 +1888,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } entry.lock(); try { - entry.setValue.addAll(list); + entry.zsetValue.addAll(list); } finally { entry.unlock(); } @@ -1893,7 +1916,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } entry.lock(); try { - Set sets = entry.setValue; + Set sets = entry.zsetValue; CacheScoredValue old = sets.stream().filter(v -> Objects.equals(v.getValue(), value.getValue())).findAny().orElse(null); if (old == null) { sets.add(new CacheScoredValue(value.getScore().doubleValue(), value.getValue())); @@ -1931,7 +1954,7 @@ public final class CacheMemorySource extends AbstractCacheSource { if (entry == null) { return 0L; } - return entry.setValue.size(); + return entry.zsetValue.size(); } @Override @@ -1945,7 +1968,7 @@ public final class CacheMemorySource extends AbstractCacheSource { if (entry == null) { return null; } - List list = new ArrayList<>(entry.setValue); + List list = new ArrayList<>(entry.zsetValue); Collections.sort(list); long c = 0; for (CacheScoredValue v : list) { @@ -1968,7 +1991,7 @@ public final class CacheMemorySource extends AbstractCacheSource { if (entry == null) { return null; } - List list = new ArrayList<>(entry.setValue); + List list = new ArrayList<>(entry.zsetValue); Collections.sort(list, Collections.reverseOrder()); long c = 0; for (CacheScoredValue v : list) { @@ -1992,7 +2015,7 @@ public final class CacheMemorySource extends AbstractCacheSource { return new ArrayList<>(); } List list = new ArrayList<>(); - Set sets = entry.setValue; + Set sets = entry.zsetValue; long c = 0; for (CacheScoredValue v : sets) { if (c >= start && (stop < 0 || c <= stop)) { @@ -2014,7 +2037,7 @@ public final class CacheMemorySource extends AbstractCacheSource { if (entry == null) { return new ArrayList(); } - Set sets = entry.setValue; + Set sets = entry.zsetValue; if (Utility.isEmpty(pattern)) { return sets.stream().collect(Collectors.toList()); } else { @@ -2034,7 +2057,7 @@ public final class CacheMemorySource extends AbstractCacheSource { if (entry == null) { return 0L; } - Set sets = entry.setValue; + Set sets = entry.zsetValue; long c = 0; Set keys = Set.of(members); Iterator it = sets.iterator(); @@ -2066,11 +2089,9 @@ public final class CacheMemorySource extends AbstractCacheSource { return list; } Set keys = Set.of(members); - Set sets = entry.setValue; + Set sets = entry.zsetValue; Map map = new HashMap<>(); - sets.stream().filter(v -> keys.contains(v.getValue())).forEach(v -> { - map.put(v.getValue(), formatScore(scoreType, v.getScore())); - }); + sets.stream().filter(v -> keys.contains(v.getValue())).forEach(v -> map.put(v.getValue(), formatScore(scoreType, v.getScore()))); for (String m : members) { list.add(map.get(m)); } @@ -2105,8 +2126,12 @@ public final class CacheMemorySource extends AbstractCacheSource { if (entry == null) { return null; } - Set sets = entry.setValue; - return formatScore(scoreType, sets.stream().filter(v -> Objects.equals(member, v.getValue())).findAny().map(v -> v.getScore()).orElse(null)); + Set sets = entry.zsetValue; + return formatScore(scoreType, sets.stream() + .filter(v -> Objects.equals(member, v.getValue())) + .findAny() + .map(v -> v.getScore()) + .orElse(null)); } @Override @@ -2147,7 +2172,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public List keys(String pattern) { List rs = new ArrayList<>(); - Predicate filter = isEmpty(pattern) ? x -> true : Pattern.compile(pattern).asPredicate(); + Predicate filter = Utility.isEmpty(pattern) ? x -> true : Pattern.compile(pattern).asPredicate(); container.forEach((k, v) -> { if (filter.test(k) && !v.isExpired()) { rs.add(k); @@ -2178,7 +2203,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public List keysStartsWith(String startsWith) { List rs = new ArrayList<>(); - Predicate filter = isEmpty(startsWith) ? x -> true : x -> x.startsWith(startsWith); + Predicate filter = Utility.isEmpty(startsWith) ? x -> true : x -> x.startsWith(startsWith); container.forEach((k, v) -> { if (filter.test(k) && !v.isExpired()) { rs.add(k); @@ -2223,17 +2248,21 @@ public final class CacheMemorySource extends AbstractCacheSource { OBJECT, ATOMIC, DOUBLE, SSET, ZSET, LIST, MAP; } + //值类型只能是: String、byte[]、AtomicLong public static final class CacheEntry { volatile long lastAccessed; //最后刷新时间 - Object objectValue; + //CacheEntryType为ATOMIC、DOUBLE时类型为AtomicLong + private Serializable objectValue; - Set setValue; + private CopyOnWriteArraySet ssetValue; - ConcurrentLinkedDeque listValue; + private ConcurrentSkipListSet zsetValue; - ConcurrentHashMap mapValue; + private ConcurrentLinkedDeque listValue; + + private ConcurrentHashMap mapValue; private CacheEntryType cacheType; @@ -2248,9 +2277,9 @@ public final class CacheMemorySource extends AbstractCacheSource { this.cacheType = cacheType; this.key = key; if (cacheType == CacheEntryType.SSET) { - this.setValue = new CopyOnWriteArraySet(); + this.ssetValue = new CopyOnWriteArraySet(); } else if (cacheType == CacheEntryType.ZSET) { - this.setValue = new ConcurrentSkipListSet(); + this.zsetValue = new ConcurrentSkipListSet(); } else if (cacheType == CacheEntryType.LIST) { this.listValue = new ConcurrentLinkedDeque(); } else if (cacheType == CacheEntryType.MAP) { @@ -2273,6 +2302,65 @@ public final class CacheMemorySource extends AbstractCacheSource { return expireMills > 0 && (lastAccessed + expireMills) < System.currentTimeMillis(); } + //value类型只能是byte[]/String/AtomicLong + public static T serialToObj(Convert convert, @Nonnull Type type, Serializable value) { + if (value == null) { + return null; + } + Convert c = convert == null ? JsonConvert.root() : convert; + if (value.getClass() == byte[].class) { + return (T) c.convertFrom(type, (byte[]) value); + } else { //String/AtomicLong + if (c instanceof TextConvert) { + return (T) ((TextConvert) c).convertFrom(type, value.toString()); + } else { + return (T) c.convertFrom(type, value.toString().getBytes(StandardCharsets.UTF_8)); + } + } + } + + //返回类型只能是byte[]/String/AtomicLong + public static Serializable objToSerial(Convert convert, Type type, Object value) { + if (value == null) { + return null; + } + if (value instanceof String || value instanceof byte[]) { + return (Serializable) value; + } + Convert c = convert == null ? JsonConvert.root() : convert; + Type t = type == null ? value.getClass() : type; + if (c instanceof TextConvert) { + return ((TextConvert) c).convertTo(t, value); + } else { + return c.convertToBytes(t, value); + } + } + + public T getObjectValue(Convert convert, Type type) { + return serialToObj(convert, type, this.objectValue); + } + + public void setObjectValue(Convert convert, Type type, Object value) { + this.objectValue = objToSerial(convert, type, value); + } + + public T getMapValue(String field, Convert convert, Type type) { + Serializable val = this.mapValue.get(field); + return val == null ? null : serialToObj(convert, type, val); + } + + public void setMapValue(String field, Convert convert, Type type, Object value) { + this.mapValue.put(field, objToSerial(convert, type, value)); + } + + public Object setMapValueIfAbsent(String field, Convert convert, Type type, Object value) { + return this.mapValue.putIfAbsent(field, objToSerial(convert, type, value)); + } + + public void addSsetValue(Convert convert, Type type, Object value) { + this.ssetValue.add(objToSerial(convert, type, value)); + } + public void lock() { lock.lock(); } @@ -2301,8 +2389,12 @@ public final class CacheMemorySource extends AbstractCacheSource { return objectValue; } - public Set getSetValue() { - return setValue; + public Set getSsetValue() { + return ssetValue; + } + + public Set getZsetValue() { + return zsetValue; } public ConcurrentLinkedDeque getListValue() { diff --git a/src/test/java/org/redkale/test/caching/CachingTest.java b/src/test/java/org/redkale/test/caching/CachingTest.java index 1dfc4bd5e..d677e3071 100644 --- a/src/test/java/org/redkale/test/caching/CachingTest.java +++ b/src/test/java/org/redkale/test/caching/CachingTest.java @@ -8,6 +8,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.redkale.caching.CacheConfig; import org.redkale.caching.CacheManager; +import org.redkale.convert.json.JsonConvert; import org.redkale.source.CacheMemorySource; import org.redkale.util.Utility; @@ -25,8 +26,47 @@ public class CachingTest { @Test public void run() throws Exception { CacheManager cache = CacheManager.create(new CacheConfig(), new CacheMemorySource("remote")); - cache.localSetString("user", "name:haha", "haha", Duration.ofMillis(500)); - Utility.sleep(501); + Duration expire = Duration.ofMillis(490); + cache.localSetString("user", "name:haha", "myha", expire); + Assertions.assertEquals(cache.localGetString("user", "name:haha"), "myha"); + Utility.sleep(500); Assertions.assertTrue(cache.localGetString("user", "name:haha") == null); + + CachingBean bean = new CachingBean(); + bean.setName("tom"); + bean.setRemark("这是名字备注"); + + String json = bean.toString(); + cache.localSet("user", bean.getName(), CachingBean.class, bean, expire); + Assertions.assertEquals(cache.localGet("user", bean.getName(), CachingBean.class).toString(), json); + bean.setRemark(bean.getRemark() + "-新备注"); + Assertions.assertEquals(cache.localGet("user", bean.getName(), CachingBean.class).toString(), json); + } + + public static class CachingBean { + + private String name; + + private String remark; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getRemark() { + return remark; + } + + public void setRemark(String remark) { + this.remark = remark; + } + + public String toString() { + return JsonConvert.root().convertTo(this); + } } }