增加Cached.localLimit属性

This commit is contained in:
redkale
2024-09-11 16:43:22 +08:00
parent ebf498edf8
commit 2a8bda69e5
10 changed files with 258 additions and 56 deletions

View File

@@ -12,6 +12,7 @@
|name|未定义|缓存的名称| |name|未定义|缓存的名称|
|key|未定义|缓存的key支持参数动态组合比如"key_#{id}"| |key|未定义|缓存的key支持参数动态组合比如"key_#{id}"|
|manager|空|缓存管理器名称, 不能含有':'、'#'、'@'字符| |manager|空|缓存管理器名称, 不能含有':'、'#'、'@'字符|
|localLimit|-1|本地缓存数量上限, 小于1表示无上限。 <br> 参数值支持方式:<br> &emsp;100: 设置数值 <br> &emsp;${env.cache.limit}: 读取系统配置项 |
|localExpire|-1|本地缓存过期时长, 0表示永不过期 -1表示不作本地缓存。 <br> 参数值支持方式:<br> &emsp;100: 设置数值 <br> &emsp;${env.cache.expires}: 读取系统配置项 | |localExpire|-1|本地缓存过期时长, 0表示永不过期 -1表示不作本地缓存。 <br> 参数值支持方式:<br> &emsp;100: 设置数值 <br> &emsp;${env.cache.expires}: 读取系统配置项 |
|remoteExpire|-1|远程缓存过期时长, 0表示永不过期 -1表示不作远程缓存。 <br> 参数值支持方式:<br> &emsp;100: 设置数值 <br> &emsp;${env.cache.expires}: 读取系统配置项 | |remoteExpire|-1|远程缓存过期时长, 0表示永不过期 -1表示不作远程缓存。 <br> 参数值支持方式:<br> &emsp;100: 设置数值 <br> &emsp;${env.cache.expires}: 读取系统配置项 |
|nullable|false|是否可以缓存null值| |nullable|false|是否可以缓存null值|

View File

