CachedLocalSource

This commit is contained in:
redkale
2024-09-11 13:45:30 +08:00
parent 91eee09e2f
commit 357d8b3d11
5 changed files with 169 additions and 43 deletions

View File

@@ -55,13 +55,6 @@ public interface CachedManager extends Resourcable {
*/
public String getSchema();
/**
* 获取本地缓存Source
*
* @return {@link org.redkale.source.CacheSource}
*/
public CacheSource getLocalSource();
/**
* 获取远程缓存Source, 可能为null
*

View File

@@ -59,9 +59,9 @@ public class CachedAction {
@Resource
private Environment environment;
private CachedManager manager;
// 缓存名称
private String name;

View File

@@ -0,0 +1,149 @@
/*
* Copyright (c) 2016-2116 Redkale
* All rights reserved.
*/
package org.redkale.cached.spi;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.redkale.annotation.AutoLoad;
import org.redkale.annotation.Nullable;
import org.redkale.convert.ConvertDisabled;
import org.redkale.convert.json.JsonConvert;
import org.redkale.convert.json.JsonFactory;
import org.redkale.service.Service;
import org.redkale.util.AnyValue;
import org.redkale.util.Utility;
/**
* 本地缓存源
*
* <p>详情见: https://redkale.org
*
* @author zhangjx
* @since 2.8.0
*/
@AutoLoad(false)
public class CachedLocalSource implements Service {
private final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
private final JsonConvert convert = JsonFactory.create().skipAllIgnore(true).getConvert();
// key: name, sub-key: key
private final ConcurrentHashMap<String, Map<String, CacheItem>> container = new ConcurrentHashMap<>();
private ScheduledThreadPoolExecutor scheduler;
@Override
public void init(AnyValue conf) {
if (scheduler == null) {
this.scheduler = Utility.newScheduledExecutor(
1, "Redkale-" + CachedLocalSource.class.getSimpleName() + "-Expirer-Thread");
final List<String> keys = new ArrayList<>();
int interval = 30;
scheduler.scheduleWithFixedDelay(
() -> {
try {
container.forEach((n, m) -> {
keys.clear();
long now = System.currentTimeMillis();
m.forEach((k, x) -> {
if (x.isExpired(now)) {
keys.add(k);
}
});
for (String key : keys) {
m.remove(key);
}
});
} catch (Throwable t) {
logger.log(Level.SEVERE, "CachedLocalSource schedule(interval=" + interval + "s) error", t);
}
},
interval,
interval,
TimeUnit.SECONDS);
}
}
@Override
public void destroy(AnyValue conf) {
if (scheduler != null) {
scheduler.shutdownNow();
scheduler = null;
}
}
public <T> void set(String name, String key, long millis, Type type, T value) {
// millis > 0 才需要过期设置
String json = convert.convertTo(type, value);
container
.computeIfAbsent(name, n -> new ConcurrentHashMap<>())
.computeIfAbsent(key, k -> new CacheItem(json))
.set(json, millis);
}
public <T> T get(String name, String key, Type type) {
Map<String, CacheItem> map = container.get(name);
CacheItem item = map == null ? null : map.get(key);
String json = item == null || item.isExpired() ? null : item.getValue();
return json == null ? null : convert.convertFrom(type, json);
}
public long del(String name, String key) {
Map<String, CacheItem> map = container.get(name);
return map != null && map.remove(key) != null ? 1 : 0;
}
public <T> CompletableFuture<T> getAsync(String name, String key, Type type) {
return CompletableFuture.completedFuture(get(name, key, type));
}
public CompletableFuture<Long> delAsync(String name, String key) {
return CompletableFuture.completedFuture(del(name, key));
}
public <T> CompletableFuture<Void> setAsync(String name, String key, long millis, Type type, T value) {
return CompletableFuture.runAsync(() -> set(name, key, millis, type, value));
}
protected static class CacheItem {
@Nullable // json格式
protected String value;
// 为0表示永久 大于0表示有过期时间
private long endMillis;
public CacheItem(String value) {
this.value = value;
}
public String getValue() {
return value;
}
public void set(String value, long millis) {
this.value = value;
this.endMillis = millis > 0 ? (System.currentTimeMillis() + millis) : 0;
}
@ConvertDisabled
public boolean isExpired() {
return endMillis > 0 && System.currentTimeMillis() >= endMillis;
}
boolean isExpired(long now) {
return endMillis > 0 && now >= endMillis;
}
}
}

View File

@@ -25,7 +25,6 @@ import org.redkale.cached.CachedManager;
import org.redkale.service.Local;
import org.redkale.service.Service;
import org.redkale.source.CacheEventListener;
import org.redkale.source.CacheMemorySource;
import org.redkale.source.CacheSource;
import org.redkale.util.AnyValue;
import org.redkale.util.RedkaleException;
@@ -73,7 +72,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
private final ConcurrentHashMap<Type, Type> cacheValueTypes = new ConcurrentHashMap<>();
// 本地缓存Source
protected final CacheMemorySource localSource = new CacheMemorySource("cache-local");
protected final CachedLocalSource localSource = new CachedLocalSource();
// 缓存无效时使用的同步锁
private final ConcurrentHashMap<String, CachedValue> syncLockMap = new ConcurrentHashMap<>();
@@ -203,16 +202,6 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
}
}
/**
* 获取本地缓存Source
*
* @return {@link org.redkale.source.CacheSource}
*/
@Override
public CacheSource getLocalSource() {
return localSource;
}
/**
* 获取远程缓存Source, 可能为null
*
@@ -267,7 +256,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
@Override
public <T> T localGet(String name, String key, Type type) {
checkEnable();
return CachedValue.get(localSource.get(idFor(name, key), loadCacheType(type)));
return CachedValue.get(localSource.get(name, idFor(name, key), loadCacheType(type)));
}
/**
@@ -286,7 +275,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
public <T> T localGetSet(
String name, String key, Type type, boolean nullable, Duration expire, ThrowSupplier<T> supplier) {
return getSet(
(n, k, ex, ct) -> localSource.get(idFor(n, k), ct),
(n, k, ex, ct) -> localSource.get(name, idFor(n, k), ct),
this::localSetCache,
name,
key,
@@ -317,7 +306,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
Duration expire,
ThrowSupplier<CompletableFuture<T>> supplier) {
return getSetAsync(
(n, k, e, c) -> localSource.getAsync(idFor(n, k), c),
(n, k, e, c) -> localSource.getAsync(name, idFor(n, k), c),
this::localSetCacheAsync,
name,
key,
@@ -352,7 +341,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
@Override
public long localDel(String name, String key) {
checkEnable();
return localSource.del(idFor(name, key));
return localSource.del(name, idFor(name, key));
}
// -------------------------------------- 远程缓存 --------------------------------------
@@ -706,7 +695,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
public long bothDel(String name, String key) {
checkEnable();
String id = idFor(name, key);
long v = localSource.del(id);
long v = localSource.del(name, id);
if (remoteSource != null) {
v = remoteSource.del(id);
if (broadcastable) {
@@ -727,7 +716,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
public CompletableFuture<Long> bothDelAsync(String name, String key) {
checkEnable();
String id = idFor(name, key);
long v = localSource.del(id); // 内存操作,无需异步
long v = localSource.del(name, id); // 内存操作,无需异步
if (remoteSource != null) {
return remoteSource.delAsync(id).thenCompose(r -> {
return broadcastable
@@ -878,11 +867,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
if (logable) {
logger.log(logLevel, "Cached set id(" + id + ") value to localSource expire " + millis + " ms");
}
if (millis > 0) {
localSource.psetex(id, millis, cacheType, cacheVal);
} else {
localSource.set(id, cacheType, cacheVal);
}
localSource.set(name, id, millis, cacheType, cacheVal);
}
protected <T> void remoteSetCache(String name, String key, Type type, T value, Duration expire) {
@@ -921,11 +906,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
if (logable) {
logger.log(logLevel, "Cached set id(" + id + ") value to localSource expire " + millis + " ms");
}
if (millis > 0) {
return localSource.psetexAsync(id, millis, cacheType, cacheVal);
} else {
return localSource.setAsync(id, cacheType, cacheVal);
}
return localSource.setAsync(name, id, millis, cacheType, cacheVal);
}
protected <T> CompletableFuture<Void> remoteSetCacheAsync(
@@ -955,7 +936,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
checkEnable();
boolean logable = logger.isLoggable(logLevel);
String id = idFor(name, key);
CachedValue<T> cacheVal = localSource.get(id, cacheType);
CachedValue<T> cacheVal = localSource.get(name, id, cacheType);
if (CachedValue.isValid(cacheVal)) {
if (logable) {
logger.log(logLevel, "Cached got id(" + id + ") value from localSource");
@@ -996,7 +977,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
checkEnable();
boolean logable = logger.isLoggable(logLevel);
String id = idFor(name, key);
CachedValue<T> val = localSource.get(id, cacheType); // 内存操作,无需异步
CachedValue<T> val = localSource.get(name, id, cacheType); // 内存操作,无需异步
if (CachedValue.isValid(val)) {
if (logable) {
logger.log(logLevel, "Cached got id(" + id + ") value from localSource");
@@ -1111,7 +1092,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se
@Override
public void onMessage(String topic, CachedEventMessage message) {
if (!Objects.equals(getNode(), message.getNode())) {
localSource.del(idFor(message.getName(), message.getKey()));
localSource.del(message.getName(), idFor(message.getName(), message.getKey()));
}
}
}

View File

@@ -153,6 +153,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
this.scheduler = Utility.newScheduledExecutor(
1, "Redkale-" + CacheMemorySource.class.getSimpleName() + "-" + resourceName() + "-Expirer-Thread");
final List<String> keys = new ArrayList<>();
int interval = 30;
scheduler.scheduleWithFixedDelay(
() -> {
try {
@@ -179,11 +180,11 @@ public final class CacheMemorySource extends AbstractCacheSource {
rateLimitContainer.remove(key);
}
} catch (Throwable t) {
logger.log(Level.SEVERE, "CacheMemorySource schedule(interval=" + 10 + "s) error", t);
logger.log(Level.SEVERE, "CacheMemorySource schedule(interval=" + interval + "s) error", t);
}
},
10,
10,
interval,
interval,
TimeUnit.SECONDS);
logger.info(
self.getClass().getSimpleName() + ":" + self.resourceName() + " start schedule expire executor");
@@ -204,9 +205,11 @@ public final class CacheMemorySource extends AbstractCacheSource {
public void destroy(AnyValue conf) {
if (scheduler != null) {
scheduler.shutdownNow();
scheduler = null;
}
if (subExecutor != null) {
subExecutor.shutdown();
subExecutor = null;
}
}