From 83dd4da4ec31c6b73f9d5659a18525801ab11362 Mon Sep 17 00:00:00 2001 From: redkale Date: Wed, 27 Dec 2023 14:40:05 +0800 Subject: [PATCH] =?UTF-8?q?CacheManager=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/redkale/cache/spi/CacheExpire.java | 44 ---- .../cache/spi/CacheManagerService.java | 222 +++++++++++------- .../org/redkale/cache/spi/CacheValue.java | 29 ++- .../org/redkale/source/CacheMemorySource.java | 5 +- 4 files changed, 151 insertions(+), 149 deletions(-) delete mode 100644 src/main/java/org/redkale/cache/spi/CacheExpire.java diff --git a/src/main/java/org/redkale/cache/spi/CacheExpire.java b/src/main/java/org/redkale/cache/spi/CacheExpire.java deleted file mode 100644 index 1626fe24b..000000000 --- a/src/main/java/org/redkale/cache/spi/CacheExpire.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * - */ -package org.redkale.cache.spi; - -import org.redkale.convert.ConvertColumn; -import org.redkale.convert.ConvertDisabled; -import org.redkale.convert.json.JsonConvert; - -/** - * - * 缓存过期对象 - * - *

- * 详情见: https://redkale.org - * - * @author zhangjx - * - * @since 2.8.0 - */ -public class CacheExpire { - - //为0表示不过期 - @ConvertColumn(index = 1) - protected long time; - - @ConvertDisabled - public boolean isExpired() { - return time > 0 && System.currentTimeMillis() > time; - } - - public long getTime() { - return time; - } - - public void setTime(long time) { - this.time = time; - } - - @Override - public String toString() { - return JsonConvert.root().convertTo(this); - } -} diff --git a/src/main/java/org/redkale/cache/spi/CacheManagerService.java b/src/main/java/org/redkale/cache/spi/CacheManagerService.java index b538bae5a..f610c5d5c 100644 --- a/src/main/java/org/redkale/cache/spi/CacheManagerService.java +++ b/src/main/java/org/redkale/cache/spi/CacheManagerService.java @@ -138,9 +138,7 @@ public class CacheManagerService implements CacheManager, Service { @Override public T localGet(final String hash, final String key, final Type type) { checkEnable(); - Type cacheType = loadCacheType(type); - CacheValue cacheVal = localSource.hget(hash, key, cacheType); - return CacheValue.get(cacheVal); + return CacheValue.get(localSource.get(idFor(hash, key), loadCacheType(type))); } /** @@ -158,7 +156,7 @@ public class CacheManagerService implements CacheManager, Service { */ @Override public T localGetSet(final String hash, final String key, final Type type, boolean nullable, Duration expire, ThrowSupplier supplier) { - return getSet(localSource::hget, localSource::hset, hash, key, type, nullable, expire, supplier); + return getSet(localSource::get, this::localSetCache, hash, key, type, nullable, expire, supplier); } /** @@ -176,7 +174,7 @@ public class CacheManagerService implements CacheManager, Service { */ @Override public CompletableFuture localGetSetAsync(String hash, String key, Type type, boolean nullable, Duration expire, ThrowSupplier> supplier) { - return getSetAsync(localSource::hgetAsync, localSource::hsetAsync, hash, key, type, nullable, expire, supplier); + return getSetAsync(localSource::getAsync, this::localSetCacheAsync, hash, key, type, nullable, expire, supplier); } /** @@ -191,11 +189,7 @@ public class CacheManagerService implements CacheManager, Service { */ @Override public void localSet(String hash, String key, Type type, T value, Duration expire) { - checkEnable(); - Objects.requireNonNull(expire); - Type cacheType = loadCacheType(type, value); - CacheValue cacheVal = CacheValue.create(value, expire); - localSource.hset(hash, key, cacheType, cacheVal); + setCache(localSource, hash, key, type, value, expire); } /** @@ -209,7 +203,7 @@ public class CacheManagerService implements CacheManager, Service { @Override public long localDel(String hash, String key) { checkEnable(); - return localSource.hdel(hash, key); + return localSource.del(idFor(hash, key)); } //-------------------------------------- 远程缓存 -------------------------------------- @@ -226,9 +220,7 @@ public class CacheManagerService implements CacheManager, Service { @Override public T remoteGet(final String hash, final String key, final Type type) { checkEnable(); - Type cacheType = loadCacheType(type); - CacheValue cacheVal = remoteSource.hget(hash, key, cacheType); - return CacheValue.get(cacheVal); + return CacheValue.get(remoteSource.get(idFor(hash, key), loadCacheType(type))); } /** @@ -244,8 +236,7 @@ public class CacheManagerService implements CacheManager, Service { @Override public CompletableFuture remoteGetAsync(final String hash, final String key, final Type type) { checkEnable(); - Type cacheType = loadCacheType(type); - CompletableFuture> future = remoteSource.hgetAsync(hash, key, cacheType); + CompletableFuture> future = remoteSource.getAsync(idFor(hash, key), loadCacheType(type)); return future.thenApply(CacheValue::get); } @@ -264,7 +255,7 @@ public class CacheManagerService implements CacheManager, Service { */ @Override public T remoteGetSet(final String hash, final String key, final Type type, boolean nullable, Duration expire, ThrowSupplier supplier) { - return getSet(remoteSource::hget, remoteSource::hset, hash, key, type, nullable, expire, supplier); + return getSet(remoteSource::get, this::remoteSetCache, hash, key, type, nullable, expire, supplier); } /** @@ -282,7 +273,7 @@ public class CacheManagerService implements CacheManager, Service { */ @Override public CompletableFuture remoteGetSetAsync(String hash, String key, Type type, boolean nullable, Duration expire, ThrowSupplier> supplier) { - return getSetAsync(remoteSource::hgetAsync, remoteSource::hsetAsync, hash, key, type, nullable, expire, supplier); + return getSetAsync(remoteSource::getAsync, this::remoteSetCacheAsync, hash, key, type, nullable, expire, supplier); } /** @@ -297,11 +288,7 @@ public class CacheManagerService implements CacheManager, Service { */ @Override public void remoteSet(final String hash, final String key, final Type type, final T value, Duration expire) { - checkEnable(); - Objects.requireNonNull(expire); - Type cacheType = loadCacheType(type, value); - CacheValue cacheVal = CacheValue.create(value, expire); - remoteSource.hset(hash, key, cacheType, cacheVal); + setCache(remoteSource, hash, key, type, value, expire); } /** @@ -316,11 +303,7 @@ public class CacheManagerService implements CacheManager, Service { */ @Override public CompletableFuture remoteSetAsync(String hash, String key, Type type, T value, Duration expire) { - checkEnable(); - Objects.requireNonNull(expire); - Type cacheType = loadCacheType(type, value); - CacheValue cacheVal = CacheValue.create(value, expire); - return remoteSource.hsetAsync(hash, key, cacheType, cacheVal); + return setCacheAsync(remoteSource, hash, key, type, value, expire); } /** @@ -334,7 +317,7 @@ public class CacheManagerService implements CacheManager, Service { @Override public long remoteDel(String hash, String key) { checkEnable(); - return remoteSource.hdel(hash, key); + return remoteSource.del(idFor(hash, key)); } /** @@ -348,7 +331,7 @@ public class CacheManagerService implements CacheManager, Service { @Override public CompletableFuture remoteDelAsync(String hash, String key) { checkEnable(); - return remoteSource.hdelAsync(hash, key); + return remoteSource.delAsync(idFor(hash, key)); } //-------------------------------------- both缓存 -------------------------------------- @@ -409,17 +392,18 @@ public class CacheManagerService implements CacheManager, Service { throw new RedkaleException(t); } } + if (remoteExpire == null) { //只有本地缓存 + Objects.requireNonNull(localExpire); + return localGetSet(hash, key, type, nullable, localExpire, supplier); + } if (localExpire == null) { //只有远程缓存 Objects.requireNonNull(remoteExpire); return remoteGetSet(hash, key, type, nullable, remoteExpire, supplier); } - if (remoteExpire == null) { //只有本地缓存 - return localGetSet(hash, key, type, nullable, localExpire, supplier); - } - return getSet(this::bothGetCache, (h, k, t, v) -> { - localSource.hset(h, k, t, v); + return getSet(this::bothGetCache, (i, e, t, v) -> { + localSetCache(i, localExpire, t, v); if (remoteSource != null) { - remoteSource.hset(h, k, t, CacheValue.create(v.getValue(), remoteExpire)); + remoteSetCache(i, remoteExpire, t, v); } }, hash, key, type, nullable, localExpire, supplier); } @@ -448,17 +432,18 @@ public class CacheManagerService implements CacheManager, Service { return CompletableFuture.failedFuture(t); } } + if (remoteExpire == null) { //只有本地缓存 + Objects.requireNonNull(localExpire); + return localGetSetAsync(hash, key, type, nullable, localExpire, supplier); + } if (localExpire == null) { //只有远程缓存 Objects.requireNonNull(remoteExpire); return remoteGetSetAsync(hash, key, type, nullable, remoteExpire, supplier); } - if (remoteExpire == null) { //只有本地缓存 - return localGetSetAsync(hash, key, type, nullable, localExpire, supplier); - } - return getSetAsync(this::bothGetCacheAsync, (h, k, t, v) -> { - localSource.hset(h, k, t, v); + return getSetAsync(this::bothGetCacheAsync, (i, e, t, v) -> { + localSetCache(i, localExpire, t, v); if (remoteSource != null) { - return remoteSource.hsetAsync(h, k, t, CacheValue.create(v.getValue(), remoteExpire)); + return remoteSetCacheAsync(i, remoteExpire, t, v); } else { return CompletableFuture.completedFuture(null); } @@ -479,12 +464,11 @@ public class CacheManagerService implements CacheManager, Service { @Override public void bothSet(final String hash, final String key, final Type type, final T value, Duration localExpire, Duration remoteExpire) { checkEnable(); - Type cacheType = loadCacheType(type, value); if (localExpire != null) { - localSource.hset(hash, key, cacheType, CacheValue.create(value, localExpire)); + setCache(localSource, hash, key, type, value, localExpire); } if (remoteSource != null && remoteExpire != null) { - remoteSource.hset(hash, key, cacheType, CacheValue.create(value, remoteExpire)); + setCache(remoteSource, hash, key, type, value, remoteExpire); } } @@ -504,12 +488,11 @@ public class CacheManagerService implements CacheManager, Service { @Override public CompletableFuture bothSetAsync(String hash, String key, Type type, T value, Duration localExpire, Duration remoteExpire) { checkEnable(); - Type cacheType = loadCacheType(type, value); if (localExpire != null) { - localSource.hset(hash, key, cacheType, CacheValue.create(value, localExpire)); //内存操作,无需异步 + setCache(localSource, hash, key, type, value, localExpire); } if (remoteSource != null && remoteExpire != null) { - return remoteSource.hsetAsync(hash, key, cacheType, CacheValue.create(value, remoteExpire)); + return setCacheAsync(remoteSource, hash, key, type, value, remoteExpire); } else { return CompletableFuture.completedFuture(null); } @@ -526,9 +509,10 @@ public class CacheManagerService implements CacheManager, Service { @Override public long bothDel(String hash, String key) { checkEnable(); - long v = localSource.hdel(hash, key); + String id = idFor(hash, key); + long v = localSource.del(id); if (remoteSource != null) { - return remoteSource.hdel(hash, key); + return remoteSource.del(id); } else { return v; } @@ -545,9 +529,10 @@ public class CacheManagerService implements CacheManager, Service { @Override public CompletableFuture bothDelAsync(String hash, String key) { checkEnable(); - long v = localSource.hdel(hash, key); //内存操作,无需异步 + String id = idFor(hash, key); + long v = localSource.del(id); //内存操作,无需异步 if (remoteSource != null) { - return remoteSource.hdelAsync(hash, key); + return remoteSource.delAsync(id); } else { return CompletableFuture.completedFuture(v); } @@ -575,34 +560,34 @@ public class CacheManagerService implements CacheManager, Service { Objects.requireNonNull(expire); Objects.requireNonNull(supplier); final Type cacheType = loadCacheType(type); - CacheValue cacheVal = getter.get(hash, key, type); + final String id = idFor(hash, key); + CacheValue cacheVal = getter.get(id, cacheType); if (CacheValue.isValid(cacheVal)) { - return cacheVal.getValue(); + return cacheVal.getVal(); } Function func = k -> { - CacheValue oldCacheVal = getter.get(hash, key, type); + CacheValue oldCacheVal = getter.get(id, cacheType); if (CacheValue.isValid(oldCacheVal)) { return oldCacheVal; } CacheValue newCacheVal; try { - newCacheVal = toCacheSupplier(nullable, expire, supplier).get(); + newCacheVal = toCacheSupplier(nullable, supplier).get(); } catch (RuntimeException e) { throw e; } catch (Throwable t) { throw new RedkaleException(t); } if (CacheValue.isValid(newCacheVal)) { - setter.set(hash, key, cacheType, newCacheVal); + setter.set(id, expire, cacheType, newCacheVal); } return newCacheVal; }; - final String lockId = lockId(hash, key); - cacheVal = syncLock.computeIfAbsent(lockId, func); + cacheVal = syncLock.computeIfAbsent(id, func); try { return CacheValue.get(cacheVal); } finally { - syncLock.remove(lockId); + syncLock.remove(id); } } @@ -626,13 +611,13 @@ public class CacheManagerService implements CacheManager, Service { checkEnable(); Objects.requireNonNull(supplier); final Type cacheType = loadCacheType(type); - CompletableFuture> sourceFuture = getter.get(hash, key, type); + final String id = idFor(hash, key); + CompletableFuture> sourceFuture = getter.get(id, cacheType); return sourceFuture.thenCompose(val -> { if (CacheValue.isValid(val)) { - return CompletableFuture.completedFuture(val.getValue()); + return CompletableFuture.completedFuture(val.getVal()); } - final String lockId = lockId(hash, key); - final CacheAsyncEntry entry = asyncLock.computeIfAbsent(lockId, CacheAsyncEntry::new); + final CacheAsyncEntry entry = asyncLock.computeIfAbsent(id, CacheAsyncEntry::new); CompletableFuture future = new CompletableFuture<>(); if (entry.compareAddFuture(future)) { try { @@ -640,9 +625,9 @@ public class CacheManagerService implements CacheManager, Service { if (e != null) { entry.fail(e); } - CacheValue cacheVal = toCacheValue(nullable, expire, v); + CacheValue cacheVal = toCacheValue(nullable, v); if (CacheValue.isValid(cacheVal)) { - setter.set(hash, key, cacheType, cacheVal) + setter.set(id, expire, cacheType, cacheVal) .whenComplete((v2, e2) -> entry.success(CacheValue.get(cacheVal))); } else { entry.success(CacheValue.get(cacheVal)); @@ -656,20 +641,56 @@ public class CacheManagerService implements CacheManager, Service { }); } - protected CacheValue bothGetCache(final String hash, final String key, final Type type) { + protected CompletableFuture localSetCacheAsync(String id, Duration expire, Type cacheType, CacheValue cacheVal) { + return setCacheAsync(localSource, id, expire, cacheType, cacheVal); + } + + protected CompletableFuture remoteSetCacheAsync(String id, Duration expire, Type cacheType, CacheValue cacheVal) { + return setCacheAsync(remoteSource, id, expire, cacheType, cacheVal); + } + + protected void localSetCache(String id, Duration expire, Type cacheType, CacheValue cacheVal) { + setCache(localSource, id, expire, cacheType, cacheVal); + } + + protected void remoteSetCache(String id, Duration expire, Type cacheType, CacheValue cacheVal) { + setCache(remoteSource, id, expire, cacheType, cacheVal); + } + + protected void setCache(CacheSource source, String id, Duration expire, Type cacheType, CacheValue cacheVal) { checkEnable(); - Type cacheType = loadCacheType(type); - CacheValue cacheVal = localSource.hget(hash, key, cacheType); - if (CacheValue.isValid(cacheVal)) { - return cacheVal; - } - if (remoteSource != null) { - return remoteSource.hget(hash, key, cacheType); + Objects.requireNonNull(expire); + long millis = expire.toMillis(); + if (millis > 0) { + source.psetex(id, millis, cacheType, cacheVal); } else { - return null; + source.set(id, cacheType, cacheVal); } } + protected CompletableFuture setCacheAsync(CacheSource source, String id, Duration expire, Type cacheType, CacheValue cacheVal) { + checkEnable(); + Objects.requireNonNull(expire); + long millis = expire.toMillis(); + if (millis > 0) { + return source.psetexAsync(id, millis, cacheType, cacheVal); + } else { + return source.setAsync(id, cacheType, cacheVal); + } + } + + protected void setCache(CacheSource source, String hash, String key, Type type, T value, Duration expire) { + setCache(source, idFor(hash, key), expire, loadCacheType(type, value), CacheValue.create(value)); + } + + protected CompletableFuture setCacheAsync(CacheSource source, String hash, String key, Type type, T value, Duration expire) { + return setCacheAsync(source, idFor(hash, key), expire, loadCacheType(type, value), CacheValue.create(value)); + } + + protected CacheValue bothGetCache(final String hash, final String key, final Type type) { + return bothGetCache(idFor(hash, key), loadCacheType(type)); + } + /** * 远程异步获取缓存数据, 过期返回null * @@ -681,14 +702,39 @@ public class CacheManagerService implements CacheManager, Service { * @return 数据值 */ protected CompletableFuture> bothGetCacheAsync(final String hash, final String key, final Type type) { + return bothGetCacheAsync(idFor(hash, key), loadCacheType(type)); + } + + protected CacheValue bothGetCache(final String id, final Type cacheType) { checkEnable(); - Type cacheType = loadCacheType(type); - CacheValue val = localSource.hget(hash, key, cacheType); //内存操作,无需异步 + CacheValue cacheVal = localSource.get(id, cacheType); + if (CacheValue.isValid(cacheVal)) { + return cacheVal; + } + if (remoteSource != null) { + return remoteSource.get(id, cacheType); + } else { + return null; + } + } + + /** + * 远程异步获取缓存数据, 过期返回null + * + * @param 泛型 + * @param id 缓存键 + * @param cacheType 数据类型 + * + * @return 数据值 + */ + protected CompletableFuture> bothGetCacheAsync(final String id, final Type cacheType) { + checkEnable(); + CacheValue val = localSource.get(id, cacheType); //内存操作,无需异步 if (CacheValue.isValid(val)) { return CompletableFuture.completedFuture(val); } if (remoteSource != null) { - return remoteSource.hgetAsync(hash, key, cacheType); + return remoteSource.getAsync(id, cacheType); } else { return CompletableFuture.completedFuture(null); } @@ -708,7 +754,7 @@ public class CacheManagerService implements CacheManager, Service { * * @return key */ - protected String lockId(String hash, String key) { + protected String idFor(String hash, String key) { return hash + ':' + key; } @@ -717,16 +763,15 @@ public class CacheManagerService implements CacheManager, Service { * * @param 泛型 * @param nullable 是否缓存null值 - * @param expire 过期时长,Duration.ZERO为永不过期 * @param value 缓存值 * * @return CacheValue函数 */ - protected CacheValue toCacheValue(boolean nullable, Duration expire, T value) { + protected CacheValue toCacheValue(boolean nullable, T value) { if (value == null) { - return nullable ? CacheValue.create(value, expire) : null; + return nullable ? CacheValue.create(value) : null; } - return CacheValue.create(value, expire); + return CacheValue.create(value); } /** @@ -734,13 +779,12 @@ public class CacheManagerService implements CacheManager, Service { * * @param 泛型 * @param nullable 是否缓存null值 - * @param expire 过期时长,Duration.ZERO为永不过期 * @param supplier 数据函数 * * @return CacheValue函数 */ - protected ThrowSupplier> toCacheSupplier(boolean nullable, Duration expire, ThrowSupplier supplier) { - return () -> toCacheValue(nullable, expire, supplier.get()); + protected ThrowSupplier> toCacheSupplier(boolean nullable, ThrowSupplier supplier) { + return () -> toCacheValue(nullable, supplier.get()); } /** @@ -770,17 +814,17 @@ public class CacheManagerService implements CacheManager, Service { protected static interface GetterFunc { - public R get(String hash, String key, Type type); + public R get(String id, Type cacheType); } protected static interface SetterSyncFunc { - public void set(String hash, String key, Type cacheType, CacheValue cacheVal); + public void set(String id, Duration expire, Type cacheType, CacheValue cacheVal); } protected static interface SetterAsyncFunc { - public CompletableFuture set(String hash, String key, Type cacheType, CacheValue cacheVal); + public CompletableFuture set(String id, Duration expire, Type cacheType, CacheValue cacheVal); } protected class CacheAsyncEntry { diff --git a/src/main/java/org/redkale/cache/spi/CacheValue.java b/src/main/java/org/redkale/cache/spi/CacheValue.java index 04cf6044c..82bb10c87 100644 --- a/src/main/java/org/redkale/cache/spi/CacheValue.java +++ b/src/main/java/org/redkale/cache/spi/CacheValue.java @@ -3,7 +3,6 @@ */ package org.redkale.cache.spi; -import java.time.Duration; import org.redkale.convert.ConvertColumn; import org.redkale.convert.json.JsonConvert; @@ -19,39 +18,39 @@ import org.redkale.convert.json.JsonConvert; * * @since 2.8.0 */ -public class CacheValue extends CacheExpire { +public class CacheValue { - @ConvertColumn(index = 2) - private T value; + @ConvertColumn(index = 1) + private T val; public CacheValue() { } - protected CacheValue(T value, Duration expire) { - this.value = value; - this.time = expire == null ? 0 : (System.currentTimeMillis() + expire.toMillis()); + protected CacheValue(T value) { + this.val = value; } - public static CacheValue create(T value, Duration expire) { - return new CacheValue(value, expire); + public static CacheValue create(T value) { + return new CacheValue(value); } public static boolean isValid(CacheValue val) { - return val != null && !val.isExpired(); + return val != null; } public static T get(CacheValue val) { - return isValid(val) ? (T) val.getValue() : null; + return isValid(val) ? (T) val.getVal() : null; } - public T getValue() { - return value; + public T getVal() { + return val; } - public void setValue(T value) { - this.value = value; + public void setVal(T val) { + this.val = val; } + @Override public String toString() { return JsonConvert.root().convertTo(this); } diff --git a/src/main/java/org/redkale/source/CacheMemorySource.java b/src/main/java/org/redkale/source/CacheMemorySource.java index c27cb3284..031f1f92f 100644 --- a/src/main/java/org/redkale/source/CacheMemorySource.java +++ b/src/main/java/org/redkale/source/CacheMemorySource.java @@ -2341,6 +2341,8 @@ public final class CacheMemorySource extends AbstractCacheSource { //<=0表示永久保存 private long expireMills; + private long initTime; + private final ReentrantLock lock = new ReentrantLock(); public CacheEntry(CacheEntryType cacheType, String key) { @@ -2358,6 +2360,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } public CacheEntry milliSeconds(long milliSeconds) { + this.initTime = System.currentTimeMillis(); this.expireMills = milliSeconds > 0 ? milliSeconds : 0; return this; } @@ -2369,7 +2372,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @ConvertColumn(ignore = true) public boolean isExpired() { - return expireMills > 0 && (lastAccessed + expireMills) < System.currentTimeMillis(); + return expireMills > 0 && (initTime + expireMills) < System.currentTimeMillis(); } //value类型只能是byte[]/String/AtomicLong