diff --git a/src/main/java/org/redkale/cached/CachedManager.java b/src/main/java/org/redkale/cached/CachedManager.java index 7521b5b9e..ef97b554a 100644 --- a/src/main/java/org/redkale/cached/CachedManager.java +++ b/src/main/java/org/redkale/cached/CachedManager.java @@ -55,13 +55,6 @@ public interface CachedManager extends Resourcable { */ public String getSchema(); - /** - * 获取本地缓存Source - * - * @return {@link org.redkale.source.CacheSource} - */ - public CacheSource getLocalSource(); - /** * 获取远程缓存Source, 可能为null * diff --git a/src/main/java/org/redkale/cached/spi/CachedAction.java b/src/main/java/org/redkale/cached/spi/CachedAction.java index ef925170c..0ae3d816b 100644 --- a/src/main/java/org/redkale/cached/spi/CachedAction.java +++ b/src/main/java/org/redkale/cached/spi/CachedAction.java @@ -59,9 +59,9 @@ public class CachedAction { @Resource private Environment environment; - + private CachedManager manager; - + // 缓存名称 private String name; diff --git a/src/main/java/org/redkale/cached/spi/CachedLocalSource.java b/src/main/java/org/redkale/cached/spi/CachedLocalSource.java new file mode 100644 index 000000000..a1dabd2cd --- /dev/null +++ b/src/main/java/org/redkale/cached/spi/CachedLocalSource.java @@ -0,0 +1,149 @@ +/* + * Copyright (c) 2016-2116 Redkale + * All rights reserved. + */ +package org.redkale.cached.spi; + +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.redkale.annotation.AutoLoad; +import org.redkale.annotation.Nullable; +import org.redkale.convert.ConvertDisabled; +import org.redkale.convert.json.JsonConvert; +import org.redkale.convert.json.JsonFactory; +import org.redkale.service.Service; +import org.redkale.util.AnyValue; +import org.redkale.util.Utility; + +/** + * 本地缓存源 + * + *

详情见: https://redkale.org + * + * @author zhangjx + * @since 2.8.0 + */ +@AutoLoad(false) +public class CachedLocalSource implements Service { + + private final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); + + private final JsonConvert convert = JsonFactory.create().skipAllIgnore(true).getConvert(); + + // key: name, sub-key: key + private final ConcurrentHashMap> container = new ConcurrentHashMap<>(); + + private ScheduledThreadPoolExecutor scheduler; + + @Override + public void init(AnyValue conf) { + if (scheduler == null) { + this.scheduler = Utility.newScheduledExecutor( + 1, "Redkale-" + CachedLocalSource.class.getSimpleName() + "-Expirer-Thread"); + final List keys = new ArrayList<>(); + int interval = 30; + scheduler.scheduleWithFixedDelay( + () -> { + try { + container.forEach((n, m) -> { + keys.clear(); + long now = System.currentTimeMillis(); + m.forEach((k, x) -> { + if (x.isExpired(now)) { + keys.add(k); + } + }); + for (String key : keys) { + m.remove(key); + } + }); + } catch (Throwable t) { + logger.log(Level.SEVERE, "CachedLocalSource schedule(interval=" + interval + "s) error", t); + } + }, + interval, + interval, + TimeUnit.SECONDS); + } + } + + @Override + public void destroy(AnyValue conf) { + if (scheduler != null) { + scheduler.shutdownNow(); + scheduler = null; + } + } + + public void set(String name, String key, long millis, Type type, T value) { + // millis > 0 才需要过期设置 + String json = convert.convertTo(type, value); + container + .computeIfAbsent(name, n -> new ConcurrentHashMap<>()) + .computeIfAbsent(key, k -> new CacheItem(json)) + .set(json, millis); + } + + public T get(String name, String key, Type type) { + Map map = container.get(name); + CacheItem item = map == null ? null : map.get(key); + String json = item == null || item.isExpired() ? null : item.getValue(); + return json == null ? null : convert.convertFrom(type, json); + } + + public long del(String name, String key) { + Map map = container.get(name); + return map != null && map.remove(key) != null ? 1 : 0; + } + + public CompletableFuture getAsync(String name, String key, Type type) { + return CompletableFuture.completedFuture(get(name, key, type)); + } + + public CompletableFuture delAsync(String name, String key) { + return CompletableFuture.completedFuture(del(name, key)); + } + + public CompletableFuture setAsync(String name, String key, long millis, Type type, T value) { + return CompletableFuture.runAsync(() -> set(name, key, millis, type, value)); + } + + protected static class CacheItem { + + @Nullable // json格式 + protected String value; + + // 为0表示永久, 大于0表示有过期时间 + private long endMillis; + + public CacheItem(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + + public void set(String value, long millis) { + this.value = value; + this.endMillis = millis > 0 ? (System.currentTimeMillis() + millis) : 0; + } + + @ConvertDisabled + public boolean isExpired() { + return endMillis > 0 && System.currentTimeMillis() >= endMillis; + } + + boolean isExpired(long now) { + return endMillis > 0 && now >= endMillis; + } + } +} diff --git a/src/main/java/org/redkale/cached/spi/CachedManagerService.java b/src/main/java/org/redkale/cached/spi/CachedManagerService.java index 25cdd1171..7e8d01cd0 100644 --- a/src/main/java/org/redkale/cached/spi/CachedManagerService.java +++ b/src/main/java/org/redkale/cached/spi/CachedManagerService.java @@ -25,7 +25,6 @@ import org.redkale.cached.CachedManager; import org.redkale.service.Local; import org.redkale.service.Service; import org.redkale.source.CacheEventListener; -import org.redkale.source.CacheMemorySource; import org.redkale.source.CacheSource; import org.redkale.util.AnyValue; import org.redkale.util.RedkaleException; @@ -73,7 +72,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se private final ConcurrentHashMap cacheValueTypes = new ConcurrentHashMap<>(); // 本地缓存Source - protected final CacheMemorySource localSource = new CacheMemorySource("cache-local"); + protected final CachedLocalSource localSource = new CachedLocalSource(); // 缓存无效时使用的同步锁 private final ConcurrentHashMap syncLockMap = new ConcurrentHashMap<>(); @@ -203,16 +202,6 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se } } - /** - * 获取本地缓存Source - * - * @return {@link org.redkale.source.CacheSource} - */ - @Override - public CacheSource getLocalSource() { - return localSource; - } - /** * 获取远程缓存Source, 可能为null * @@ -267,7 +256,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se @Override public T localGet(String name, String key, Type type) { checkEnable(); - return CachedValue.get(localSource.get(idFor(name, key), loadCacheType(type))); + return CachedValue.get(localSource.get(name, idFor(name, key), loadCacheType(type))); } /** @@ -286,7 +275,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se public T localGetSet( String name, String key, Type type, boolean nullable, Duration expire, ThrowSupplier supplier) { return getSet( - (n, k, ex, ct) -> localSource.get(idFor(n, k), ct), + (n, k, ex, ct) -> localSource.get(name, idFor(n, k), ct), this::localSetCache, name, key, @@ -317,7 +306,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se Duration expire, ThrowSupplier> supplier) { return getSetAsync( - (n, k, e, c) -> localSource.getAsync(idFor(n, k), c), + (n, k, e, c) -> localSource.getAsync(name, idFor(n, k), c), this::localSetCacheAsync, name, key, @@ -352,7 +341,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se @Override public long localDel(String name, String key) { checkEnable(); - return localSource.del(idFor(name, key)); + return localSource.del(name, idFor(name, key)); } // -------------------------------------- 远程缓存 -------------------------------------- @@ -706,7 +695,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se public long bothDel(String name, String key) { checkEnable(); String id = idFor(name, key); - long v = localSource.del(id); + long v = localSource.del(name, id); if (remoteSource != null) { v = remoteSource.del(id); if (broadcastable) { @@ -727,7 +716,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se public CompletableFuture bothDelAsync(String name, String key) { checkEnable(); String id = idFor(name, key); - long v = localSource.del(id); // 内存操作,无需异步 + long v = localSource.del(name, id); // 内存操作,无需异步 if (remoteSource != null) { return remoteSource.delAsync(id).thenCompose(r -> { return broadcastable @@ -878,11 +867,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se if (logable) { logger.log(logLevel, "Cached set id(" + id + ") value to localSource expire " + millis + " ms"); } - if (millis > 0) { - localSource.psetex(id, millis, cacheType, cacheVal); - } else { - localSource.set(id, cacheType, cacheVal); - } + localSource.set(name, id, millis, cacheType, cacheVal); } protected void remoteSetCache(String name, String key, Type type, T value, Duration expire) { @@ -921,11 +906,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se if (logable) { logger.log(logLevel, "Cached set id(" + id + ") value to localSource expire " + millis + " ms"); } - if (millis > 0) { - return localSource.psetexAsync(id, millis, cacheType, cacheVal); - } else { - return localSource.setAsync(id, cacheType, cacheVal); - } + return localSource.setAsync(name, id, millis, cacheType, cacheVal); } protected CompletableFuture remoteSetCacheAsync( @@ -955,7 +936,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se checkEnable(); boolean logable = logger.isLoggable(logLevel); String id = idFor(name, key); - CachedValue cacheVal = localSource.get(id, cacheType); + CachedValue cacheVal = localSource.get(name, id, cacheType); if (CachedValue.isValid(cacheVal)) { if (logable) { logger.log(logLevel, "Cached got id(" + id + ") value from localSource"); @@ -996,7 +977,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se checkEnable(); boolean logable = logger.isLoggable(logLevel); String id = idFor(name, key); - CachedValue val = localSource.get(id, cacheType); // 内存操作,无需异步 + CachedValue val = localSource.get(name, id, cacheType); // 内存操作,无需异步 if (CachedValue.isValid(val)) { if (logable) { logger.log(logLevel, "Cached got id(" + id + ") value from localSource"); @@ -1111,7 +1092,7 @@ public class CachedManagerService implements CachedManager, CachedActionFunc, Se @Override public void onMessage(String topic, CachedEventMessage message) { if (!Objects.equals(getNode(), message.getNode())) { - localSource.del(idFor(message.getName(), message.getKey())); + localSource.del(message.getName(), idFor(message.getName(), message.getKey())); } } } diff --git a/src/main/java/org/redkale/source/CacheMemorySource.java b/src/main/java/org/redkale/source/CacheMemorySource.java index ccb7b315f..bb673cf93 100644 --- a/src/main/java/org/redkale/source/CacheMemorySource.java +++ b/src/main/java/org/redkale/source/CacheMemorySource.java @@ -153,6 +153,7 @@ public final class CacheMemorySource extends AbstractCacheSource { this.scheduler = Utility.newScheduledExecutor( 1, "Redkale-" + CacheMemorySource.class.getSimpleName() + "-" + resourceName() + "-Expirer-Thread"); final List keys = new ArrayList<>(); + int interval = 30; scheduler.scheduleWithFixedDelay( () -> { try { @@ -179,11 +180,11 @@ public final class CacheMemorySource extends AbstractCacheSource { rateLimitContainer.remove(key); } } catch (Throwable t) { - logger.log(Level.SEVERE, "CacheMemorySource schedule(interval=" + 10 + "s) error", t); + logger.log(Level.SEVERE, "CacheMemorySource schedule(interval=" + interval + "s) error", t); } }, - 10, - 10, + interval, + interval, TimeUnit.SECONDS); logger.info( self.getClass().getSimpleName() + ":" + self.resourceName() + " start schedule expire executor"); @@ -204,9 +205,11 @@ public final class CacheMemorySource extends AbstractCacheSource { public void destroy(AnyValue conf) { if (scheduler != null) { scheduler.shutdownNow(); + scheduler = null; } if (subExecutor != null) { subExecutor.shutdown(); + subExecutor = null; } }