From 08d667d4d1a3b4a37568da095729a1816e005dd9 Mon Sep 17 00:00:00 2001 From: redkale Date: Tue, 12 Dec 2023 14:44:43 +0800 Subject: [PATCH] =?UTF-8?q?CacheManager=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/cache/CacheManager.java | 30 ++- .../redkale/cache/support/CacheAction.java | 8 + .../cache/support/CacheManagerService.java | 210 ++++++++++++------ .../org/redkale/persistence/Cacheable.java | 64 ------ .../java/org/redkale/source/EntityInfo.java | 14 -- .../org/redkale/test/cache/CachingTest.java | 19 +- 6 files changed, 189 insertions(+), 156 deletions(-) delete mode 100644 src/main/java/org/redkale/persistence/Cacheable.java diff --git a/src/main/java/org/redkale/cache/CacheManager.java b/src/main/java/org/redkale/cache/CacheManager.java index e8f2fd904..386723560 100644 --- a/src/main/java/org/redkale/cache/CacheManager.java +++ b/src/main/java/org/redkale/cache/CacheManager.java @@ -60,7 +60,7 @@ public interface CacheManager { public T localGet(final String hash, final String key, final Type type, Duration expire, Supplier supplier); /** - * 远程异步获取缓存数据, 过期返回null + * 本地异步获取缓存数据, 过期返回null * * @param 泛型 * @param hash 缓存hash @@ -156,6 +156,34 @@ public interface CacheManager { return remoteGetAsync(hash, key, String.class); } + /** + * 远程获取缓存数据, 过期返回null + * + * @param 泛型 + * @param hash 缓存hash + * @param key 缓存键 + * @param type 数据类型 + * @param expire 过期时长,为null表示永不过期 + * @param supplier 数据函数 + * + * @return 数据值 + */ + public T remoteGet(final String hash, final String key, final Type type, Duration expire, Supplier supplier); + + /** + * 远程异步获取缓存数据, 过期返回null + * + * @param 泛型 + * @param hash 缓存hash + * @param key 缓存键 + * @param type 数据类型 + * @param expire 过期时长,为null表示永不过期 + * @param supplier 数据函数 + * + * @return 数据值 + */ + public CompletableFuture remoteGetAsync(String hash, String key, Type type, Duration expire, Supplier> supplier); + /** * 远程缓存数据 * diff --git a/src/main/java/org/redkale/cache/support/CacheAction.java b/src/main/java/org/redkale/cache/support/CacheAction.java index 6381120f2..4a3f4ffdc 100644 --- a/src/main/java/org/redkale/cache/support/CacheAction.java +++ b/src/main/java/org/redkale/cache/support/CacheAction.java @@ -4,6 +4,7 @@ package org.redkale.cache.support; import java.lang.reflect.Method; +import java.util.concurrent.ConcurrentHashMap; import org.redkale.convert.json.JsonConvert; /** @@ -24,4 +25,11 @@ public class CacheAction { public String toString() { return JsonConvert.root().convertTo(this); } + + public static void main(String[] args) throws Throwable { + final ConcurrentHashMap asyncLock = new ConcurrentHashMap<>(); + String val = asyncLock.computeIfAbsent("aaa", t -> null); + System.out.println(asyncLock.size()); + System.out.println(val); + } } diff --git a/src/main/java/org/redkale/cache/support/CacheManagerService.java b/src/main/java/org/redkale/cache/support/CacheManagerService.java index 6085f0361..fa51630a8 100644 --- a/src/main/java/org/redkale/cache/support/CacheManagerService.java +++ b/src/main/java/org/redkale/cache/support/CacheManagerService.java @@ -13,6 +13,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; import java.util.function.Supplier; import org.redkale.annotation.AutoLoad; import org.redkale.annotation.Component; @@ -63,7 +64,7 @@ public class CacheManagerService implements CacheManager, Service { private final ConcurrentHashMap syncLock = new ConcurrentHashMap<>(); //缓存无效时使用的异步锁 - private final ConcurrentHashMap asyncLock = new ConcurrentHashMap<>(); + private final ConcurrentHashMap asyncLock = new ConcurrentHashMap<>(); @Resource(required = false) protected Application application; @@ -161,28 +162,11 @@ public class CacheManagerService implements CacheManager, Service { * @return 数据值 */ public T localGet(final String hash, final String key, final Type type, Duration expire, Supplier supplier) { - Objects.requireNonNull(supplier); - final Type t = loadCacheType(type); - CacheValue val = localSource.hget(hash, key, t); - if (CacheValue.isValid(val)) { - return val.getValue(); - } - final String lockKey = lockKey(hash, key); - val = syncLock.computeIfAbsent(lockKey, k -> cacheSupplier(expire, supplier).get()); - try { - if (CacheValue.isValid(val)) { - localSource.hset(hash, key, t, val); - return val.getValue(); - } else { - return null; - } - } finally { - syncLock.remove(lockKey); - } + return get(localSource, hash, key, type, expire, supplier); } /** - * 远程异步获取缓存数据, 过期返回null + * 本地异步获取缓存数据, 过期返回null * * @param 泛型 * @param hash 缓存hash @@ -195,36 +179,7 @@ public class CacheManagerService implements CacheManager, Service { */ @Override public CompletableFuture localGetAsync(String hash, String key, Type type, Duration expire, Supplier> supplier) { - Objects.requireNonNull(supplier); - final Type t = loadCacheType(type); - CacheValue val = localSource.hget(hash, key, t); - if (CacheValue.isValid(val)) { - return CompletableFuture.completedFuture(val.getValue()); - } - final String lockKey = lockKey(hash, key); - final CacheWaitEntry entry = asyncLock.computeIfAbsent(lockKey, k -> new CacheWaitEntry()); - CompletableFuture future = new CompletableFuture<>(); - if (entry.compare(future)) { - try { - supplier.get().whenComplete((v, e) -> { - if (e != null) { - asyncLock.remove(lockKey); - entry.fail(e); - } - CacheValue rs = cacheFunc(expire, v); - if (CacheValue.isValid(val)) { - localSource.hset(hash, key, t, val); - } - asyncLock.remove(lockKey); - entry.success(CacheValue.get(rs)); - }); - } catch (Throwable e) { - asyncLock.remove(lockKey); - entry.fail(e); - return CompletableFuture.failedFuture(e); - } - } - return future; + return get(localSource, hash, key, type, expire, supplier); } /** @@ -238,10 +193,7 @@ public class CacheManagerService implements CacheManager, Service { * @param expire 过期时长,为null表示永不过期 */ @Override - public void localSet(String hash, String key, - Type type, T value, - Duration expire - ) { + public void localSet(String hash, String key, Type type, T value, Duration expire) { Type t = loadCacheType(type, value); CacheValue val = CacheValue.create(value, expire); localSource.hset(hash, key, t, val); @@ -256,8 +208,7 @@ public class CacheManagerService implements CacheManager, Service { * @return 删除数量 */ @Override - public long localDel(String hash, String key - ) { + public long localDel(String hash, String key) { return localSource.hdel(hash, key); } @@ -273,9 +224,7 @@ public class CacheManagerService implements CacheManager, Service { * @return 数据值 */ @Override - public T remoteGet( - final String hash, final String key, final Type type - ) { + public T remoteGet(final String hash, final String key, final Type type) { Type t = loadCacheType(type); CacheValue val = remoteSource.hget(hash, key, t); return CacheValue.get(val); @@ -292,15 +241,44 @@ public class CacheManagerService implements CacheManager, Service { * @return 数据值 */ @Override - public CompletableFuture - remoteGetAsync( - final String hash, final String key, final Type type - ) { + public CompletableFuture remoteGetAsync(final String hash, final String key, final Type type) { Type t = loadCacheType(type); CompletableFuture> future = remoteSource.hgetAsync(hash, key, t); return future.thenApply(CacheValue::get); } + /** + * 远程获取缓存数据, 过期返回null + * + * @param 泛型 + * @param hash 缓存hash + * @param key 缓存键 + * @param type 数据类型 + * @param expire 过期时长,为null表示永不过期 + * @param supplier 数据函数 + * + * @return 数据值 + */ + public T remoteGet(final String hash, final String key, final Type type, Duration expire, Supplier supplier) { + return get(remoteSource, hash, key, type, expire, supplier); + } + + /** + * 远程异步获取缓存数据, 过期返回null + * + * @param 泛型 + * @param hash 缓存hash + * @param key 缓存键 + * @param type 数据类型 + * @param expire 过期时长,为null表示永不过期 + * @param supplier 数据函数 + * + * @return 数据值 + */ + public CompletableFuture remoteGetAsync(String hash, String key, Type type, Duration expire, Supplier> supplier) { + return getAsync(remoteSource, hash, key, type, expire, supplier); + } + /** * 远程缓存数据 * @@ -480,6 +458,88 @@ public class CacheManagerService implements CacheManager, Service { } //-------------------------------------- 内部方法 -------------------------------------- + /** + * 获取缓存数据, 过期返回null + * + * @param 泛型 + * @param source 缓存源 + * @param hash 缓存hash + * @param key 缓存键 + * @param type 数据类型 + * @param expire 过期时长,为null表示永不过期 + * @param supplier 数据函数 + * + * @return 数据值 + */ + protected T get(CacheSource source, String hash, String key, Type type, Duration expire, Supplier supplier) { + Objects.requireNonNull(supplier); + final Type t = loadCacheType(type); + CacheValue val = source.hget(hash, key, t); + if (CacheValue.isValid(val)) { + return val.getValue(); + } + Function func = k -> { + CacheValue oldVal = source.hget(hash, key, t); + if (CacheValue.isValid(oldVal)) { + return oldVal; + } + CacheValue newVal = toCacheSupplier(expire, supplier).get(); + if (CacheValue.isValid(newVal)) { + source.hset(hash, key, t, newVal); + } + return newVal; + }; + final String lockId = lockId(hash, key); + val = syncLock.computeIfAbsent(lockId, func); + try { + return CacheValue.get(val); + } finally { + syncLock.remove(lockId); + } + } + + /** + * 异步获取缓存数据, 过期返回null + * + * @param 泛型 + * @param source 缓存源 + * @param hash 缓存hash + * @param key 缓存键 + * @param type 数据类型 + * @param expire 过期时长,为null表示永不过期 + * @param supplier 数据函数 + * + * @return 数据值 + */ + protected CompletableFuture getAsync(CacheSource source, String hash, String key, Type type, Duration expire, Supplier> supplier) { + Objects.requireNonNull(supplier); + final Type t = loadCacheType(type); + CacheValue val = source.hget(hash, key, t); + if (CacheValue.isValid(val)) { + return CompletableFuture.completedFuture(val.getValue()); + } + final String lockId = lockId(hash, key); + final CacheAsyncEntry entry = asyncLock.computeIfAbsent(lockId, CacheAsyncEntry::new); + CompletableFuture future = new CompletableFuture<>(); + if (entry.compareAddFuture(future)) { + try { + supplier.get().whenComplete((v, e) -> { + if (e != null) { + entry.fail(e); + } + CacheValue rs = toCacheValue(expire, v); + if (CacheValue.isValid(val)) { + source.hset(hash, key, t, val); + } + entry.success(CacheValue.get(rs)); + }); + } catch (Throwable e) { + entry.fail(e); + } + } + return future; + } + /** * 创建一个锁key * @@ -488,7 +548,7 @@ public class CacheManagerService implements CacheManager, Service { * * @return */ - protected String lockKey(String hash, String key) { + protected String lockId(String hash, String key) { return hash + (char) 8 + key; } @@ -500,7 +560,7 @@ public class CacheManagerService implements CacheManager, Service { * * @return CacheValue函数 */ - protected CacheValue cacheFunc(Duration expire, T value) { + protected CacheValue toCacheValue(Duration expire, T value) { if (value == null) { return nullable ? CacheValue.create(value, expire) : null; } @@ -515,8 +575,8 @@ public class CacheManagerService implements CacheManager, Service { * * @return CacheValue函数 */ - protected Supplier> cacheSupplier(Duration expire, Supplier supplier) { - return () -> cacheFunc(expire, supplier.get()); + protected Supplier> toCacheSupplier(Duration expire, Supplier supplier) { + return () -> toCacheValue(expire, supplier.get()); } /** @@ -542,9 +602,9 @@ public class CacheManagerService implements CacheManager, Service { return cacheValueTypes.computeIfAbsent(type, t -> TypeToken.createParameterizedType(null, CacheValue.class, type)); } - protected static class CacheWaitEntry { + private static final Object NIL = new Object(); - private static final Object NIL = new Object(); + protected class CacheAsyncEntry { private final AtomicBoolean state = new AtomicBoolean(); @@ -552,11 +612,17 @@ public class CacheManagerService implements CacheManager, Service { private final ReentrantLock lock = new ReentrantLock(); + private final String lockId; + private Object resultObj = NIL; private Throwable resultExp; - public boolean compare(CompletableFuture future) { + public CacheAsyncEntry(String lockId) { + this.lockId = lockId; + } + + public boolean compareAddFuture(CompletableFuture future) { lock.lock(); try { if (resultObj != NIL) { @@ -581,7 +647,9 @@ public class CacheManagerService implements CacheManager, Service { for (CompletableFuture future : futures) { future.completeExceptionally(t); } + this.futures.clear(); } finally { + asyncLock.remove(lockId); lock.unlock(); } } @@ -593,7 +661,9 @@ public class CacheManagerService implements CacheManager, Service { for (CompletableFuture future : futures) { future.complete(val); } + this.futures.clear(); } finally { + asyncLock.remove(lockId); lock.unlock(); } } diff --git a/src/main/java/org/redkale/persistence/Cacheable.java b/src/main/java/org/redkale/persistence/Cacheable.java deleted file mode 100644 index da678cfe1..000000000 --- a/src/main/java/org/redkale/persistence/Cacheable.java +++ /dev/null @@ -1,64 +0,0 @@ -/** ***************************************************************************** - * Copyright (c) 2008 - 2013 Oracle Corporation. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 and Eclipse Distribution License v. 1.0 - * which accompanies this distribution. - * The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html - * and the Eclipse Distribution License is available at - * http://www.eclipse.org/org/documents/edl-v10.php. - * - * Contributors: - * Linda DeMichiel - Java Persistence 2.1 - * Linda DeMichiel - Java Persistence 2.0 - * - ***************************************************************************** */ -package org.redkale.persistence; - -import java.lang.annotation.*; -import static java.lang.annotation.ElementType.TYPE; -import static java.lang.annotation.RetentionPolicy.RUNTIME; - -/** - * Specifies whether an entity should be cached if caching is enabled - * when the value of the persistence.xml caching element - * is ENABLE_SELECTIVE or DISABLE_SELECTIVE. - * The value of the Cacheable annotation is inherited by - * subclasses; it can be overridden by specifying - * Cacheable on a subclass. - * - *

- * Cacheable(false) means that the entity and its state must - * not be cached by the provider. - * - * @deprecated replace by {@link org.redkale.persistence.Entity#cacheable() } - * - * @since Java Persistence 2.0 - */ -@Deprecated(since = "2.8.0") -@Target({TYPE}) -@Retention(RUNTIME) -public @interface Cacheable { - - /** - * (Optional) Whether or not the entity should be cached. - * - * @return boolean - */ - boolean value() default true; - - /** - * (Optional) 定时自动更新缓存的周期秒数,为0表示不做定时更新, 大于0表示每经过interval秒后会自动从数据库中拉取数据更新Cache - * - * @return int - */ - int interval() default 0; - - /** - * (Optional) DataSource是否直接返回对象的真实引用, 而不是copy一份 - * - * @return boolean - */ - boolean direct() default false; - -} diff --git a/src/main/java/org/redkale/source/EntityInfo.java b/src/main/java/org/redkale/source/EntityInfo.java index bf9d4ee11..c0e1ce3df 100644 --- a/src/main/java/org/redkale/source/EntityInfo.java +++ b/src/main/java/org/redkale/source/EntityInfo.java @@ -702,20 +702,6 @@ public final class EntityInfo { direct = ve.direct(); } } - { //兼容旧类 - org.redkale.persistence.Cacheable c1 = type.getAnnotation(org.redkale.persistence.Cacheable.class); - if (c1 != null) { - cacheable = c1.value(); - interval = c1.interval(); - direct = c1.direct(); - } - javax.persistence.Cacheable c2 = type.getAnnotation(javax.persistence.Cacheable.class); - if (c2 != null) { - cacheable = c2.value(); - interval = c2.interval(); - direct = c2.direct(); - } - } if (this.table == null || (!cacheForbidden && cacheable)) { this.cache = new EntityCache<>(this, interval, direct); } else { diff --git a/src/test/java/org/redkale/test/cache/CachingTest.java b/src/test/java/org/redkale/test/cache/CachingTest.java index 0fb5789d5..b8903ebc4 100644 --- a/src/test/java/org/redkale/test/cache/CachingTest.java +++ b/src/test/java/org/redkale/test/cache/CachingTest.java @@ -4,30 +4,35 @@ package org.redkale.test.cache; import java.time.Duration; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.*; import org.redkale.cache.support.CacheManagerService; import org.redkale.convert.json.JsonConvert; import org.redkale.source.CacheMemorySource; import org.redkale.util.Utility; - /** * * @author zhangjx */ public class CachingTest { + private static CacheManagerService manager; + public static void main(String[] args) throws Throwable { CachingTest test = new CachingTest(); + test.wait(); test.run(); } + @BeforeAll + public static void init() throws Exception { + CacheMemorySource remoteSource = new CacheMemorySource("remote"); + remoteSource.init(null); + manager = CacheManagerService.create(remoteSource); + manager.init(null); + } + @Test public void run() throws Exception { - CacheMemorySource remoteSource = new CacheMemorySource("remote"); - remoteSource.init(null); - CacheManagerService manager = CacheManagerService.create(remoteSource); - manager.init(null); Duration expire = Duration.ofMillis(490); manager.localSetString("user", "name:haha", "myha", expire); Assertions.assertEquals(manager.localGetString("user", "name:haha"), "myha");