diff --git a/src/main/java/org/redkale/cache/support/CacheManagerService.java b/src/main/java/org/redkale/cache/support/CacheManagerService.java index 32779da68..6085f0361 100644 --- a/src/main/java/org/redkale/cache/support/CacheManagerService.java +++ b/src/main/java/org/redkale/cache/support/CacheManagerService.java @@ -5,10 +5,14 @@ package org.redkale.cache.support; import java.lang.reflect.Type; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import org.redkale.annotation.AutoLoad; import org.redkale.annotation.Component; @@ -55,8 +59,11 @@ public class CacheManagerService implements CacheManager, Service { //缓存hash集合, 用于定时遍历删除过期数据 protected final ConcurrentSkipListSet hashNames = new ConcurrentSkipListSet<>(); - //缓存无效时使用的锁 - private final ConcurrentHashMap hashLock = new ConcurrentHashMap<>(); + //缓存无效时使用的同步锁 + private final ConcurrentHashMap syncLock = new ConcurrentHashMap<>(); + + //缓存无效时使用的异步锁 + private final ConcurrentHashMap asyncLock = new ConcurrentHashMap<>(); @Resource(required = false) protected Application application; @@ -155,13 +162,13 @@ public class CacheManagerService implements CacheManager, Service { */ public T localGet(final String hash, final String key, final Type type, Duration expire, Supplier supplier) { Objects.requireNonNull(supplier); - Type t = loadCacheType(type); + final Type t = loadCacheType(type); CacheValue val = localSource.hget(hash, key, t); if (CacheValue.isValid(val)) { return val.getValue(); } - String lockKey = lockKey(hash, key); - val = hashLock.computeIfAbsent(lockKey, k -> cacheSupplier(expire, supplier).get()); + final String lockKey = lockKey(hash, key); + val = syncLock.computeIfAbsent(lockKey, k -> cacheSupplier(expire, supplier).get()); try { if (CacheValue.isValid(val)) { localSource.hset(hash, key, t, val); @@ -170,7 +177,7 @@ public class CacheManagerService implements CacheManager, Service { return null; } } finally { - hashLock.remove(lockKey); + syncLock.remove(lockKey); } } @@ -188,7 +195,36 @@ public class CacheManagerService implements CacheManager, Service { */ @Override public CompletableFuture localGetAsync(String hash, String key, Type type, Duration expire, Supplier> supplier) { - throw new UnsupportedOperationException("Not supported yet."); + Objects.requireNonNull(supplier); + final Type t = loadCacheType(type); + CacheValue val = localSource.hget(hash, key, t); + if (CacheValue.isValid(val)) { + return CompletableFuture.completedFuture(val.getValue()); + } + final String lockKey = lockKey(hash, key); + final CacheWaitEntry entry = asyncLock.computeIfAbsent(lockKey, k -> new CacheWaitEntry()); + CompletableFuture future = new CompletableFuture<>(); + if (entry.compare(future)) { + try { + supplier.get().whenComplete((v, e) -> { + if (e != null) { + asyncLock.remove(lockKey); + entry.fail(e); + } + CacheValue rs = cacheFunc(expire, v); + if (CacheValue.isValid(val)) { + localSource.hset(hash, key, t, val); + } + asyncLock.remove(lockKey); + entry.success(CacheValue.get(rs)); + }); + } catch (Throwable e) { + asyncLock.remove(lockKey); + entry.fail(e); + return CompletableFuture.failedFuture(e); + } + } + return future; } /** @@ -202,7 +238,10 @@ public class CacheManagerService implements CacheManager, Service { * @param expire 过期时长,为null表示永不过期 */ @Override - public void localSet(String hash, String key, Type type, T value, Duration expire) { + public void localSet(String hash, String key, + Type type, T value, + Duration expire + ) { Type t = loadCacheType(type, value); CacheValue val = CacheValue.create(value, expire); localSource.hset(hash, key, t, val); @@ -217,7 +256,8 @@ public class CacheManagerService implements CacheManager, Service { * @return 删除数量 */ @Override - public long localDel(String hash, String key) { + public long localDel(String hash, String key + ) { return localSource.hdel(hash, key); } @@ -233,7 +273,9 @@ public class CacheManagerService implements CacheManager, Service { * @return 数据值 */ @Override - public T remoteGet(final String hash, final String key, final Type type) { + public T remoteGet( + final String hash, final String key, final Type type + ) { Type t = loadCacheType(type); CacheValue val = remoteSource.hget(hash, key, t); return CacheValue.get(val); @@ -250,7 +292,10 @@ public class CacheManagerService implements CacheManager, Service { * @return 数据值 */ @Override - public CompletableFuture remoteGetAsync(final String hash, final String key, final Type type) { + public CompletableFuture + remoteGetAsync( + final String hash, final String key, final Type type + ) { Type t = loadCacheType(type); CompletableFuture> future = remoteSource.hgetAsync(hash, key, t); return future.thenApply(CacheValue::get); @@ -497,4 +542,61 @@ public class CacheManagerService implements CacheManager, Service { return cacheValueTypes.computeIfAbsent(type, t -> TypeToken.createParameterizedType(null, CacheValue.class, type)); } + protected static class CacheWaitEntry { + + private static final Object NIL = new Object(); + + private final AtomicBoolean state = new AtomicBoolean(); + + private final List futures = new ArrayList<>(); + + private final ReentrantLock lock = new ReentrantLock(); + + private Object resultObj = NIL; + + private Throwable resultExp; + + public boolean compare(CompletableFuture future) { + lock.lock(); + try { + if (resultObj != NIL) { + future.complete(resultObj); + return false; + } else if (resultExp != null) { + future.completeExceptionally(resultExp); + return false; + } + boolean rs = state.compareAndSet(false, true); + this.futures.add(future); + return rs; + } finally { + lock.unlock(); + } + } + + public void fail(Throwable t) { + lock.lock(); + try { + this.resultExp = t; + for (CompletableFuture future : futures) { + future.completeExceptionally(t); + } + } finally { + lock.unlock(); + } + } + + public void success(T val) { + lock.lock(); + try { + this.resultObj = val; + for (CompletableFuture future : futures) { + future.complete(val); + } + } finally { + lock.unlock(); + } + } + + } }