CacheManager

This commit is contained in:
redkale
2023-12-12 00:31:43 +08:00
parent 618a42c5df
commit 8698d12d1f

View File

@@ -5,10 +5,14 @@ package org.redkale.cache.support;
import java.lang.reflect.Type;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.redkale.annotation.AutoLoad;
import org.redkale.annotation.Component;
@@ -55,8 +59,11 @@ public class CacheManagerService implements CacheManager, Service {
//缓存hash集合, 用于定时遍历删除过期数据
protected final ConcurrentSkipListSet<String> hashNames = new ConcurrentSkipListSet<>();
//缓存无效时使用的锁
private final ConcurrentHashMap<String, CacheValue> hashLock = new ConcurrentHashMap<>();
//缓存无效时使用的同步
private final ConcurrentHashMap<String, CacheValue> syncLock = new ConcurrentHashMap<>();
//缓存无效时使用的异步锁
private final ConcurrentHashMap<String, CacheWaitEntry> asyncLock = new ConcurrentHashMap<>();
@Resource(required = false)
protected Application application;
@@ -155,13 +162,13 @@ public class CacheManagerService implements CacheManager, Service {
*/
public <T> T localGet(final String hash, final String key, final Type type, Duration expire, Supplier<T> supplier) {
Objects.requireNonNull(supplier);
Type t = loadCacheType(type);
final Type t = loadCacheType(type);
CacheValue<T> val = localSource.hget(hash, key, t);
if (CacheValue.isValid(val)) {
return val.getValue();
}
String lockKey = lockKey(hash, key);
val = hashLock.computeIfAbsent(lockKey, k -> cacheSupplier(expire, supplier).get());
final String lockKey = lockKey(hash, key);
val = syncLock.computeIfAbsent(lockKey, k -> cacheSupplier(expire, supplier).get());
try {
if (CacheValue.isValid(val)) {
localSource.hset(hash, key, t, val);
@@ -170,7 +177,7 @@ public class CacheManagerService implements CacheManager, Service {
return null;
}
} finally {
hashLock.remove(lockKey);
syncLock.remove(lockKey);
}
}
@@ -188,7 +195,36 @@ public class CacheManagerService implements CacheManager, Service {
*/
@Override
public <T> CompletableFuture<T> localGetAsync(String hash, String key, Type type, Duration expire, Supplier<CompletableFuture<T>> supplier) {
throw new UnsupportedOperationException("Not supported yet.");
Objects.requireNonNull(supplier);
final Type t = loadCacheType(type);
CacheValue<T> val = localSource.hget(hash, key, t);
if (CacheValue.isValid(val)) {
return CompletableFuture.completedFuture(val.getValue());
}
final String lockKey = lockKey(hash, key);
final CacheWaitEntry entry = asyncLock.computeIfAbsent(lockKey, k -> new CacheWaitEntry());
CompletableFuture<T> future = new CompletableFuture<>();
if (entry.compare(future)) {
try {
supplier.get().whenComplete((v, e) -> {
if (e != null) {
asyncLock.remove(lockKey);
entry.fail(e);
}
CacheValue<T> rs = cacheFunc(expire, v);
if (CacheValue.isValid(val)) {
localSource.hset(hash, key, t, val);
}
asyncLock.remove(lockKey);
entry.success(CacheValue.get(rs));
});
} catch (Throwable e) {
asyncLock.remove(lockKey);
entry.fail(e);
return CompletableFuture.failedFuture(e);
}
}
return future;
}
/**
@@ -202,7 +238,10 @@ public class CacheManagerService implements CacheManager, Service {
* @param expire 过期时长为null表示永不过期
*/
@Override
public <T> void localSet(String hash, String key, Type type, T value, Duration expire) {
public <T> void localSet(String hash, String key,
Type type, T value,
Duration expire
) {
Type t = loadCacheType(type, value);
CacheValue val = CacheValue.create(value, expire);
localSource.hset(hash, key, t, val);
@@ -217,7 +256,8 @@ public class CacheManagerService implements CacheManager, Service {
* @return 删除数量
*/
@Override
public long localDel(String hash, String key) {
public long localDel(String hash, String key
) {
return localSource.hdel(hash, key);
}
@@ -233,7 +273,9 @@ public class CacheManagerService implements CacheManager, Service {
* @return 数据值
*/
@Override
public <T> T remoteGet(final String hash, final String key, final Type type) {
public <T> T remoteGet(
final String hash, final String key, final Type type
) {
Type t = loadCacheType(type);
CacheValue<T> val = remoteSource.hget(hash, key, t);
return CacheValue.get(val);
@@ -250,7 +292,10 @@ public class CacheManagerService implements CacheManager, Service {
* @return 数据值
*/
@Override
public <T> CompletableFuture<T> remoteGetAsync(final String hash, final String key, final Type type) {
public <T> CompletableFuture<T>
remoteGetAsync(
final String hash, final String key, final Type type
) {
Type t = loadCacheType(type);
CompletableFuture<CacheValue<T>> future = remoteSource.hgetAsync(hash, key, t);
return future.thenApply(CacheValue::get);
@@ -497,4 +542,61 @@ public class CacheManagerService implements CacheManager, Service {
return cacheValueTypes.computeIfAbsent(type, t -> TypeToken.createParameterizedType(null, CacheValue.class, type));
}
protected static class CacheWaitEntry {
private static final Object NIL = new Object();
private final AtomicBoolean state = new AtomicBoolean();
private final List<CompletableFuture> futures = new ArrayList<>();
private final ReentrantLock lock = new ReentrantLock();
private Object resultObj = NIL;
private Throwable resultExp;
public boolean compare(CompletableFuture future) {
lock.lock();
try {
if (resultObj != NIL) {
future.complete(resultObj);
return false;
} else if (resultExp != null) {
future.completeExceptionally(resultExp);
return false;
}
boolean rs = state.compareAndSet(false, true);
this.futures.add(future);
return rs;
} finally {
lock.unlock();
}
}
public void fail(Throwable t) {
lock.lock();
try {
this.resultExp = t;
for (CompletableFuture future : futures) {
future.completeExceptionally(t);
}
} finally {
lock.unlock();
}
}
public <T> void success(T val) {
lock.lock();
try {
this.resultObj = val;
for (CompletableFuture future : futures) {
future.complete(val);
}
} finally {
lock.unlock();
}
}
}
}