CacheSource增加zincrby方法

This commit is contained in:
redkale
2023-06-16 09:15:36 +08:00
parent 644186a398
commit 197d76fd3c
3 changed files with 104 additions and 27 deletions

View File

@@ -9,7 +9,7 @@ import java.io.Serializable;
import java.lang.reflect.Type;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.*;
import java.util.logging.*;
@@ -118,7 +118,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
scheduler.scheduleWithFixedDelay(() -> {
try {
keys.clear();
int now = (int) (System.currentTimeMillis() / 1000);
long now = System.currentTimeMillis();
container.forEach((k, x) -> {
if (x.expireSeconds > 0 && (now > (x.lastAccessed + x.expireSeconds))) {
keys.add(x.key);
@@ -227,7 +227,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
Serializable val = (Serializable) entry.mapValue.computeIfAbsent(field, f -> new AtomicLong());
if (!(val instanceof AtomicLong)) {
entry.mapLock.lock();
entry.lock.lock();
try {
if (!(val instanceof AtomicLong)) {
if (val == null) {
@@ -238,7 +238,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
entry.mapValue.put(field, val);
}
} finally {
entry.mapLock.unlock();
entry.lock.unlock();
}
}
return ((AtomicLong) entry.mapValue.get(field)).addAndGet(num);
@@ -265,7 +265,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
Serializable val = (Serializable) entry.mapValue.computeIfAbsent(field, f -> new AtomicLong());
if (!(val instanceof AtomicLong)) {
entry.mapLock.lock();
entry.lock.lock();
try {
if (!(val instanceof AtomicLong)) {
if (val == null) {
@@ -276,7 +276,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
entry.mapValue.put(field, val);
}
} finally {
entry.mapLock.unlock();
entry.lock.unlock();
}
}
return Double.longBitsToDouble(((AtomicLong) entry.mapValue.get(field)).addAndGet(Double.doubleToLongBits(num)));
@@ -468,7 +468,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
if (entry == null || entry.isExpired()) {
return null;
}
entry.lastAccessed = (int) (System.currentTimeMillis() / 1000);
entry.lastAccessed = System.currentTimeMillis();
entry.expireSeconds = expireSeconds;
if (entry.isListCacheType()) {
return (T) (entry.listValue == null ? null : new ArrayList(entry.listValue));
@@ -491,7 +491,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
} else {
entry.expireSeconds = 0;
entry.objectValue = value;
entry.lastAccessed = (int) (System.currentTimeMillis() / 1000);
entry.lastAccessed = System.currentTimeMillis();
}
}
@@ -506,7 +506,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
return true;
} else {
entry.expireSeconds = 0;
entry.lastAccessed = (int) (System.currentTimeMillis() / 1000);
entry.lastAccessed = System.currentTimeMillis();
return false;
}
}
@@ -523,7 +523,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
} else {
entry.expireSeconds = 0;
entry.mapValue.put(field, value);
entry.lastAccessed = (int) (System.currentTimeMillis() / 1000);
entry.lastAccessed = System.currentTimeMillis();
}
}
@@ -539,7 +539,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
return true;
} else {
entry.expireSeconds = 0;
entry.lastAccessed = (int) (System.currentTimeMillis() / 1000);
entry.lastAccessed = System.currentTimeMillis();
return false;
}
}
@@ -648,14 +648,14 @@ public final class CacheMemorySource extends AbstractCacheSource {
return;
}
CacheEntry entry = container.get(key);
if (entry == null) {
if (entry == null || entry.isExpired()) {
entry = new CacheEntry(cacheType, expireSeconds, key, value, null, null, null);
container.putIfAbsent(key, entry);
} else {
if (expireSeconds > 0) {
entry.expireSeconds = expireSeconds;
}
entry.lastAccessed = (int) (System.currentTimeMillis() / 1000);
entry.lastAccessed = System.currentTimeMillis();
entry.objectValue = value;
}
}
@@ -665,15 +665,13 @@ public final class CacheMemorySource extends AbstractCacheSource {
return false;
}
CacheEntry entry = container.get(key);
if (entry == null) {
if (entry == null || entry.isExpired()) {
entry = new CacheEntry(cacheType, expireSeconds, key, value, null, null, null);
container.putIfAbsent(key, entry);
container.put(key, entry);
return true;
} else {
if (expireSeconds > 0) {
entry.expireSeconds = expireSeconds;
}
entry.lastAccessed = (int) (System.currentTimeMillis() / 1000);
entry.expireSeconds = expireSeconds > 0 ? expireSeconds : 0;
entry.lastAccessed = System.currentTimeMillis();
return false;
}
}
@@ -1297,6 +1295,52 @@ public final class CacheMemorySource extends AbstractCacheSource {
}, getExecutor());
}
@Override
public <T extends Number> CompletableFuture<T> zincrbyAsync(String key, CacheScoredValue value) {
return supplyAsync(() -> {
CacheEntry entry = container.get(key);
if (entry == null || entry.isExpired() || entry.csetValue == null) {
containerLock.lock();
try {
entry = container.get(key);
if (entry == null || entry.isExpired()) {
appendSetItem(CacheEntryType.SET_SORTED, key, List.of(new CacheScoredValue.NumberScoredValue(value.getScore().doubleValue(), value.getValue())));
}
} finally {
containerLock.unlock();
}
return (T) value.getScore();
}
entry.lock.lock();
try {
Set<CacheScoredValue.NumberScoredValue> sets = entry.csetValue;
CacheScoredValue.NumberScoredValue old = sets.stream().filter(v -> Objects.equals(v.getValue(), value.getValue())).findAny().orElse(null);
if (old == null) {
sets.add(new CacheScoredValue.NumberScoredValue(value.getScore().doubleValue(), value.getValue()));
return (T) value.getScore();
} else {
Number ic = value.getScore();
if (ic instanceof Integer) {
old.setScore((double) (old.getScore().intValue() + ic.intValue()));
} else if (ic instanceof Long) {
old.setScore((double) (old.getScore().longValue() + ic.longValue()));
} else if (ic instanceof Float) {
old.setScore((double) (old.getScore().floatValue() + ic.floatValue()));
} else if (ic instanceof Double) {
old.setScore(old.getScore().doubleValue() + ic.doubleValue());
} else if (ic instanceof AtomicInteger) {
((AtomicInteger) old.getScore()).addAndGet(((AtomicInteger) ic).get());
} else if (ic instanceof AtomicLong) {
((AtomicLong) old.getScore()).addAndGet(((AtomicLong) ic).get());
}
return (T) old.getScore();
}
} finally {
entry.lock.unlock();
}
}, getExecutor());
}
@Override
public CompletableFuture<Long> zcardAsync(String key) {
return supplyAsync(() -> {
@@ -1369,6 +1413,9 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
private <T extends Number> T formatScore(Class<T> scoreType, Number score) {
if (score == null) {
return null;
}
if (scoreType == int.class || scoreType == Integer.class) {
return (T) (Number) score.intValue();
} else if (scoreType == long.class || scoreType == Long.class) {
@@ -1393,7 +1440,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
return null;
}
Set<CacheScoredValue> sets = entry.csetValue;
return (T) sets.stream().filter(v -> Objects.equals(member, v.getValue())).findAny().map(v -> v.getScore()).orElse(null);
return formatScore(scoreType, sets.stream().filter(v -> Objects.equals(member, v.getValue())).findAny().map(v -> v.getScore()).orElse(null));
}, getExecutor());
}
@@ -1498,13 +1545,13 @@ public final class CacheMemorySource extends AbstractCacheSource {
//<=0表示永久保存
int expireSeconds;
volatile int lastAccessed; //最后刷新时间
volatile long lastAccessed; //最后刷新时间
T objectValue;
ConcurrentHashMap<String, Serializable> mapValue;
final ReentrantLock mapLock = new ReentrantLock();
final ReentrantLock lock = new ReentrantLock();
Set<T> csetValue;
@@ -1515,11 +1562,11 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
public CacheEntry(CacheEntryType cacheType, int expireSeconds, String key, T objectValue, Set<T> csetValue, ConcurrentLinkedDeque<T> listValue, ConcurrentHashMap<String, Serializable> mapValue) {
this(cacheType, expireSeconds, (int) (System.currentTimeMillis() / 1000), key, objectValue, csetValue, listValue, mapValue);
this(cacheType, expireSeconds, System.currentTimeMillis(), key, objectValue, csetValue, listValue, mapValue);
}
@ConstructorParameters({"cacheType", "expireSeconds", "lastAccessed", "key", "objectValue", "csetValue", "listValue", "mapValue"})
public CacheEntry(CacheEntryType cacheType, int expireSeconds, int lastAccessed, String key, T objectValue, Set<T> csetValue, ConcurrentLinkedDeque<T> listValue, ConcurrentHashMap<String, Serializable> mapValue) {
public CacheEntry(CacheEntryType cacheType, int expireSeconds, long lastAccessed, String key, T objectValue, Set<T> csetValue, ConcurrentLinkedDeque<T> listValue, ConcurrentHashMap<String, Serializable> mapValue) {
this.cacheType = cacheType;
this.expireSeconds = expireSeconds;
this.lastAccessed = lastAccessed;
@@ -1552,7 +1599,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
@ConvertColumn(ignore = true)
public boolean isExpired() {
return (expireSeconds > 0 && lastAccessed + expireSeconds < (System.currentTimeMillis() / 1000));
return expireSeconds > 0 && (lastAccessed + expireSeconds * 1000) < System.currentTimeMillis();
}
public CacheEntryType getCacheType() {
@@ -1563,7 +1610,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
return expireSeconds;
}
public int getLastAccessed() {
public long getLastAccessed() {
return lastAccessed;
}

View File

@@ -59,7 +59,7 @@ public interface CacheScoredValue<S extends Number> extends Serializable, Compar
return value;
}
public void setScore(Long score) {
public void setScore(Number score) {
this.score = score;
}

View File

@@ -724,6 +724,22 @@ public interface CacheSource extends Resourcable {
zadd(key, CacheScoredValue.create(score, member));
}
default <T extends Number> T zincrby(String key, CacheScoredValue value) {
return (T) zincrbyAsync(key, value).join();
}
default int zincrby(String key, int score, String member) {
return zincrby(key, CacheScoredValue.create(score, member));
}
default long zincrby(String key, long score, String member) {
return zincrby(key, CacheScoredValue.create(score, member));
}
default double zincrby(String key, double score, String member) {
return zincrby(key, CacheScoredValue.create(score, member));
}
default long zrem(String key, String... members) {
return zremAsync(key, members).join();
}
@@ -1307,6 +1323,20 @@ public interface CacheSource extends Resourcable {
return zaddAsync(key, CacheScoredValue.create(score, member));
}
public <T extends Number> CompletableFuture<T> zincrbyAsync(String key, CacheScoredValue value);
default CompletableFuture<Integer> zincrbyAsync(String key, int score, String member) {
return zincrbyAsync(key, CacheScoredValue.create(score, member));
}
default CompletableFuture<Long> zincrbyAsync(String key, long score, String member) {
return zincrbyAsync(key, CacheScoredValue.create(score, member));
}
default CompletableFuture<Double> zincrbyAsync(String key, double score, String member) {
return zincrbyAsync(key, CacheScoredValue.create(score, member));
}
public CompletableFuture<Long> zremAsync(String key, String... members);
public <T extends Number> CompletableFuture<List<T>> zmscoreAsync(String key, Class<T> scoreType, String... members);