CacheSource增加rateLimit接口

This commit is contained in:
redkale
2024-01-20 12:40:26 +08:00
parent ef985e2aef
commit 47584f3106
3 changed files with 182 additions and 2 deletions

View File

@@ -66,7 +66,8 @@ public abstract class AbstractCacheSource extends AbstractService implements Cac
}
//根据配置中创建DataSource
public static CacheSource createCacheSource(ClassLoader serverClassLoader, ResourceFactory resourceFactory, AnyValue sourceConf, String sourceName, boolean compileMode) throws Exception {
public static CacheSource createCacheSource(ClassLoader serverClassLoader, ResourceFactory resourceFactory,
AnyValue sourceConf, String sourceName, boolean compileMode) throws Exception {
CacheSource source = null;
if (serverClassLoader == null) {
serverClassLoader = Thread.currentThread().getContextClassLoader();

View File

@@ -59,6 +59,10 @@ public final class CacheMemorySource extends AbstractCacheSource {
private final ReentrantLock containerLock = new ReentrantLock();
private final ConcurrentHashMap<String, RateLimitEntry> rateLimitContainer = new ConcurrentHashMap<>();
private final ReentrantLock rateLimitContainerLock = new ReentrantLock();
//key: topic
private final Map<String, Set<CacheEventListener<byte[]>>> pubsubListeners = new ConcurrentHashMap<>();
@@ -736,6 +740,75 @@ public final class CacheMemorySource extends AbstractCacheSource {
return supplyFuture(() -> delex(key, expectedValue));
}
/**
* 令牌桶算法限流, 返回负数表示无令牌, 其他为有令牌
* <pre>
* 每秒限制请求1次: rate:1, capacity:1, requested:1
* 每秒限制请求10次: rate:10, capacity:10, requested:1
* 每分钟限制请求1次: rate:1, capacity:60, requested:60
* 每分钟限制请求10次: rate:1, capacity:60, requested:6
* 每小时限制请求1次: rate:1, capacity:3600, requested:3600
* 每小时限制请求10次: rate:1, capacity:3600, requested:360
* </pre>
*
* @param key 限流的键
* @param rate 令牌桶每秒填充平均速率
* @param capacity 令牌桶总容量
* @param requested 需要的令牌数
*
* @return 可用令牌数
*/
@Override
public long rateLimit(final String key, final long rate, final long capacity, final long requested) {
if (key == null) {
return 0L;
}
if (capacity < rate || capacity < requested || rate <= 0 || requested < 0) {
throw new IllegalArgumentException("rate=" + rate + ", capacity=" + capacity + ", requested=" + requested);
}
RateLimitEntry entry = null;
rateLimitContainerLock.lock();
try {
entry = rateLimitContainer.get(key);
if (entry == null) {
long newTokens = capacity - requested;
entry = new RateLimitEntry(key, newTokens);
rateLimitContainer.put(key, entry);
return newTokens;
}
} finally {
rateLimitContainerLock.unlock();
}
entry.lock();
try {
long now = System.currentTimeMillis();
long ttl = capacity / rate * 2 * 1000;
if (entry.isExpired()) {
entry.milliSeconds(ttl).tokens = capacity;
}
long delta = Math.max(0, (now - entry.timestamp) / 1000);
long filledTokens = Math.min(capacity, entry.tokens + (delta * rate));
boolean allowed = filledTokens >= requested;
long newTokens = filledTokens;
if (allowed) {
newTokens = filledTokens - requested;
}
if (ttl > 0) {
entry.milliSeconds(ttl);
entry.tokens = newTokens;
entry.timestamp = now;
}
return allowed ? newTokens : (filledTokens - requested);
} finally {
entry.unlock();
}
}
@Override
public CompletableFuture<Long> rateLimitAsync(String key, long rate, long capacity, long requested) {
return supplyFuture(() -> rateLimit(key, rate, capacity, requested));
}
@Override
public long incr(final String key) {
return incrby(key, 1);
@@ -2433,6 +2506,70 @@ public final class CacheMemorySource extends AbstractCacheSource {
return entry;
}
public static final class RateLimitEntry {
private String key;
volatile long lastAccessed; //最后刷新时间
//<=0表示永久保存
private long expireMills;
private long initTime;
//令牌数
private long tokens;
//时间戳,单位:毫秒
private long timestamp;
private final ReentrantLock lock = new ReentrantLock();
public RateLimitEntry(String key, long tokens) {
this.key = key;
this.tokens = tokens;
this.timestamp = System.currentTimeMillis();
}
public RateLimitEntry milliSeconds(long milliSeconds) {
this.initTime = System.currentTimeMillis();
this.expireMills = milliSeconds > 0 ? milliSeconds : 0;
return this;
}
@Override
public String toString() {
return JsonFactory.root().getConvert().convertTo(this);
}
@ConvertColumn(ignore = true)
public boolean isExpired() {
long now = System.currentTimeMillis();
return expireMills > 0 && (initTime + expireMills) < now;
}
public void lock() {
lock.lock();
}
public void unlock() {
lock.unlock();
}
public long getExpireMills() {
return expireMills;
}
public long getLastAccessed() {
return lastAccessed;
}
public String getKey() {
return key;
}
}
public static enum CacheEntryType {
OBJECT, ATOMIC, DOUBLE, SSET, ZSET, LIST, MAP;
}

View File

@@ -363,7 +363,29 @@ public interface CacheSource extends Resourcable {
return getDel(key, Long.class);
}
//------------------------ 键 Keys ------------------------
//------------------------ 键 Keys ------------------------
/**
* 令牌桶算法限流, 返回负数表示无令牌, 其他为有令牌
* <pre>
* 每秒限制请求1次: rate:1, capacity:1, requested:1
* 每秒限制请求10次: rate:10, capacity:10, requested:1
* 每分钟限制请求1次: rate:1, capacity:60, requested:60
* 每分钟限制请求10次: rate:1, capacity:60, requested:6
* 每小时限制请求1次: rate:1, capacity:3600, requested:3600
* 每小时限制请求10次: rate:1, capacity:3600, requested:360
* </pre>
*
* @param key 限流的键
* @param rate 令牌桶每秒填充平均速率
* @param capacity 令牌桶总容量
* @param requested 需要的令牌数
*
* @return 可用令牌数
*/
default long rateLimit(String key, long rate, long capacity, long requested) {
return rateLimitAsync(key, rate, capacity, requested).join();
}
default long del(String... keys) {
return delAsync(keys).join();
}
@@ -1304,6 +1326,26 @@ public interface CacheSource extends Resourcable {
}
//------------------------ 键 Keys ------------------------
/**
* 令牌桶算法限流, 返回负数表示无令牌, 其他为有令牌
* <pre>
* 每秒限制请求1次: rate:1, capacity:1, requested:1
* 每秒限制请求10次: rate:10, capacity:10, requested:1
* 每分钟限制请求1次: rate:1, capacity:60, requested:60
* 每分钟限制请求10次: rate:1, capacity:60, requested:6
* 每小时限制请求1次: rate:1, capacity:3600, requested:3600
* 每小时限制请求10次: rate:1, capacity:3600, requested:360
* </pre>
*
* @param key 限流的键
* @param rate 令牌桶每秒填充平均速率
* @param capacity 令牌桶总容量
* @param requested 需要的令牌数
*
* @return 可用令牌数
*/
public CompletableFuture<Long> rateLimitAsync(String key, long rate, long capacity, long requested);
public CompletableFuture<Long> delexAsync(String key, String expectedValue);
public CompletableFuture<Long> delAsync(String... keys);