@@ -51,6 +51,15 @@ public @interface Cached {
*/ */
String manager() default ""; String manager() default "";
/**
* 本地缓存数量上限, 小于1表示无上限<br>
* 参数值支持方式:<br>
* 100: 设置数值 ${env.cache.limit}: 读取系统配置项
*
* @return 数量上限
*/
String localLimit() default "-1";
/** /**
* 本地缓存过期时长, 0表示永不过期 -1表示不作本地缓存。<br> * 本地缓存过期时长, 0表示永不过期 -1表示不作本地缓存。<br>
* 参数值支持方式:<br> * 参数值支持方式:<br>

View File

@@ -123,12 +123,19 @@ public interface CachedManager extends Resourcable {
* @param key 缓存键 * @param key 缓存键
* @param type 数据类型 * @param type 数据类型
* @param nullable 是否缓存null值 * @param nullable 是否缓存null值
* @param localLimit 本地缓存数量上限
* @param expire 过期时长Duration.ZERO为永不过期 * @param expire 过期时长Duration.ZERO为永不过期
* @param supplier 数据函数 * @param supplier 数据函数
* @return 数据值 * @return 数据值
*/ */
public <T> T localGetSet( public <T> T localGetSet(
String name, String key, Type type, boolean nullable, Duration expire, ThrowSupplier<T> supplier); String name,
String key,
Type type,
boolean nullable,
int localLimit,
Duration expire,
ThrowSupplier<T> supplier);
/** /**
* 本地异步获取缓存数据, 过期返回null * 本地异步获取缓存数据, 过期返回null
@@ -138,6 +145,7 @@ public interface CachedManager extends Resourcable {
* @param key 缓存键 * @param key 缓存键
* @param type 数据类型 * @param type 数据类型
* @param nullable 是否缓存null值 * @param nullable 是否缓存null值
* @param localLimit 本地缓存数量上限
* @param expire 过期时长Duration.ZERO为永不过期 * @param expire 过期时长Duration.ZERO为永不过期
* @param supplier 数据函数 * @param supplier 数据函数
* @return 数据值 * @return 数据值
@@ -147,6 +155,7 @@ public interface CachedManager extends Resourcable {
String key, String key,
Type type, Type type,
boolean nullable, boolean nullable,
int localLimit,
Duration expire, Duration expire,
ThrowSupplier<CompletableFuture<T>> supplier); ThrowSupplier<CompletableFuture<T>> supplier);
@@ -384,6 +393,7 @@ public interface CachedManager extends Resourcable {
* @param key 缓存键 * @param key 缓存键
* @param type 数据类型 * @param type 数据类型
* @param nullable 是否缓存null值 * @param nullable 是否缓存null值
* @param localLimit 本地缓存数量上限
* @param localExpire 本地过期时长Duration.ZERO为永不过期为null表示不本地缓存 * @param localExpire 本地过期时长Duration.ZERO为永不过期为null表示不本地缓存
* @param remoteExpire 远程过期时长Duration.ZERO为永不过期为null表示不远程缓存 * @param remoteExpire 远程过期时长Duration.ZERO为永不过期为null表示不远程缓存
* @param supplier 数据函数 * @param supplier 数据函数
@@ -394,6 +404,7 @@ public interface CachedManager extends Resourcable {
String key, String key,
Type type, Type type,
boolean nullable, boolean nullable,
int localLimit,
Duration localExpire, Duration localExpire,
Duration remoteExpire, Duration remoteExpire,
ThrowSupplier<T> supplier); ThrowSupplier<T> supplier);
@@ -406,6 +417,7 @@ public interface CachedManager extends Resourcable {
* @param key 缓存键 * @param key 缓存键
* @param type 数据类型 * @param type 数据类型
* @param nullable 是否缓存null值 * @param nullable 是否缓存null值
* @param localLimit 本地缓存数量上限
* @param localExpire 本地过期时长Duration.ZERO为永不过期为null表示不本地缓存 * @param localExpire 本地过期时长Duration.ZERO为永不过期为null表示不本地缓存
* @param remoteExpire 远程过期时长Duration.ZERO为永不过期为null表示不远程缓存 * @param remoteExpire 远程过期时长Duration.ZERO为永不过期为null表示不远程缓存
* @param supplier 数据函数 * @param supplier 数据函数
@@ -416,6 +428,7 @@ public interface CachedManager extends Resourcable {
String key, String key,
Type type, Type type,
boolean nullable, boolean nullable,
int localLimit,
Duration localExpire, Duration localExpire,
Duration remoteExpire, Duration remoteExpire,
ThrowSupplier<CompletableFuture<T>> supplier); ThrowSupplier<CompletableFuture<T>> supplier);

View File

@@ -74,6 +74,9 @@ public class CachedAction {
// 父对象 // 父对象
private Object service; private Object service;
// 本地缓存数量上线,> 0才有效
private int localLimit;
// 本地缓存过期时长Duration.ZERO为永不过期为null表示不本地缓存 // 本地缓存过期时长Duration.ZERO为永不过期为null表示不本地缓存
private Duration localExpire; private Duration localExpire;
@@ -106,6 +109,7 @@ public class CachedAction {
MultiHashKey dynKey = MultiHashKey.create(paramNames, key); MultiHashKey dynKey = MultiHashKey.create(paramNames, key);
this.keyGenerator = CachedKeyGenerator.create(dynKey); this.keyGenerator = CachedKeyGenerator.create(dynKey);
} }
this.localLimit = Integer.parseInt(environment.getPropertyValue(cached.getLocalLimit()));
this.localExpire = createDuration(cached.getLocalExpire()); this.localExpire = createDuration(cached.getLocalExpire());
this.remoteExpire = createDuration(cached.getRemoteExpire()); this.remoteExpire = createDuration(cached.getRemoteExpire());
((CachedActionFunc) this.manager).addAction(this); ((CachedActionFunc) this.manager).addAction(this);
@@ -120,6 +124,7 @@ public class CachedAction {
keyGenerator.generate(service, this, args), keyGenerator.generate(service, this, args),
resultType, resultType,
nullable, nullable,
localLimit,
localExpire, localExpire,
remoteExpire, remoteExpire,
(ThrowSupplier) supplier); (ThrowSupplier) supplier);
@@ -129,6 +134,7 @@ public class CachedAction {
keyGenerator.generate(service, this, args), keyGenerator.generate(service, this, args),
resultType, resultType,
nullable, nullable,
localLimit,
localExpire, localExpire,
remoteExpire, remoteExpire,
supplier); supplier);

View File

@@ -21,8 +21,11 @@ public class CachedEntry {
private String manager; private String manager;
private String name; private String name;
private String key; private String key;
private String localLimit;
private String localExpire; private String localExpire;
private String remoteExpire; private String remoteExpire;
@@ -37,6 +40,7 @@ public class CachedEntry {
this.manager = cached.manager(); this.manager = cached.manager();
this.name = cached.name(); this.name = cached.name();
this.key = cached.key(); this.key = cached.key();
this.localLimit = cached.localLimit();
this.localExpire = cached.localExpire(); this.localExpire = cached.localExpire();
this.remoteExpire = cached.remoteExpire(); this.remoteExpire = cached.remoteExpire();
this.timeUnit = cached.timeUnit(); this.timeUnit = cached.timeUnit();
@@ -47,6 +51,7 @@ public class CachedEntry {
this.manager = cached.manager(); this.manager = cached.manager();
this.name = cached.name(); this.name = cached.name();
this.key = cached.key(); this.key = cached.key();
this.localLimit = cached.localLimit();
this.localExpire = cached.localExpire(); this.localExpire = cached.localExpire();
this.remoteExpire = cached.remoteExpire(); this.remoteExpire = cached.remoteExpire();
this.timeUnit = cached.timeUnit(); this.timeUnit = cached.timeUnit();
@@ -65,6 +70,10 @@ public class CachedEntry {
return key; return key;
} }
public String getLocalLimit() {
return localLimit;
}
public String getLocalExpire() { public String getLocalExpire() {
return localExpire; return localExpire;
} }

View File

@@ -6,12 +6,15 @@ package org.redkale.cached.spi;
import java.lang.reflect.Type; import java.lang.reflect.Type;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import org.redkale.annotation.AutoLoad; import org.redkale.annotation.AutoLoad;
@@ -38,8 +41,8 @@ public class CachedLocalSource implements Service {
private final JsonConvert convert = JsonFactory.create().skipAllIgnore(true).getConvert(); private final JsonConvert convert = JsonFactory.create().skipAllIgnore(true).getConvert();
// key: name, sub-key: key // key: name
private final ConcurrentHashMap<String, Map<String, CacheItem>> container = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, CacheMap> container = new ConcurrentHashMap<>();
private ScheduledThreadPoolExecutor scheduler; private ScheduledThreadPoolExecutor scheduler;
@@ -83,24 +86,24 @@ public class CachedLocalSource implements Service {
} }
} }
public <T> void set(String name, String key, long millis, Type type, T value) { public <T> void set(String name, String key, int localLimit, long millis, Type type, T value) {
// millis > 0 才需要过期设置 // millis > 0 才需要过期设置
String json = convert.convertTo(type, value); String json = convert.convertTo(type, value);
container container
.computeIfAbsent(name, n -> new ConcurrentHashMap<>()) .computeIfAbsent(name, n -> new CacheMap(localLimit))
.computeIfAbsent(key, k -> new CacheItem(json)) .computeIfAbsent(key, json)
.set(json, millis); .set(json, millis);
} }
public <T> T get(String name, String key, Type type) { public <T> T get(String name, String key, Type type) {
Map<String, CacheItem> map = container.get(name); CacheMap map = container.get(name);
CacheItem item = map == null ? null : map.get(key); CacheItem item = map == null ? null : map.get(key);
String json = item == null || item.isExpired() ? null : item.getValue(); String json = item == null || item.isExpired() ? null : item.getValue();
return json == null ? null : convert.convertFrom(type, json); return json == null ? null : convert.convertFrom(type, json);
} }
public long del(String name, String key) { public long del(String name, String key) {
Map<String, CacheItem> map = container.get(name); CacheMap map = container.get(name);
return map != null && map.remove(key) != null ? 1 : 0; return map != null && map.remove(key) != null ? 1 : 0;
} }
@@ -112,11 +115,83 @@ public class CachedLocalSource implements Service {
return CompletableFuture.completedFuture(del(name, key)); return CompletableFuture.completedFuture(del(name, key));
} }
public <T> CompletableFuture<Void> setAsync(String name, String key, long millis, Type type, T value) { public <T> CompletableFuture<Void> setAsync(
return CompletableFuture.runAsync(() -> set(name, key, millis, type, value)); String name, String key, int localLimit, long millis, Type type, T value) {
return CompletableFuture.runAsync(() -> set(name, key, localLimit, millis, type, value));
} }
protected static class CacheItem { public int getKeyCount(String name) {
CacheMap map = container.get(name);
return map == null ? -1 : map.size();
}
protected static class CacheMap {
protected final ReentrantLock lock = new ReentrantLock();
protected final ConcurrentHashMap<String, CacheItem> map = new ConcurrentHashMap<>();
protected int limit;
public CacheMap(int limit) {
this.limit = limit;
}
public void forEach(BiConsumer<String, CacheItem> action) {
map.forEach(action);
}
public CacheItem remove(String key) {
return map.remove(key);
}
public CacheItem get(String key) {
return map.get(key);
}
public CacheItem computeIfAbsent(String key, String json) {
if (limit > 0) {
AtomicBoolean added = new AtomicBoolean();
CacheItem item = map.computeIfAbsent(key, k -> {
added.set(true);
return new CacheItem(key, json);
});
if (added.get()) {
checkLimit();
}
return item;
} else {
return map.computeIfAbsent(key, k -> new CacheItem(key, json));
}
}
public int size() {
return map.size();
}
void checkLimit() {
int l = limit;
if (l > 0 && map.size() > l) {
lock.lock();
try {
if (l > 0 && map.size() > l) {
List<CacheItem> items = new ArrayList<>(map.values());
Collections.sort(items);
int count = map.size() - l;
for (int i = 0; i < count; i++) {
map.remove(items.get(i).getKey());
}
}
} finally {
lock.unlock();
}
}
}
}
protected static class CacheItem implements Comparable<CacheItem> {
private final String key;
@Nullable // json格式 @Nullable // json格式
protected String value; protected String value;
@@ -124,17 +199,29 @@ public class CachedLocalSource implements Service {
// 为0表示永久 大于0表示有过期时间 // 为0表示永久 大于0表示有过期时间
private long endMillis; private long endMillis;
public CacheItem(String value) { private long createTime = System.currentTimeMillis();
public CacheItem(String key, String value) {
this.key = key;
this.value = value; this.value = value;
} }
public String getKey() {
return key;
}
public String getValue() { public String getValue() {
return value; return value;
} }
public long getCreateTime() {
return createTime;
}
public void set(String value, long millis) { public void set(String value, long millis) {
this.value = value; this.value = value;
this.endMillis = millis > 0 ? (System.currentTimeMillis() + millis) : 0; this.createTime = System.currentTimeMillis();
this.endMillis = millis > 0 ? (this.createTime + millis) : 0;
} }
@ConvertDisabled @ConvertDisabled
@@ -145,5 +232,17 @@ public class CachedLocalSource implements Service {
boolean isExpired(long now) { boolean isExpired(long now) {
return endMillis > 0 && now >= endMillis; return endMillis > 0 && now >= endMillis;
} }
@Override
public int compareTo(CacheItem o) {
long t1 = this.createTime;
long t2 = o == null ? 0 : o.createTime;
return t1 == t2 ? 0 : (t1 > t2 ? 1 : -1);
}
@Override
public String toString() {
return JsonConvert.root().convertTo(this);
}
} }
} }

View File

@@ -271,20 +271,28 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
* @param key 缓存键 * @param key 缓存键
* @param type 数据类型 * @param type 数据类型
* @param nullable 是否缓存null值 * @param nullable 是否缓存null值
* @param localLimit 本地缓存数量上限
* @param expire 过期时长Duration.ZERO为永不过期 * @param expire 过期时长Duration.ZERO为永不过期
* @param supplier 数据函数 * @param supplier 数据函数
* @return 数据值 * @return 数据值
*/ */
@Override @Override
public <T> T localGetSet( public <T> T localGetSet(
String name, String key, Type type, boolean nullable, Duration expire, ThrowSupplier<T> supplier) { String name,
String key,
Type type,
boolean nullable,
int localLimit,
Duration expire,
ThrowSupplier<T> supplier) {
return getSet( return getSet(
(n, k, ex, ct) -> localSource.get(name, idFor(n, k), ct), (n, k, l, ex, ct) -> localSource.get(name, idFor(n, k), ct),
this::localSetCache, this::localSetCache,
name, name,
key, key,
type, type,
nullable, nullable,
localLimit,
expire, expire,
supplier); supplier);
} }
@@ -297,6 +305,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
* @param key 缓存键 * @param key 缓存键
* @param type 数据类型 * @param type 数据类型
* @param nullable 是否缓存null值 * @param nullable 是否缓存null值
* @param localLimit 本地缓存数量上限
* @param expire 过期时长Duration.ZERO为永不过期 * @param expire 过期时长Duration.ZERO为永不过期
* @param supplier 数据函数 * @param supplier 数据函数
* @return 数据值 * @return 数据值
@@ -307,15 +316,17 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
String key, String key,
Type type, Type type,
boolean nullable, boolean nullable,
int localLimit,
Duration expire, Duration expire,
ThrowSupplier<CompletableFuture<T>> supplier) { ThrowSupplier<CompletableFuture<T>> supplier) {
return getSetAsync( return getSetAsync(
(n, k, e, c) -> localSource.getAsync(name, idFor(n, k), c), (n, k, l, e, c) -> localSource.getAsync(name, idFor(n, k), c),
this::localSetCacheAsync, this::localSetCacheAsync,
name, name,
key, key,
type, type,
nullable, nullable,
localLimit,
expire, expire,
supplier); supplier);
} }
@@ -332,7 +343,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
*/ */
@Override @Override
public <T> void localSet(String name, String key, Type type, T value, Duration expire) { public <T> void localSet(String name, String key, Type type, T value, Duration expire) {
localSetCache(name, key, type, value, expire); localSetCache(name, key, 0, type, value, expire);
} }
/** /**
@@ -396,12 +407,13 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
public <T> T remoteGetSet( public <T> T remoteGetSet(
String name, String key, Type type, boolean nullable, Duration expire, ThrowSupplier<T> supplier) { String name, String key, Type type, boolean nullable, Duration expire, ThrowSupplier<T> supplier) {
return getSet( return getSet(
(n, k, ex, ct) -> remoteSource.get(idFor(n, k), ct), (n, k, l, ex, ct) -> remoteSource.get(idFor(n, k), ct),
this::remoteSetCache, this::remoteSetCache,
name, name,
key, key,
type, type,
nullable, nullable,
0,
expire, expire,
supplier); supplier);
} }
@@ -427,12 +439,13 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
Duration expire, Duration expire,
ThrowSupplier<CompletableFuture<T>> supplier) { ThrowSupplier<CompletableFuture<T>> supplier) {
return getSetAsync( return getSetAsync(
(n, k, ex, ct) -> remoteSource.getAsync(idFor(n, k), ct), (n, k, l, ex, ct) -> remoteSource.getAsync(idFor(n, k), ct),
this::remoteSetCacheAsync, this::remoteSetCacheAsync,
name, name,
key, key,
type, type,
nullable, nullable,
0,
expire, expire,
supplier); supplier);
} }
@@ -464,7 +477,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
*/ */
@Override @Override
public <T> CompletableFuture<Void> remoteSetAsync(String name, String key, Type type, T value, Duration expire) { public <T> CompletableFuture<Void> remoteSetAsync(String name, String key, Type type, T value, Duration expire) {
return remoteSetCacheAsync(name, key, type, value, expire); return remoteSetCacheAsync(name, key, 0, type, value, expire);
} }
/** /**
@@ -505,7 +518,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
*/ */
@Override @Override
public <T> T bothGet(String name, String key, Type type) { public <T> T bothGet(String name, String key, Type type) {
return CachedValue.get(bothGetCache(name, key, (Duration) null, type)); return CachedValue.get(bothGetCache(name, key, 0, (Duration) null, type));
} }
/** /**
@@ -519,7 +532,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
*/ */
@Override @Override
public <T> CompletableFuture<T> bothGetAsync(String name, String key, Type type) { public <T> CompletableFuture<T> bothGetAsync(String name, String key, Type type) {
return bothGetCacheAsync(name, key, (Duration) null, type).thenApply(CachedValue::get); return bothGetCacheAsync(name, key, 0, (Duration) null, type).thenApply(CachedValue::get);
} }
/** /**
@@ -530,6 +543,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
* @param key 缓存键 * @param key 缓存键
* @param type 数据类型 * @param type 数据类型
* @param nullable 是否缓存null值 * @param nullable 是否缓存null值
* @param localLimit 本地缓存数量上限
* @param localExpire 本地过期时长Duration.ZERO为永不过期为null表示不本地缓存 * @param localExpire 本地过期时长Duration.ZERO为永不过期为null表示不本地缓存
* @param remoteExpire 远程过期时长Duration.ZERO为永不过期为null表示不远程缓存 * @param remoteExpire 远程过期时长Duration.ZERO为永不过期为null表示不远程缓存
* @param supplier 数据函数 * @param supplier 数据函数
@@ -541,6 +555,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
String key, String key,
Type type, Type type,
boolean nullable, boolean nullable,
int localLimit,
Duration localExpire, Duration localExpire,
Duration remoteExpire, Duration remoteExpire,
ThrowSupplier<T> supplier) { ThrowSupplier<T> supplier) {
@@ -555,7 +570,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
} }
if (remoteExpire == null) { // 只有本地缓存 if (remoteExpire == null) { // 只有本地缓存
Objects.requireNonNull(localExpire); Objects.requireNonNull(localExpire);
return localGetSet(name, key, type, nullable, localExpire, supplier); return localGetSet(name, key, type, nullable, localLimit, localExpire, supplier);
} }
if (localExpire == null) { // 只有远程缓存 if (localExpire == null) { // 只有远程缓存
Objects.requireNonNull(remoteExpire); Objects.requireNonNull(remoteExpire);
@@ -563,8 +578,8 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
} }
return getSet( return getSet(
this::bothGetCache, this::bothGetCache,
(n, k, e, t, v) -> { (n, k, l, e, t, v) -> {
localSetCache(n, k, localExpire, t, v); localSetCache(n, k, l, localExpire, t, v);
if (remoteSource != null) { if (remoteSource != null) {
remoteSetCache(n, k, remoteExpire, t, v); remoteSetCache(n, k, remoteExpire, t, v);
} }
@@ -573,6 +588,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
key, key,
type, type,
nullable, nullable,
localLimit,
localExpire, localExpire,
supplier); supplier);
} }
@@ -585,6 +601,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
* @param key 缓存键 * @param key 缓存键
* @param type 数据类型 * @param type 数据类型
* @param nullable 是否缓存null值 * @param nullable 是否缓存null值
* @param localLimit 本地缓存数量上限
* @param localExpire 本地过期时长Duration.ZERO为永不过期为null表示不本地缓存 * @param localExpire 本地过期时长Duration.ZERO为永不过期为null表示不本地缓存
* @param remoteExpire 远程过期时长Duration.ZERO为永不过期为null表示不远程缓存 * @param remoteExpire 远程过期时长Duration.ZERO为永不过期为null表示不远程缓存
* @param supplier 数据函数 * @param supplier 数据函数
@@ -596,6 +613,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
String key, String key,
Type type, Type type,
boolean nullable, boolean nullable,
int localLimit,
Duration localExpire, Duration localExpire,
Duration remoteExpire, Duration remoteExpire,
ThrowSupplier<CompletableFuture<T>> supplier) { ThrowSupplier<CompletableFuture<T>> supplier) {
@@ -608,7 +626,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
} }
if (remoteExpire == null) { // 只有本地缓存 if (remoteExpire == null) { // 只有本地缓存
Objects.requireNonNull(localExpire); Objects.requireNonNull(localExpire);
return localGetSetAsync(name, key, type, nullable, localExpire, supplier); return localGetSetAsync(name, key, type, nullable, localLimit, localExpire, supplier);
} }
if (localExpire == null) { // 只有远程缓存 if (localExpire == null) { // 只有远程缓存
Objects.requireNonNull(remoteExpire); Objects.requireNonNull(remoteExpire);
@@ -616,8 +634,8 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
} }
return getSetAsync( return getSetAsync(
this::bothGetCacheAsync, this::bothGetCacheAsync,
(n, k, e, t, v) -> { (n, k, l, e, t, v) -> {
localSetCache(n, k, localExpire, t, v); localSetCache(n, k, l, localExpire, t, v);
if (remoteSource != null) { if (remoteSource != null) {
return remoteSetCacheAsync(n, k, remoteExpire, t, v); return remoteSetCacheAsync(n, k, remoteExpire, t, v);
} else { } else {
@@ -628,6 +646,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
key, key,
type, type,
nullable, nullable,
localLimit,
localExpire, localExpire,
supplier); supplier);
} }
@@ -647,7 +666,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
public <T> void bothSet(String name, String key, Type type, T value, Duration localExpire, Duration remoteExpire) { public <T> void bothSet(String name, String key, Type type, T value, Duration localExpire, Duration remoteExpire) {
checkEnable(); checkEnable();
if (localExpire != null) { if (localExpire != null) {
localSetCache(name, key, type, value, localExpire); localSetCache(name, key, 0, type, value, localExpire);
} }
if (remoteExpire != null && remoteSource != null) { if (remoteExpire != null && remoteSource != null) {
remoteSetCache(name, key, type, value, remoteExpire); remoteSetCache(name, key, type, value, remoteExpire);
@@ -674,11 +693,11 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
String name, String key, Type type, T value, Duration localExpire, Duration remoteExpire) { String name, String key, Type type, T value, Duration localExpire, Duration remoteExpire) {
checkEnable(); checkEnable();
if (localExpire != null) { if (localExpire != null) {
localSetCache(name, key, type, value, localExpire); localSetCache(name, key, 0, type, value, localExpire);
} }
CompletableFuture<Void> future = CompletableFuture.completedFuture(null); CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
if (remoteSource != null && remoteExpire != null) { if (remoteSource != null && remoteExpire != null) {
future = remoteSetCacheAsync(name, key, type, value, remoteExpire); future = remoteSetCacheAsync(name, key, 0, type, value, remoteExpire);
} }
if (remoteSource != null && broadcastable) { if (remoteSource != null && broadcastable) {
future = future.thenCompose(r -> remoteSource future = future.thenCompose(r -> remoteSource
@@ -745,6 +764,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
* @param key 缓存键 * @param key 缓存键
* @param type 数据类型 * @param type 数据类型
* @param nullable 是否缓存null值 * @param nullable 是否缓存null值
* @param localLimit 本地缓存数量上限
* @param expire 过期时长Duration.ZERO为永不过期 * @param expire 过期时长Duration.ZERO为永不过期
* @param supplier 数据函数 * @param supplier 数据函数
* @return 数据值 * @return 数据值
@@ -756,6 +776,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
String key, String key,
Type type, Type type,
boolean nullable, boolean nullable,
int localLimit,
Duration expire, Duration expire,
ThrowSupplier<T> supplier) { ThrowSupplier<T> supplier) {
checkEnable(); checkEnable();
@@ -764,7 +785,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
Objects.requireNonNull(supplier); Objects.requireNonNull(supplier);
final Type cacheType = loadCacheType(type); final Type cacheType = loadCacheType(type);
final String id = idFor(name, key); final String id = idFor(name, key);
CachedValue<T> cacheVal = getter.get(name, key, expire, cacheType); CachedValue<T> cacheVal = getter.get(name, key, localLimit, expire, cacheType);
if (CachedValue.isValid(cacheVal)) { if (CachedValue.isValid(cacheVal)) {
if (logable) { if (logable) {
logger.log(logLevel, "Cached got id(" + id + ") value from eitherSource"); logger.log(logLevel, "Cached got id(" + id + ") value from eitherSource");
@@ -772,7 +793,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
return cacheVal.getVal(); return cacheVal.getVal();
} }
Function<String, CachedValue> func = k -> { Function<String, CachedValue> func = k -> {
CachedValue<T> oldCacheVal = getter.get(name, key, expire, cacheType); CachedValue<T> oldCacheVal = getter.get(name, key, localLimit, expire, cacheType);
if (CachedValue.isValid(oldCacheVal)) { if (CachedValue.isValid(oldCacheVal)) {
return oldCacheVal; return oldCacheVal;
} }
@@ -785,7 +806,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
throw new RedkaleException(t); throw new RedkaleException(t);
} }
if (CachedValue.isValid(newCacheVal)) { if (CachedValue.isValid(newCacheVal)) {
setter.set(name, key, expire, cacheType, newCacheVal); setter.set(name, key, localLimit, expire, cacheType, newCacheVal);
} }
return newCacheVal; return newCacheVal;
}; };
@@ -807,6 +828,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
* @param key 缓存键 * @param key 缓存键
* @param type 数据类型 * @param type 数据类型
* @param nullable 是否缓存null值 * @param nullable 是否缓存null值
* @param localLimit 本地缓存数量上限
* @param expire 过期时长Duration.ZERO为永不过期 * @param expire 过期时长Duration.ZERO为永不过期
* @param supplier 数据函数 * @param supplier 数据函数
* @return 数据值 * @return 数据值
@@ -818,6 +840,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
String key, String key,
Type type, Type type,
boolean nullable, boolean nullable,
int localLimit,
Duration expire, Duration expire,
ThrowSupplier<CompletableFuture<T>> supplier) { ThrowSupplier<CompletableFuture<T>> supplier) {
checkEnable(); checkEnable();
@@ -825,7 +848,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
Objects.requireNonNull(supplier); Objects.requireNonNull(supplier);
final Type cacheType = loadCacheType(type); final Type cacheType = loadCacheType(type);
final String id = idFor(name, key); final String id = idFor(name, key);
CompletableFuture<CachedValue<T>> sourceFuture = getter.get(name, key, expire, cacheType); CompletableFuture<CachedValue<T>> sourceFuture = getter.get(name, key, localLimit, expire, cacheType);
return sourceFuture.thenCompose(val -> { return sourceFuture.thenCompose(val -> {
if (CachedValue.isValid(val)) { if (CachedValue.isValid(val)) {
if (logable) { if (logable) {
@@ -843,7 +866,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
} }
CachedValue<T> cacheVal = toCacheValue(nullable, v); CachedValue<T> cacheVal = toCacheValue(nullable, v);
if (CachedValue.isValid(cacheVal)) { if (CachedValue.isValid(cacheVal)) {
setter.set(name, key, expire, cacheType, cacheVal) setter.set(name, key, localLimit, expire, cacheType, cacheVal)
.whenComplete((v2, e2) -> lock.success(CachedValue.get(cacheVal))); .whenComplete((v2, e2) -> lock.success(CachedValue.get(cacheVal)));
} else { } else {
lock.success(CachedValue.get(cacheVal)); lock.success(CachedValue.get(cacheVal));
@@ -857,27 +880,34 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
}); });
} }
protected <T> void localSetCache(String name, String key, Type type, T value, Duration expire) { protected <T> void localSetCache(String name, String key, int localLimit, Type type, T value, Duration expire) {
localSetCache(name, key, expire, loadCacheType(type, value), CachedValue.create(value)); localSetCache(name, key, localLimit, expire, loadCacheType(type, value), CachedValue.create(value));
} }
protected <T> void localSetCache( protected <T> void localSetCache(
String name, String key, Duration expire, Type cacheType, CachedValue<T> cacheVal) { String name, String key, int localLimit, Duration expire, Type cacheType, CachedValue<T> cacheVal) {
checkEnable(); checkEnable();
boolean logable = logger.isLoggable(logLevel); boolean logable = logger.isLoggable(logLevel);
Objects.requireNonNull(expire); Objects.requireNonNull(expire);
long millis = expire.toMillis(); long millis = expire.toMillis();
String id = idFor(name, key); String id = idFor(name, key);
if (logable) { if (logable) {
logger.log(logLevel, "Cached set id(" + id + ") value to localSource expire " + millis + " ms"); logger.log(
logLevel,
"Cached set id(" + id + ") value to localSource expire " + millis + " ms, limit " + localLimit);
} }
localSource.set(name, id, millis, cacheType, cacheVal); localSource.set(name, id, localLimit, millis, cacheType, cacheVal);
} }
protected <T> void remoteSetCache(String name, String key, Type type, T value, Duration expire) { protected <T> void remoteSetCache(String name, String key, Type type, T value, Duration expire) {
remoteSetCache(name, key, expire, loadCacheType(type, value), CachedValue.create(value)); remoteSetCache(name, key, expire, loadCacheType(type, value), CachedValue.create(value));
} }
protected <T> void remoteSetCache(
String name, String key, int localLimit, Duration expire, Type cacheType, CachedValue<T> cacheVal) {
remoteSetCache(name, key, expire, cacheType, cacheVal);
}
protected <T> void remoteSetCache( protected <T> void remoteSetCache(
String name, String key, Duration expire, Type cacheType, CachedValue<T> cacheVal) { String name, String key, Duration expire, Type cacheType, CachedValue<T> cacheVal) {
checkEnable(); checkEnable();
@@ -896,28 +926,35 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
} }
protected <T> CompletableFuture<Void> localSetCacheAsync( protected <T> CompletableFuture<Void> localSetCacheAsync(
String name, String key, Type type, T value, Duration expire) { String name, String key, int localLimit, Type type, T value, Duration expire) {
return localSetCacheAsync(name, key, expire, loadCacheType(type, value), CachedValue.create(value)); return localSetCacheAsync(name, key, localLimit, expire, loadCacheType(type, value), CachedValue.create(value));
} }
protected <T> CompletableFuture<Void> localSetCacheAsync( protected <T> CompletableFuture<Void> localSetCacheAsync(
String name, String key, Duration expire, Type cacheType, CachedValue<T> cacheVal) { String name, String key, int localLimit, Duration expire, Type cacheType, CachedValue<T> cacheVal) {
checkEnable(); checkEnable();
boolean logable = logger.isLoggable(logLevel); boolean logable = logger.isLoggable(logLevel);
Objects.requireNonNull(expire); Objects.requireNonNull(expire);
String id = idFor(name, key); String id = idFor(name, key);
long millis = expire.toMillis(); long millis = expire.toMillis();
if (logable) { if (logable) {
logger.log(logLevel, "Cached set id(" + id + ") value to localSource expire " + millis + " ms"); logger.log(
logLevel,
"Cached set id(" + id + ") value to localSource expire " + millis + " ms, limit " + localLimit);
} }
return localSource.setAsync(name, id, millis, cacheType, cacheVal); return localSource.setAsync(name, id, localLimit, millis, cacheType, cacheVal);
} }
protected <T> CompletableFuture<Void> remoteSetCacheAsync( protected <T> CompletableFuture<Void> remoteSetCacheAsync(
String name, String key, Type type, T value, Duration expire) { String name, String key, int localLimit, Type type, T value, Duration expire) {
return remoteSetCacheAsync(name, key, expire, loadCacheType(type, value), CachedValue.create(value)); return remoteSetCacheAsync(name, key, expire, loadCacheType(type, value), CachedValue.create(value));
} }
protected <T> CompletableFuture<Void> remoteSetCacheAsync(
String name, String key, int localLimit, Duration expire, Type cacheType, CachedValue<T> cacheVal) {
return remoteSetCacheAsync(name, key, expire, cacheType, cacheVal);
}
protected <T> CompletableFuture<Void> remoteSetCacheAsync( protected <T> CompletableFuture<Void> remoteSetCacheAsync(
String name, String key, Duration expire, Type cacheType, CachedValue<T> cacheVal) { String name, String key, Duration expire, Type cacheType, CachedValue<T> cacheVal) {
checkEnable(); checkEnable();
@@ -936,7 +973,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
} }
protected <T> CachedValue<T> bothGetCache( protected <T> CachedValue<T> bothGetCache(
final String name, final String key, final Duration expire, final Type cacheType) { final String name, final String key, int localLimit, final Duration expire, final Type cacheType) {
checkEnable(); checkEnable();
boolean logable = logger.isLoggable(logLevel); boolean logable = logger.isLoggable(logLevel);
String id = idFor(name, key); String id = idFor(name, key);
@@ -954,7 +991,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
if (logable) { if (logable) {
logger.log(logLevel, "Cached set id(" + id + ") value to localSource from remoteSource"); logger.log(logLevel, "Cached set id(" + id + ") value to localSource from remoteSource");
} }
localSetCache(name, key, expire, cacheType, cacheVal); localSetCache(name, key, localLimit, expire, cacheType, cacheVal);
} }
if (logable) { if (logable) {
logger.log(logLevel, "Cached got id(" + id + ") value from remoteSource"); logger.log(logLevel, "Cached got id(" + id + ") value from remoteSource");
@@ -972,12 +1009,13 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
* @param <T> 泛型 * @param <T> 泛型
* @param name 缓存名称 * @param name 缓存名称
* @param key 缓存键 * @param key 缓存键
* @param localLimit 本地缓存数量上限
* @param expire 过期时长Duration.ZERO为永不过期 * @param expire 过期时长Duration.ZERO为永不过期
* @param cacheType 数据类型 * @param cacheType 数据类型
* @return 数据值 * @return 数据值
*/ */
protected <T> CompletableFuture<CachedValue<T>> bothGetCacheAsync( protected <T> CompletableFuture<CachedValue<T>> bothGetCacheAsync(
String name, String key, Duration expire, Type cacheType) { String name, String key, int localLimit, Duration expire, Type cacheType) {
checkEnable(); checkEnable();
boolean logable = logger.isLoggable(logLevel); boolean logable = logger.isLoggable(logLevel);
String id = idFor(name, key); String id = idFor(name, key);
@@ -996,7 +1034,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
if (logable) { if (logable) {
logger.log(logLevel, "Cached set id(" + id + ") value to localSource from remoteSource"); logger.log(logLevel, "Cached set id(" + id + ") value to localSource from remoteSource");
} }
localSetCache(name, key, expire, cacheType, v); localSetCache(name, key, localLimit, expire, cacheType, v);
} }
if (logable) { if (logable) {
logger.log(logLevel, "Cached got id(" + id + ") value from remoteSource"); logger.log(logLevel, "Cached got id(" + id + ") value from remoteSource");
@@ -1077,18 +1115,18 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
protected static interface GetterFunc<R> { protected static interface GetterFunc<R> {
public R get(String name, String key, Duration expire, Type cacheType); public R get(String name, String key, int localLimit, Duration expire, Type cacheType);
} }
protected static interface SetterSyncFunc { protected static interface SetterSyncFunc {
public void set(String name, String key, Duration expire, Type cacheType, CachedValue cacheVal); public void set(String name, String key, int localLimit, Duration expire, Type cacheType, CachedValue cacheVal);
} }
protected static interface SetterAsyncFunc { protected static interface SetterAsyncFunc {
public CompletableFuture<Void> set( public CompletableFuture<Void> set(
String name, String key, Duration expire, Type cacheType, CachedValue cacheVal); String name, String key, int localLimit, Duration expire, Type cacheType, CachedValue cacheVal);
} }
public class CacheRemoteListener implements CacheEventListener<CachedEventMessage> { public class CacheRemoteListener implements CacheEventListener<CachedEventMessage> {

View File

@@ -30,6 +30,8 @@ public @interface DynForCached {
String key(); String key();
String localLimit();
String localExpire(); String localExpire();
String remoteExpire(); String remoteExpire();

View File

@@ -53,6 +53,13 @@ public class CachedManagerTest {
bean.setRemark(bean.getRemark() + "-新备注"); bean.setRemark(bean.getRemark() + "-新备注");
Assertions.assertEquals( Assertions.assertEquals(
manager.localGet("name", bean.getName(), CachingBean.class).toString(), json); manager.localGet("name", bean.getName(), CachingBean.class).toString(), json);
manager.localGetSet("group", "key1", CachingBean.class, true, 2, expire, () -> new CachingBean("v1", "r1"));
Utility.sleep(2);
manager.localGetSet("group", "key2", CachingBean.class, true, 2, expire, () -> new CachingBean("v2", "r2"));
Utility.sleep(2);
manager.localGetSet("group", "key3", CachingBean.class, true, 2, expire, () -> new CachingBean("v3", "r3"));
Assertions.assertEquals(2, manager.getLocalSource().getKeyCount("group"));
} }
@Test @Test
@@ -71,6 +78,7 @@ public class CachedManagerTest {
"name", "name",
String.class, String.class,
false, false,
0,
localExpire, localExpire,
remoteExpire, remoteExpire,
() -> bean.getName()); () -> bean.getName());
@@ -82,7 +90,7 @@ public class CachedManagerTest {
} }
Assertions.assertEquals(1, ParallelBean.c1.get()); Assertions.assertEquals(1, ParallelBean.c1.get());
Utility.sleep(200); Utility.sleep(200);
manager.bothGetSet("name", "name", String.class, false, localExpire, remoteExpire, () -> bean.getName()); manager.bothGetSet("name", "name", String.class, false, 0, localExpire, remoteExpire, () -> bean.getName());
Assertions.assertEquals(1, ParallelBean.c1.get()); Assertions.assertEquals(1, ParallelBean.c1.get());
Utility.sleep(300); Utility.sleep(300);
{ {
@@ -94,6 +102,7 @@ public class CachedManagerTest {
"name", "name",
String.class, String.class,
false, false,
0,
localExpire, localExpire,
remoteExpire, remoteExpire,
() -> bean.getName()); () -> bean.getName());
@@ -123,6 +132,15 @@ public class CachedManagerTest {
private String remark; private String remark;
public CachingBean() {
//
}
public CachingBean(String name, String remark) {
this.name = name;
this.remark = remark;
}
public String getName() { public String getName() {
return name; return name;
} }

View File

@@ -50,6 +50,7 @@ public class _DynLocalCacheInstance extends CachedInstance {
key = "name", key = "name",
nullable = false, nullable = false,
timeUnit = TimeUnit.SECONDS, timeUnit = TimeUnit.SECONDS,
localLimit = "-1",
remoteExpire = "-1", remoteExpire = "-1",
localExpire = "30") localExpire = "30")
public String getName() { public String getName() {
@@ -68,6 +69,7 @@ public class _DynLocalCacheInstance extends CachedInstance {
key = "#{id}_#{files.one}", key = "#{id}_#{files.one}",
nullable = false, nullable = false,
timeUnit = TimeUnit.SECONDS, timeUnit = TimeUnit.SECONDS,
localLimit = "-1",
remoteExpire = "60", remoteExpire = "60",
localExpire = "30") localExpire = "30")
public File getInfo(CachedInstance.ParamBean bean, int id, List<String> idList, Map<String, File> files) { public File getInfo(CachedInstance.ParamBean bean, int id, List<String> idList, Map<String, File> files) {
@@ -87,6 +89,7 @@ public class _DynLocalCacheInstance extends CachedInstance {
key = "name", key = "name",
nullable = false, nullable = false,
timeUnit = TimeUnit.SECONDS, timeUnit = TimeUnit.SECONDS,
localLimit = "-1",
remoteExpire = "-1", remoteExpire = "-1",
localExpire = "30") localExpire = "30")
public CompletableFuture<String> getNameAsync() { public CompletableFuture<String> getNameAsync() {
@@ -105,6 +108,7 @@ public class _DynLocalCacheInstance extends CachedInstance {
key = "#{id}_#{files.one}", key = "#{id}_#{files.one}",
nullable = false, nullable = false,
timeUnit = TimeUnit.SECONDS, timeUnit = TimeUnit.SECONDS,
localLimit = "-1",
remoteExpire = "60", remoteExpire = "60",
localExpire = "30") localExpire = "30")
public CompletableFuture<Map<String, Integer>> getInfo2Async( public CompletableFuture<Map<String, Integer>> getInfo2Async(
@@ -128,6 +132,7 @@ public class _DynLocalCacheInstance extends CachedInstance {
key = "name", key = "name",
nullable = false, nullable = false,
timeUnit = TimeUnit.SECONDS, timeUnit = TimeUnit.SECONDS,
localLimit = "-1",
remoteExpire = "60", remoteExpire = "60",
localExpire = "30") localExpire = "30")
public CompletableFuture<String> getName2Async() throws IOException, InstantiationException { public CompletableFuture<String> getName2Async() throws IOException, InstantiationException {
@@ -146,6 +151,7 @@ public class _DynLocalCacheInstance extends CachedInstance {
key = "#{id}_#{files.one}", key = "#{id}_#{files.one}",
nullable = false, nullable = false,
timeUnit = TimeUnit.SECONDS, timeUnit = TimeUnit.SECONDS,
localLimit = "-1",
remoteExpire = "60", remoteExpire = "60",
localExpire = "30") localExpire = "30")
public CompletableFuture<File> getInfoAsync( public CompletableFuture<File> getInfoAsync(
@@ -166,6 +172,7 @@ public class _DynLocalCacheInstance extends CachedInstance {
key = "name", key = "name",
nullable = false, nullable = false,
timeUnit = TimeUnit.SECONDS, timeUnit = TimeUnit.SECONDS,
localLimit = "-1",
remoteExpire = "60", remoteExpire = "60",
localExpire = "30") localExpire = "30")
public String getName2() throws RedkaleException { public String getName2() throws RedkaleException {