diff --git a/src/main/java/org/redkale/source/AbstractCacheSource.java b/src/main/java/org/redkale/source/AbstractCacheSource.java index 9a29ec483..7cda5afb3 100644 --- a/src/main/java/org/redkale/source/AbstractCacheSource.java +++ b/src/main/java/org/redkale/source/AbstractCacheSource.java @@ -66,7 +66,8 @@ public abstract class AbstractCacheSource extends AbstractService implements Cac } //根据配置中创建DataSource - public static CacheSource createCacheSource(ClassLoader serverClassLoader, ResourceFactory resourceFactory, AnyValue sourceConf, String sourceName, boolean compileMode) throws Exception { + public static CacheSource createCacheSource(ClassLoader serverClassLoader, ResourceFactory resourceFactory, + AnyValue sourceConf, String sourceName, boolean compileMode) throws Exception { CacheSource source = null; if (serverClassLoader == null) { serverClassLoader = Thread.currentThread().getContextClassLoader(); diff --git a/src/main/java/org/redkale/source/CacheMemorySource.java b/src/main/java/org/redkale/source/CacheMemorySource.java index 45f5cfaf4..c593af2ac 100644 --- a/src/main/java/org/redkale/source/CacheMemorySource.java +++ b/src/main/java/org/redkale/source/CacheMemorySource.java @@ -59,6 +59,10 @@ public final class CacheMemorySource extends AbstractCacheSource { private final ReentrantLock containerLock = new ReentrantLock(); + private final ConcurrentHashMap rateLimitContainer = new ConcurrentHashMap<>(); + + private final ReentrantLock rateLimitContainerLock = new ReentrantLock(); + //key: topic private final Map>> pubsubListeners = new ConcurrentHashMap<>(); @@ -736,6 +740,75 @@ public final class CacheMemorySource extends AbstractCacheSource { return supplyFuture(() -> delex(key, expectedValue)); } + /** + * 令牌桶算法限流, 返回负数表示无令牌, 其他为有令牌 + *
+     * 每秒限制请求1次:     rate:1,     capacity:1,     requested:1
+     * 每秒限制请求10次:    rate:10,    capacity:10,    requested:1
+     * 每分钟限制请求1次:   rate:1,     capacity:60,    requested:60
+     * 每分钟限制请求10次:  rate:1,     capacity:60,    requested:6
+     * 每小时限制请求1次:   rate:1,     capacity:3600,  requested:3600
+     * 每小时限制请求10次:  rate:1,     capacity:3600,  requested:360
+     * 
+ * + * @param key 限流的键 + * @param rate 令牌桶每秒填充平均速率 + * @param capacity 令牌桶总容量 + * @param requested 需要的令牌数 + * + * @return 可用令牌数 + */ + @Override + public long rateLimit(final String key, final long rate, final long capacity, final long requested) { + if (key == null) { + return 0L; + } + if (capacity < rate || capacity < requested || rate <= 0 || requested < 0) { + throw new IllegalArgumentException("rate=" + rate + ", capacity=" + capacity + ", requested=" + requested); + } + RateLimitEntry entry = null; + rateLimitContainerLock.lock(); + try { + entry = rateLimitContainer.get(key); + if (entry == null) { + long newTokens = capacity - requested; + entry = new RateLimitEntry(key, newTokens); + rateLimitContainer.put(key, entry); + return newTokens; + } + } finally { + rateLimitContainerLock.unlock(); + } + entry.lock(); + try { + long now = System.currentTimeMillis(); + long ttl = capacity / rate * 2 * 1000; + if (entry.isExpired()) { + entry.milliSeconds(ttl).tokens = capacity; + } + long delta = Math.max(0, (now - entry.timestamp) / 1000); + long filledTokens = Math.min(capacity, entry.tokens + (delta * rate)); + boolean allowed = filledTokens >= requested; + long newTokens = filledTokens; + if (allowed) { + newTokens = filledTokens - requested; + } + if (ttl > 0) { + entry.milliSeconds(ttl); + entry.tokens = newTokens; + entry.timestamp = now; + } + return allowed ? newTokens : (filledTokens - requested); + } finally { + entry.unlock(); + } + } + + @Override + public CompletableFuture rateLimitAsync(String key, long rate, long capacity, long requested) { + return supplyFuture(() -> rateLimit(key, rate, capacity, requested)); + } + @Override public long incr(final String key) { return incrby(key, 1); @@ -2433,6 +2506,70 @@ public final class CacheMemorySource extends AbstractCacheSource { return entry; } + public static final class RateLimitEntry { + + private String key; + + volatile long lastAccessed; //最后刷新时间 + + //<=0表示永久保存 + private long expireMills; + + private long initTime; + + //令牌数 + private long tokens; + + //时间戳,单位:毫秒 + private long timestamp; + + private final ReentrantLock lock = new ReentrantLock(); + + public RateLimitEntry(String key, long tokens) { + this.key = key; + this.tokens = tokens; + this.timestamp = System.currentTimeMillis(); + } + + public RateLimitEntry milliSeconds(long milliSeconds) { + this.initTime = System.currentTimeMillis(); + this.expireMills = milliSeconds > 0 ? milliSeconds : 0; + return this; + } + + @Override + public String toString() { + return JsonFactory.root().getConvert().convertTo(this); + } + + @ConvertColumn(ignore = true) + public boolean isExpired() { + long now = System.currentTimeMillis(); + return expireMills > 0 && (initTime + expireMills) < now; + } + + public void lock() { + lock.lock(); + } + + public void unlock() { + lock.unlock(); + } + + public long getExpireMills() { + return expireMills; + } + + public long getLastAccessed() { + return lastAccessed; + } + + public String getKey() { + return key; + } + + } + public static enum CacheEntryType { OBJECT, ATOMIC, DOUBLE, SSET, ZSET, LIST, MAP; } diff --git a/src/main/java/org/redkale/source/CacheSource.java b/src/main/java/org/redkale/source/CacheSource.java index 4a1c0cd52..9c8c9e815 100644 --- a/src/main/java/org/redkale/source/CacheSource.java +++ b/src/main/java/org/redkale/source/CacheSource.java @@ -363,7 +363,29 @@ public interface CacheSource extends Resourcable { return getDel(key, Long.class); } - //------------------------ 键 Keys ------------------------ + //------------------------ 键 Keys ------------------------ + /** + * 令牌桶算法限流, 返回负数表示无令牌, 其他为有令牌 + *
+     * 每秒限制请求1次:     rate:1,     capacity:1,     requested:1
+     * 每秒限制请求10次:    rate:10,    capacity:10,    requested:1
+     * 每分钟限制请求1次:   rate:1,     capacity:60,    requested:60
+     * 每分钟限制请求10次:  rate:1,     capacity:60,    requested:6
+     * 每小时限制请求1次:   rate:1,     capacity:3600,  requested:3600
+     * 每小时限制请求10次:  rate:1,     capacity:3600,  requested:360
+     * 
+ * + * @param key 限流的键 + * @param rate 令牌桶每秒填充平均速率 + * @param capacity 令牌桶总容量 + * @param requested 需要的令牌数 + * + * @return 可用令牌数 + */ + default long rateLimit(String key, long rate, long capacity, long requested) { + return rateLimitAsync(key, rate, capacity, requested).join(); + } + default long del(String... keys) { return delAsync(keys).join(); } @@ -1304,6 +1326,26 @@ public interface CacheSource extends Resourcable { } //------------------------ 键 Keys ------------------------ + /** + * 令牌桶算法限流, 返回负数表示无令牌, 其他为有令牌 + *
+     * 每秒限制请求1次:     rate:1,     capacity:1,     requested:1
+     * 每秒限制请求10次:    rate:10,    capacity:10,    requested:1
+     * 每分钟限制请求1次:   rate:1,     capacity:60,    requested:60
+     * 每分钟限制请求10次:  rate:1,     capacity:60,    requested:6
+     * 每小时限制请求1次:   rate:1,     capacity:3600,  requested:3600
+     * 每小时限制请求10次:  rate:1,     capacity:3600,  requested:360
+     * 
+ * + * @param key 限流的键 + * @param rate 令牌桶每秒填充平均速率 + * @param capacity 令牌桶总容量 + * @param requested 需要的令牌数 + * + * @return 可用令牌数 + */ + public CompletableFuture rateLimitAsync(String key, long rate, long capacity, long requested); + public CompletableFuture delexAsync(String key, String expectedValue); public CompletableFuture delAsync(String... keys);