From 8ab6b6e955582c83f86fa2800c442e25f6386ec1 Mon Sep 17 00:00:00 2001 From: redkale Date: Tue, 12 Dec 2023 16:38:15 +0800 Subject: [PATCH] CacheManager --- .../java/org/redkale/cache/CacheManager.java | 50 ++++- .../cache/support/CacheManagerService.java | 204 +++++++++++++----- 2 files changed, 190 insertions(+), 64 deletions(-) diff --git a/src/main/java/org/redkale/cache/CacheManager.java b/src/main/java/org/redkale/cache/CacheManager.java index 386723560..18c3b9ffa 100644 --- a/src/main/java/org/redkale/cache/CacheManager.java +++ b/src/main/java/org/redkale/cache/CacheManager.java @@ -254,7 +254,7 @@ public interface CacheManager { //-------------------------------------- both缓存 -------------------------------------- /** - * 远程获取缓存数据, 过期返回null + * 本地或远程获取缓存数据, 过期返回null * * @param 泛型 * @param hash 缓存hash @@ -266,7 +266,7 @@ public interface CacheManager { public T bothGet(final String hash, final String key, final Type type); /** - * 远程获取字符串缓存数据, 过期返回null + * 本地或远程获取字符串缓存数据, 过期返回null * * @param hash 缓存hash * @param key 缓存键 @@ -278,7 +278,7 @@ public interface CacheManager { } /** - * 远程异步获取缓存数据, 过期返回null + * 本地或远程异步获取缓存数据, 过期返回null * * @param 泛型 * @param hash 缓存hash @@ -290,7 +290,7 @@ public interface CacheManager { public CompletableFuture bothGetAsync(final String hash, final String key, final Type type); /** - * 远程异步获取字符串缓存数据, 过期返回null + * 本地或远程异步获取字符串缓存数据, 过期返回null * * @param hash 缓存hash * @param key 缓存键 @@ -302,7 +302,37 @@ public interface CacheManager { } /** - * 远程缓存数据 + * 本地或远程获取缓存数据, 过期返回null + * + * @param 泛型 + * @param hash 缓存hash + * @param key 缓存键 + * @param type 数据类型 + * @param localExpire 本地过期时长,为null表示永不过期 + * @param remoteExpire 远程过期时长,为null表示永不过期 + * @param supplier 数据函数 + * + * @return 数据值 + */ + public T bothGet(String hash, String key, Type type, Duration localExpire, Duration remoteExpire, Supplier supplier); + + /** + * 本地或远程异步获取缓存数据, 过期返回null + * + * @param 泛型 + * @param hash 缓存hash + * @param key 缓存键 + * @param type 数据类型 + * @param localExpire 本地过期时长,为null表示永不过期 + * @param remoteExpire 远程过期时长,为null表示永不过期 + * @param supplier 数据函数 + * + * @return 数据值 + */ + public CompletableFuture bothGetAsync(String hash, String key, Type type, Duration localExpire, Duration remoteExpire, Supplier> supplier); + + /** + * 本地和远程缓存数据 * * @param 泛型 * @param hash 缓存hash @@ -315,7 +345,7 @@ public interface CacheManager { public void bothSet(final String hash, final String key, final Type type, final T value, Duration localExpire, Duration remoteExpire); /** - * 远程缓存字符串数据 + * 本地和远程缓存字符串数据 * * @param hash 缓存hash * @param key 缓存键 @@ -328,7 +358,7 @@ public interface CacheManager { } /** - * 远程异步缓存数据 + * 本地和远程异步缓存数据 * * @param 泛型 * @param hash 缓存hash @@ -341,7 +371,7 @@ public interface CacheManager { public CompletableFuture bothSetAsync(String hash, String key, Type type, T value, Duration localExpire, Duration remoteExpire); /** - * 远程异步缓存字符串数据 + * 本地和远程异步缓存字符串数据 * * @param hash 缓存hash * @param key 缓存键 @@ -354,7 +384,7 @@ public interface CacheManager { } /** - * 远程删除缓存数据 + * 本地和远程删除缓存数据 * * @param hash 缓存hash * @param key 缓存键 @@ -364,7 +394,7 @@ public interface CacheManager { public long bothDel(String hash, String key); /** - * 远程异步删除缓存数据 + * 本地和远程异步删除缓存数据 * * @param hash 缓存hash * @param key 缓存键 diff --git a/src/main/java/org/redkale/cache/support/CacheManagerService.java b/src/main/java/org/redkale/cache/support/CacheManagerService.java index fa51630a8..caa11f6a0 100644 --- a/src/main/java/org/redkale/cache/support/CacheManagerService.java +++ b/src/main/java/org/redkale/cache/support/CacheManagerService.java @@ -4,6 +4,7 @@ package org.redkale.cache.support; import java.lang.reflect.Type; +import java.math.BigDecimal; import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -162,7 +163,7 @@ public class CacheManagerService implements CacheManager, Service { * @return 数据值 */ public T localGet(final String hash, final String key, final Type type, Duration expire, Supplier supplier) { - return get(localSource, hash, key, type, expire, supplier); + return get(localSource::hget, localSource::hset, hash, key, type, expire, supplier); } /** @@ -179,7 +180,7 @@ public class CacheManagerService implements CacheManager, Service { */ @Override public CompletableFuture localGetAsync(String hash, String key, Type type, Duration expire, Supplier> supplier) { - return get(localSource, hash, key, type, expire, supplier); + return getAsync(localSource::hgetAsync, localSource::hsetAsync, hash, key, type, expire, supplier); } /** @@ -260,7 +261,7 @@ public class CacheManagerService implements CacheManager, Service { * @return 数据值 */ public T remoteGet(final String hash, final String key, final Type type, Duration expire, Supplier supplier) { - return get(remoteSource, hash, key, type, expire, supplier); + return get(remoteSource::hget, remoteSource::hset, hash, key, type, expire, supplier); } /** @@ -276,7 +277,7 @@ public class CacheManagerService implements CacheManager, Service { * @return 数据值 */ public CompletableFuture remoteGetAsync(String hash, String key, Type type, Duration expire, Supplier> supplier) { - return getAsync(remoteSource, hash, key, type, expire, supplier); + return getAsync(remoteSource::hgetAsync, remoteSource::hsetAsync, hash, key, type, expire, supplier); } /** @@ -346,17 +347,9 @@ public class CacheManagerService implements CacheManager, Service { * * @return 数据值 */ + @Override public T bothGet(final String hash, final String key, final Type type) { - Type t = loadCacheType(type); - CacheValue val = localSource.hget(hash, key, t); - if (CacheValue.isValid(val)) { - return val.getValue(); - } - if (remoteSource != null) { - return CacheValue.get(remoteSource.hget(hash, key, t)); - } else { - return null; - } + return CacheValue.get(bothGetCache(hash, key, type)); } /** @@ -369,18 +362,57 @@ public class CacheManagerService implements CacheManager, Service { * * @return 数据值 */ + @Override public CompletableFuture bothGetAsync(final String hash, final String key, final Type type) { - Type t = loadCacheType(type); - CacheValue val = localSource.hget(hash, key, t); - if (CacheValue.isValid(val)) { - return CompletableFuture.completedFuture(val.getValue()); - } - if (remoteSource != null) { - CompletableFuture> future = remoteSource.hgetAsync(hash, key, t); - return future.thenApply(CacheValue::get); - } else { - return CompletableFuture.completedFuture(null); - } + return bothGetCacheAsync(hash, key, type).thenApply(CacheValue::get); + } + + /** + * 远程获取缓存数据, 过期返回null + * + * @param 泛型 + * @param hash 缓存hash + * @param key 缓存键 + * @param type 数据类型 + * @param localExpire 本地过期时长,为null表示永不过期 + * @param remoteExpire 远程过期时长,为null表示永不过期 + * @param supplier 数据函数 + * + * @return 数据值 + */ + @Override + public T bothGet(final String hash, final String key, final Type type, Duration localExpire, Duration remoteExpire, Supplier supplier) { + return get(this::bothGetCache, (h, k, t, v) -> { + localSource.hset(key, key, type, v); + if (remoteSource != null) { + remoteSource.hset(hash, key, t, CacheValue.create(v.getValue(), remoteExpire)); + } + }, hash, key, type, localExpire, supplier); + } + + /** + * 远程异步获取缓存数据, 过期返回null + * + * @param 泛型 + * @param hash 缓存hash + * @param key 缓存键 + * @param type 数据类型 + * @param localExpire 本地过期时长,为null表示永不过期 + * @param remoteExpire 远程过期时长,为null表示永不过期 + * @param supplier 数据函数 + * + * @return 数据值 + */ + @Override + public CompletableFuture bothGetAsync(String hash, String key, Type type, Duration localExpire, Duration remoteExpire, Supplier> supplier) { + return getAsync(this::bothGetCacheAsync, (h, k, t, v) -> { + localSource.hset(key, key, type, v); + if (remoteSource != null) { + return remoteSource.hsetAsync(hash, key, t, CacheValue.create(v.getValue(), remoteExpire)); + } else { + return CompletableFuture.completedFuture(null); + } + }, hash, key, type, localExpire, supplier); } /** @@ -402,6 +434,11 @@ public class CacheManagerService implements CacheManager, Service { } } + public static void main(String[] args) throws Throwable { + BigDecimal z = new BigDecimal("0"); + System.out.println(Objects.equals(BigDecimal.ZERO, z)); + } + /** * 远程异步缓存数据 * @@ -462,7 +499,8 @@ public class CacheManagerService implements CacheManager, Service { * 获取缓存数据, 过期返回null * * @param 泛型 - * @param source 缓存源 + * @param getter 获取数据函数 + * @param setter 设置数据函数 * @param hash 缓存hash * @param key 缓存键 * @param type 数据类型 @@ -471,21 +509,22 @@ public class CacheManagerService implements CacheManager, Service { * * @return 数据值 */ - protected T get(CacheSource source, String hash, String key, Type type, Duration expire, Supplier supplier) { + protected T get(GetterFunc> getter, SetterSyncFunc setter, + String hash, String key, Type type, Duration expire, Supplier supplier) { Objects.requireNonNull(supplier); final Type t = loadCacheType(type); - CacheValue val = source.hget(hash, key, t); + CacheValue val = getter.apply(hash, key, t); if (CacheValue.isValid(val)) { return val.getValue(); } Function func = k -> { - CacheValue oldVal = source.hget(hash, key, t); + CacheValue oldVal = getter.apply(hash, key, t); if (CacheValue.isValid(oldVal)) { return oldVal; } CacheValue newVal = toCacheSupplier(expire, supplier).get(); if (CacheValue.isValid(newVal)) { - source.hset(hash, key, t, newVal); + setter.apply(hash, key, t, newVal); } return newVal; }; @@ -502,7 +541,8 @@ public class CacheManagerService implements CacheManager, Service { * 异步获取缓存数据, 过期返回null * * @param 泛型 - * @param source 缓存源 + * @param getter 获取数据函数 + * @param setter 设置数据函数 * @param hash 缓存hash * @param key 缓存键 * @param type 数据类型 @@ -511,33 +551,74 @@ public class CacheManagerService implements CacheManager, Service { * * @return 数据值 */ - protected CompletableFuture getAsync(CacheSource source, String hash, String key, Type type, Duration expire, Supplier> supplier) { + protected CompletableFuture getAsync(GetterFunc>> getter, SetterAsyncFunc setter, + String hash, String key, Type type, Duration expire, Supplier> supplier) { Objects.requireNonNull(supplier); final Type t = loadCacheType(type); - CacheValue val = source.hget(hash, key, t); - if (CacheValue.isValid(val)) { - return CompletableFuture.completedFuture(val.getValue()); - } - final String lockId = lockId(hash, key); - final CacheAsyncEntry entry = asyncLock.computeIfAbsent(lockId, CacheAsyncEntry::new); - CompletableFuture future = new CompletableFuture<>(); - if (entry.compareAddFuture(future)) { - try { - supplier.get().whenComplete((v, e) -> { - if (e != null) { - entry.fail(e); - } - CacheValue rs = toCacheValue(expire, v); - if (CacheValue.isValid(val)) { - source.hset(hash, key, t, val); - } - entry.success(CacheValue.get(rs)); - }); - } catch (Throwable e) { - entry.fail(e); + CompletableFuture> sourceFuture = getter.apply(hash, key, t); + return sourceFuture.thenCompose(val -> { + if (CacheValue.isValid(val)) { + return CompletableFuture.completedFuture(val.getValue()); } + final String lockId = lockId(hash, key); + final CacheAsyncEntry entry = asyncLock.computeIfAbsent(lockId, CacheAsyncEntry::new); + CompletableFuture future = new CompletableFuture<>(); + if (entry.compareAddFuture(future)) { + try { + supplier.get().whenComplete((v, e) -> { + if (e != null) { + entry.fail(e); + } + CacheValue rs = toCacheValue(expire, v); + if (CacheValue.isValid(val)) { + setter.apply(hash, key, t, val) + .whenComplete((v2, e2) -> entry.success(CacheValue.get(rs))); + } else { + entry.success(CacheValue.get(rs)); + } + }); + } catch (Throwable e) { + entry.fail(e); + } + } + return future; + }); + } + + protected CacheValue bothGetCache(final String hash, final String key, final Type type) { + Type t = loadCacheType(type); + CacheValue val = localSource.hget(hash, key, t); + if (CacheValue.isValid(val)) { + return val; + } + if (remoteSource != null) { + return remoteSource.hget(hash, key, t); + } else { + return null; + } + } + + /** + * 远程异步获取缓存数据, 过期返回null + * + * @param 泛型 + * @param hash 缓存hash + * @param key 缓存键 + * @param type 数据类型 + * + * @return 数据值 + */ + protected CompletableFuture> bothGetCacheAsync(final String hash, final String key, final Type type) { + Type t = loadCacheType(type); + CacheValue val = localSource.hget(hash, key, t); + if (CacheValue.isValid(val)) { + return CompletableFuture.completedFuture(val); + } + if (remoteSource != null) { + return remoteSource.hgetAsync(hash, key, t); + } else { + return CompletableFuture.completedFuture(null); } - return future; } /** @@ -604,6 +685,21 @@ public class CacheManagerService implements CacheManager, Service { private static final Object NIL = new Object(); + protected static interface GetterFunc { + + public R apply(String hash, String key, Type type); + } + + protected static interface SetterSyncFunc { + + public void apply(String hash, String key, Type type, CacheValue value); + } + + protected static interface SetterAsyncFunc { + + public CompletableFuture apply(String hash, String key, Type type, CacheValue value); + } + protected class CacheAsyncEntry { private final AtomicBoolean state = new AtomicBoolean();