CacheManager优化

This commit is contained in:
redkale
2023-12-27 14:40:05 +08:00
parent fb321c58a7
commit 83dd4da4ec
4 changed files with 151 additions and 149 deletions

View File

@@ -1,44 +0,0 @@
/*
*
*/
package org.redkale.cache.spi;
import org.redkale.convert.ConvertColumn;
import org.redkale.convert.ConvertDisabled;
import org.redkale.convert.json.JsonConvert;
/**
*
* 缓存过期对象
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.8.0
*/
public class CacheExpire {
//为0表示不过期
@ConvertColumn(index = 1)
protected long time;
@ConvertDisabled
public boolean isExpired() {
return time > 0 && System.currentTimeMillis() > time;
}
public long getTime() {
return time;
}
public void setTime(long time) {
this.time = time;
}
@Override
public String toString() {
return JsonConvert.root().convertTo(this);
}
}

View File

@@ -138,9 +138,7 @@ public class CacheManagerService implements CacheManager, Service {
@Override
public <T> T localGet(final String hash, final String key, final Type type) {
checkEnable();
Type cacheType = loadCacheType(type);
CacheValue<T> cacheVal = localSource.hget(hash, key, cacheType);
return CacheValue.get(cacheVal);
return CacheValue.get(localSource.get(idFor(hash, key), loadCacheType(type)));
}
/**
@@ -158,7 +156,7 @@ public class CacheManagerService implements CacheManager, Service {
*/
@Override
public <T> T localGetSet(final String hash, final String key, final Type type, boolean nullable, Duration expire, ThrowSupplier<T> supplier) {
return getSet(localSource::hget, localSource::hset, hash, key, type, nullable, expire, supplier);
return getSet(localSource::get, this::localSetCache, hash, key, type, nullable, expire, supplier);
}
/**
@@ -176,7 +174,7 @@ public class CacheManagerService implements CacheManager, Service {
*/
@Override
public <T> CompletableFuture<T> localGetSetAsync(String hash, String key, Type type, boolean nullable, Duration expire, ThrowSupplier<CompletableFuture<T>> supplier) {
return getSetAsync(localSource::hgetAsync, localSource::hsetAsync, hash, key, type, nullable, expire, supplier);
return getSetAsync(localSource::getAsync, this::localSetCacheAsync, hash, key, type, nullable, expire, supplier);
}
/**
@@ -191,11 +189,7 @@ public class CacheManagerService implements CacheManager, Service {
*/
@Override
public <T> void localSet(String hash, String key, Type type, T value, Duration expire) {
checkEnable();
Objects.requireNonNull(expire);
Type cacheType = loadCacheType(type, value);
CacheValue cacheVal = CacheValue.create(value, expire);
localSource.hset(hash, key, cacheType, cacheVal);
setCache(localSource, hash, key, type, value, expire);
}
/**
@@ -209,7 +203,7 @@ public class CacheManagerService implements CacheManager, Service {
@Override
public long localDel(String hash, String key) {
checkEnable();
return localSource.hdel(hash, key);
return localSource.del(idFor(hash, key));
}
//-------------------------------------- 远程缓存 --------------------------------------
@@ -226,9 +220,7 @@ public class CacheManagerService implements CacheManager, Service {
@Override
public <T> T remoteGet(final String hash, final String key, final Type type) {
checkEnable();
Type cacheType = loadCacheType(type);
CacheValue<T> cacheVal = remoteSource.hget(hash, key, cacheType);
return CacheValue.get(cacheVal);
return CacheValue.get(remoteSource.get(idFor(hash, key), loadCacheType(type)));
}
/**
@@ -244,8 +236,7 @@ public class CacheManagerService implements CacheManager, Service {
@Override
public <T> CompletableFuture<T> remoteGetAsync(final String hash, final String key, final Type type) {
checkEnable();
Type cacheType = loadCacheType(type);
CompletableFuture<CacheValue<T>> future = remoteSource.hgetAsync(hash, key, cacheType);
CompletableFuture<CacheValue<T>> future = remoteSource.getAsync(idFor(hash, key), loadCacheType(type));
return future.thenApply(CacheValue::get);
}
@@ -264,7 +255,7 @@ public class CacheManagerService implements CacheManager, Service {
*/
@Override
public <T> T remoteGetSet(final String hash, final String key, final Type type, boolean nullable, Duration expire, ThrowSupplier<T> supplier) {
return getSet(remoteSource::hget, remoteSource::hset, hash, key, type, nullable, expire, supplier);
return getSet(remoteSource::get, this::remoteSetCache, hash, key, type, nullable, expire, supplier);
}
/**
@@ -282,7 +273,7 @@ public class CacheManagerService implements CacheManager, Service {
*/
@Override
public <T> CompletableFuture<T> remoteGetSetAsync(String hash, String key, Type type, boolean nullable, Duration expire, ThrowSupplier<CompletableFuture<T>> supplier) {
return getSetAsync(remoteSource::hgetAsync, remoteSource::hsetAsync, hash, key, type, nullable, expire, supplier);
return getSetAsync(remoteSource::getAsync, this::remoteSetCacheAsync, hash, key, type, nullable, expire, supplier);
}
/**
@@ -297,11 +288,7 @@ public class CacheManagerService implements CacheManager, Service {
*/
@Override
public <T> void remoteSet(final String hash, final String key, final Type type, final T value, Duration expire) {
checkEnable();
Objects.requireNonNull(expire);
Type cacheType = loadCacheType(type, value);
CacheValue cacheVal = CacheValue.create(value, expire);
remoteSource.hset(hash, key, cacheType, cacheVal);
setCache(remoteSource, hash, key, type, value, expire);
}
/**
@@ -316,11 +303,7 @@ public class CacheManagerService implements CacheManager, Service {
*/
@Override
public <T> CompletableFuture<Void> remoteSetAsync(String hash, String key, Type type, T value, Duration expire) {
checkEnable();
Objects.requireNonNull(expire);
Type cacheType = loadCacheType(type, value);
CacheValue cacheVal = CacheValue.create(value, expire);
return remoteSource.hsetAsync(hash, key, cacheType, cacheVal);
return setCacheAsync(remoteSource, hash, key, type, value, expire);
}
/**
@@ -334,7 +317,7 @@ public class CacheManagerService implements CacheManager, Service {
@Override
public long remoteDel(String hash, String key) {
checkEnable();
return remoteSource.hdel(hash, key);
return remoteSource.del(idFor(hash, key));
}
/**
@@ -348,7 +331,7 @@ public class CacheManagerService implements CacheManager, Service {
@Override
public CompletableFuture<Long> remoteDelAsync(String hash, String key) {
checkEnable();
return remoteSource.hdelAsync(hash, key);
return remoteSource.delAsync(idFor(hash, key));
}
//-------------------------------------- both缓存 --------------------------------------
@@ -409,17 +392,18 @@ public class CacheManagerService implements CacheManager, Service {
throw new RedkaleException(t);
}
}
if (remoteExpire == null) { //只有本地缓存
Objects.requireNonNull(localExpire);
return localGetSet(hash, key, type, nullable, localExpire, supplier);
}
if (localExpire == null) { //只有远程缓存
Objects.requireNonNull(remoteExpire);
return remoteGetSet(hash, key, type, nullable, remoteExpire, supplier);
}
if (remoteExpire == null) { //只有本地缓存
return localGetSet(hash, key, type, nullable, localExpire, supplier);
}
return getSet(this::bothGetCache, (h, k, t, v) -> {
localSource.hset(h, k, t, v);
return getSet(this::bothGetCache, (i, e, t, v) -> {
localSetCache(i, localExpire, t, v);
if (remoteSource != null) {
remoteSource.hset(h, k, t, CacheValue.create(v.getValue(), remoteExpire));
remoteSetCache(i, remoteExpire, t, v);
}
}, hash, key, type, nullable, localExpire, supplier);
}
@@ -448,17 +432,18 @@ public class CacheManagerService implements CacheManager, Service {
return CompletableFuture.failedFuture(t);
}
}
if (remoteExpire == null) { //只有本地缓存
Objects.requireNonNull(localExpire);
return localGetSetAsync(hash, key, type, nullable, localExpire, supplier);
}
if (localExpire == null) { //只有远程缓存
Objects.requireNonNull(remoteExpire);
return remoteGetSetAsync(hash, key, type, nullable, remoteExpire, supplier);
}
if (remoteExpire == null) { //只有本地缓存
return localGetSetAsync(hash, key, type, nullable, localExpire, supplier);
}
return getSetAsync(this::bothGetCacheAsync, (h, k, t, v) -> {
localSource.hset(h, k, t, v);
return getSetAsync(this::bothGetCacheAsync, (i, e, t, v) -> {
localSetCache(i, localExpire, t, v);
if (remoteSource != null) {
return remoteSource.hsetAsync(h, k, t, CacheValue.create(v.getValue(), remoteExpire));
return remoteSetCacheAsync(i, remoteExpire, t, v);
} else {
return CompletableFuture.completedFuture(null);
}
@@ -479,12 +464,11 @@ public class CacheManagerService implements CacheManager, Service {
@Override
public <T> void bothSet(final String hash, final String key, final Type type, final T value, Duration localExpire, Duration remoteExpire) {
checkEnable();
Type cacheType = loadCacheType(type, value);
if (localExpire != null) {
localSource.hset(hash, key, cacheType, CacheValue.create(value, localExpire));
setCache(localSource, hash, key, type, value, localExpire);
}
if (remoteSource != null && remoteExpire != null) {
remoteSource.hset(hash, key, cacheType, CacheValue.create(value, remoteExpire));
setCache(remoteSource, hash, key, type, value, remoteExpire);
}
}
@@ -504,12 +488,11 @@ public class CacheManagerService implements CacheManager, Service {
@Override
public <T> CompletableFuture<Void> bothSetAsync(String hash, String key, Type type, T value, Duration localExpire, Duration remoteExpire) {
checkEnable();
Type cacheType = loadCacheType(type, value);
if (localExpire != null) {
localSource.hset(hash, key, cacheType, CacheValue.create(value, localExpire)); //内存操作,无需异步
setCache(localSource, hash, key, type, value, localExpire);
}
if (remoteSource != null && remoteExpire != null) {
return remoteSource.hsetAsync(hash, key, cacheType, CacheValue.create(value, remoteExpire));
return setCacheAsync(remoteSource, hash, key, type, value, remoteExpire);
} else {
return CompletableFuture.completedFuture(null);
}
@@ -526,9 +509,10 @@ public class CacheManagerService implements CacheManager, Service {
@Override
public long bothDel(String hash, String key) {
checkEnable();
long v = localSource.hdel(hash, key);
String id = idFor(hash, key);
long v = localSource.del(id);
if (remoteSource != null) {
return remoteSource.hdel(hash, key);
return remoteSource.del(id);
} else {
return v;
}
@@ -545,9 +529,10 @@ public class CacheManagerService implements CacheManager, Service {
@Override
public CompletableFuture<Long> bothDelAsync(String hash, String key) {
checkEnable();
long v = localSource.hdel(hash, key); //内存操作,无需异步
String id = idFor(hash, key);
long v = localSource.del(id); //内存操作,无需异步
if (remoteSource != null) {
return remoteSource.hdelAsync(hash, key);
return remoteSource.delAsync(id);
} else {
return CompletableFuture.completedFuture(v);
}
@@ -575,34 +560,34 @@ public class CacheManagerService implements CacheManager, Service {
Objects.requireNonNull(expire);
Objects.requireNonNull(supplier);
final Type cacheType = loadCacheType(type);
CacheValue<T> cacheVal = getter.get(hash, key, type);
final String id = idFor(hash, key);
CacheValue<T> cacheVal = getter.get(id, cacheType);
if (CacheValue.isValid(cacheVal)) {
return cacheVal.getValue();
return cacheVal.getVal();
}
Function<String, CacheValue> func = k -> {
CacheValue<T> oldCacheVal = getter.get(hash, key, type);
CacheValue<T> oldCacheVal = getter.get(id, cacheType);
if (CacheValue.isValid(oldCacheVal)) {
return oldCacheVal;
}
CacheValue<T> newCacheVal;
try {
newCacheVal = toCacheSupplier(nullable, expire, supplier).get();
newCacheVal = toCacheSupplier(nullable, supplier).get();
} catch (RuntimeException e) {
throw e;
} catch (Throwable t) {
throw new RedkaleException(t);
}
if (CacheValue.isValid(newCacheVal)) {
setter.set(hash, key, cacheType, newCacheVal);
setter.set(id, expire, cacheType, newCacheVal);
}
return newCacheVal;
};
final String lockId = lockId(hash, key);
cacheVal = syncLock.computeIfAbsent(lockId, func);
cacheVal = syncLock.computeIfAbsent(id, func);
try {
return CacheValue.get(cacheVal);
} finally {
syncLock.remove(lockId);
syncLock.remove(id);
}
}
@@ -626,13 +611,13 @@ public class CacheManagerService implements CacheManager, Service {
checkEnable();
Objects.requireNonNull(supplier);
final Type cacheType = loadCacheType(type);
CompletableFuture<CacheValue<T>> sourceFuture = getter.get(hash, key, type);
final String id = idFor(hash, key);
CompletableFuture<CacheValue<T>> sourceFuture = getter.get(id, cacheType);
return sourceFuture.thenCompose(val -> {
if (CacheValue.isValid(val)) {
return CompletableFuture.completedFuture(val.getValue());
return CompletableFuture.completedFuture(val.getVal());
}
final String lockId = lockId(hash, key);
final CacheAsyncEntry entry = asyncLock.computeIfAbsent(lockId, CacheAsyncEntry::new);
final CacheAsyncEntry entry = asyncLock.computeIfAbsent(id, CacheAsyncEntry::new);
CompletableFuture<T> future = new CompletableFuture<>();
if (entry.compareAddFuture(future)) {
try {
@@ -640,9 +625,9 @@ public class CacheManagerService implements CacheManager, Service {
if (e != null) {
entry.fail(e);
}
CacheValue<T> cacheVal = toCacheValue(nullable, expire, v);
CacheValue<T> cacheVal = toCacheValue(nullable, v);
if (CacheValue.isValid(cacheVal)) {
setter.set(hash, key, cacheType, cacheVal)
setter.set(id, expire, cacheType, cacheVal)
.whenComplete((v2, e2) -> entry.success(CacheValue.get(cacheVal)));
} else {
entry.success(CacheValue.get(cacheVal));
@@ -656,20 +641,56 @@ public class CacheManagerService implements CacheManager, Service {
});
}
protected <T> CacheValue<T> bothGetCache(final String hash, final String key, final Type type) {
protected <T> CompletableFuture<Void> localSetCacheAsync(String id, Duration expire, Type cacheType, CacheValue<T> cacheVal) {
return setCacheAsync(localSource, id, expire, cacheType, cacheVal);
}
protected <T> CompletableFuture<Void> remoteSetCacheAsync(String id, Duration expire, Type cacheType, CacheValue<T> cacheVal) {
return setCacheAsync(remoteSource, id, expire, cacheType, cacheVal);
}
protected <T> void localSetCache(String id, Duration expire, Type cacheType, CacheValue<T> cacheVal) {
setCache(localSource, id, expire, cacheType, cacheVal);
}
protected <T> void remoteSetCache(String id, Duration expire, Type cacheType, CacheValue<T> cacheVal) {
setCache(remoteSource, id, expire, cacheType, cacheVal);
}
protected <T> void setCache(CacheSource source, String id, Duration expire, Type cacheType, CacheValue<T> cacheVal) {
checkEnable();
Type cacheType = loadCacheType(type);
CacheValue<T> cacheVal = localSource.hget(hash, key, cacheType);
if (CacheValue.isValid(cacheVal)) {
return cacheVal;
}
if (remoteSource != null) {
return remoteSource.hget(hash, key, cacheType);
Objects.requireNonNull(expire);
long millis = expire.toMillis();
if (millis > 0) {
source.psetex(id, millis, cacheType, cacheVal);
} else {
return null;
source.set(id, cacheType, cacheVal);
}
}
protected <T> CompletableFuture<Void> setCacheAsync(CacheSource source, String id, Duration expire, Type cacheType, CacheValue<T> cacheVal) {
checkEnable();
Objects.requireNonNull(expire);
long millis = expire.toMillis();
if (millis > 0) {
return source.psetexAsync(id, millis, cacheType, cacheVal);
} else {
return source.setAsync(id, cacheType, cacheVal);
}
}
protected <T> void setCache(CacheSource source, String hash, String key, Type type, T value, Duration expire) {
setCache(source, idFor(hash, key), expire, loadCacheType(type, value), CacheValue.create(value));
}
protected <T> CompletableFuture<Void> setCacheAsync(CacheSource source, String hash, String key, Type type, T value, Duration expire) {
return setCacheAsync(source, idFor(hash, key), expire, loadCacheType(type, value), CacheValue.create(value));
}
protected <T> CacheValue<T> bothGetCache(final String hash, final String key, final Type type) {
return bothGetCache(idFor(hash, key), loadCacheType(type));
}
/**
* 远程异步获取缓存数据, 过期返回null
*
@@ -681,14 +702,39 @@ public class CacheManagerService implements CacheManager, Service {
* @return 数据值
*/
protected <T> CompletableFuture<CacheValue<T>> bothGetCacheAsync(final String hash, final String key, final Type type) {
return bothGetCacheAsync(idFor(hash, key), loadCacheType(type));
}
protected <T> CacheValue<T> bothGetCache(final String id, final Type cacheType) {
checkEnable();
Type cacheType = loadCacheType(type);
CacheValue<T> val = localSource.hget(hash, key, cacheType); //内存操作,无需异步
CacheValue<T> cacheVal = localSource.get(id, cacheType);
if (CacheValue.isValid(cacheVal)) {
return cacheVal;
}
if (remoteSource != null) {
return remoteSource.get(id, cacheType);
} else {
return null;
}
}
/**
* 远程异步获取缓存数据, 过期返回null
*
* @param <T> 泛型
* @param id 缓存键
* @param cacheType 数据类型
*
* @return 数据值
*/
protected <T> CompletableFuture<CacheValue<T>> bothGetCacheAsync(final String id, final Type cacheType) {
checkEnable();
CacheValue<T> val = localSource.get(id, cacheType); //内存操作,无需异步
if (CacheValue.isValid(val)) {
return CompletableFuture.completedFuture(val);
}
if (remoteSource != null) {
return remoteSource.hgetAsync(hash, key, cacheType);
return remoteSource.getAsync(id, cacheType);
} else {
return CompletableFuture.completedFuture(null);
}
@@ -708,7 +754,7 @@ public class CacheManagerService implements CacheManager, Service {
*
* @return key
*/
protected String lockId(String hash, String key) {
protected String idFor(String hash, String key) {
return hash + ':' + key;
}
@@ -717,16 +763,15 @@ public class CacheManagerService implements CacheManager, Service {
*
* @param <T> 泛型
* @param nullable 是否缓存null值
* @param expire 过期时长Duration.ZERO为永不过期
* @param value 缓存值
*
* @return CacheValue函数
*/
protected <T> CacheValue<T> toCacheValue(boolean nullable, Duration expire, T value) {
protected <T> CacheValue<T> toCacheValue(boolean nullable, T value) {
if (value == null) {
return nullable ? CacheValue.create(value, expire) : null;
return nullable ? CacheValue.create(value) : null;
}
return CacheValue.create(value, expire);
return CacheValue.create(value);
}
/**
@@ -734,13 +779,12 @@ public class CacheManagerService implements CacheManager, Service {
*
* @param <T> 泛型
* @param nullable 是否缓存null值
* @param expire 过期时长Duration.ZERO为永不过期
* @param supplier 数据函数
*
* @return CacheValue函数
*/
protected <T> ThrowSupplier<CacheValue<T>> toCacheSupplier(boolean nullable, Duration expire, ThrowSupplier<T> supplier) {
return () -> toCacheValue(nullable, expire, supplier.get());
protected <T> ThrowSupplier<CacheValue<T>> toCacheSupplier(boolean nullable, ThrowSupplier<T> supplier) {
return () -> toCacheValue(nullable, supplier.get());
}
/**
@@ -770,17 +814,17 @@ public class CacheManagerService implements CacheManager, Service {
protected static interface GetterFunc<R> {
public R get(String hash, String key, Type type);
public R get(String id, Type cacheType);
}
protected static interface SetterSyncFunc {
public void set(String hash, String key, Type cacheType, CacheValue cacheVal);
public void set(String id, Duration expire, Type cacheType, CacheValue cacheVal);
}
protected static interface SetterAsyncFunc {
public CompletableFuture<Void> set(String hash, String key, Type cacheType, CacheValue cacheVal);
public CompletableFuture<Void> set(String id, Duration expire, Type cacheType, CacheValue cacheVal);
}
protected class CacheAsyncEntry {

View File

@@ -3,7 +3,6 @@
*/
package org.redkale.cache.spi;
import java.time.Duration;
import org.redkale.convert.ConvertColumn;
import org.redkale.convert.json.JsonConvert;
@@ -19,39 +18,39 @@ import org.redkale.convert.json.JsonConvert;
*
* @since 2.8.0
*/
public class CacheValue<T> extends CacheExpire {
public class CacheValue<T> {
@ConvertColumn(index = 2)
private T value;
@ConvertColumn(index = 1)
private T val;
public CacheValue() {
}
protected CacheValue(T value, Duration expire) {
this.value = value;
this.time = expire == null ? 0 : (System.currentTimeMillis() + expire.toMillis());
protected CacheValue(T value) {
this.val = value;
}
public static <T> CacheValue<T> create(T value, Duration expire) {
return new CacheValue(value, expire);
public static <T> CacheValue<T> create(T value) {
return new CacheValue(value);
}
public static boolean isValid(CacheValue val) {
return val != null && !val.isExpired();
return val != null;
}
public static <T> T get(CacheValue val) {
return isValid(val) ? (T) val.getValue() : null;
return isValid(val) ? (T) val.getVal() : null;
}
public T getValue() {
return value;
public T getVal() {
return val;
}
public void setValue(T value) {
this.value = value;
public void setVal(T val) {
this.val = val;
}
@Override
public String toString() {
return JsonConvert.root().convertTo(this);
}

View File

@@ -2341,6 +2341,8 @@ public final class CacheMemorySource extends AbstractCacheSource {
//<=0表示永久保存
private long expireMills;
private long initTime;
private final ReentrantLock lock = new ReentrantLock();
public CacheEntry(CacheEntryType cacheType, String key) {
@@ -2358,6 +2360,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
public CacheEntry milliSeconds(long milliSeconds) {
this.initTime = System.currentTimeMillis();
this.expireMills = milliSeconds > 0 ? milliSeconds : 0;
return this;
}
@@ -2369,7 +2372,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
@ConvertColumn(ignore = true)
public boolean isExpired() {
return expireMills > 0 && (lastAccessed + expireMills) < System.currentTimeMillis();
return expireMills > 0 && (initTime + expireMills) < System.currentTimeMillis();
}
//value类型只能是byte[]/String/AtomicLong