diff --git a/src/main/java/org/redkale/source/CacheMemorySource.java b/src/main/java/org/redkale/source/CacheMemorySource.java index 0ae11e73d..acbc36943 100644 --- a/src/main/java/org/redkale/source/CacheMemorySource.java +++ b/src/main/java/org/redkale/source/CacheMemorySource.java @@ -9,7 +9,7 @@ import java.io.Serializable; import java.lang.reflect.Type; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.*; import java.util.concurrent.locks.ReentrantLock; import java.util.function.*; import java.util.logging.*; @@ -118,7 +118,7 @@ public final class CacheMemorySource extends AbstractCacheSource { scheduler.scheduleWithFixedDelay(() -> { try { keys.clear(); - int now = (int) (System.currentTimeMillis() / 1000); + long now = System.currentTimeMillis(); container.forEach((k, x) -> { if (x.expireSeconds > 0 && (now > (x.lastAccessed + x.expireSeconds))) { keys.add(x.key); @@ -227,7 +227,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } Serializable val = (Serializable) entry.mapValue.computeIfAbsent(field, f -> new AtomicLong()); if (!(val instanceof AtomicLong)) { - entry.mapLock.lock(); + entry.lock.lock(); try { if (!(val instanceof AtomicLong)) { if (val == null) { @@ -238,7 +238,7 @@ public final class CacheMemorySource extends AbstractCacheSource { entry.mapValue.put(field, val); } } finally { - entry.mapLock.unlock(); + entry.lock.unlock(); } } return ((AtomicLong) entry.mapValue.get(field)).addAndGet(num); @@ -265,7 +265,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } Serializable val = (Serializable) entry.mapValue.computeIfAbsent(field, f -> new AtomicLong()); if (!(val instanceof AtomicLong)) { - entry.mapLock.lock(); + entry.lock.lock(); try { if (!(val instanceof AtomicLong)) { if (val == null) { @@ -276,7 +276,7 @@ public final class CacheMemorySource extends AbstractCacheSource { entry.mapValue.put(field, val); } } finally { - entry.mapLock.unlock(); + entry.lock.unlock(); } } return Double.longBitsToDouble(((AtomicLong) entry.mapValue.get(field)).addAndGet(Double.doubleToLongBits(num))); @@ -468,7 +468,7 @@ public final class CacheMemorySource extends AbstractCacheSource { if (entry == null || entry.isExpired()) { return null; } - entry.lastAccessed = (int) (System.currentTimeMillis() / 1000); + entry.lastAccessed = System.currentTimeMillis(); entry.expireSeconds = expireSeconds; if (entry.isListCacheType()) { return (T) (entry.listValue == null ? null : new ArrayList(entry.listValue)); @@ -491,7 +491,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } else { entry.expireSeconds = 0; entry.objectValue = value; - entry.lastAccessed = (int) (System.currentTimeMillis() / 1000); + entry.lastAccessed = System.currentTimeMillis(); } } @@ -506,7 +506,7 @@ public final class CacheMemorySource extends AbstractCacheSource { return true; } else { entry.expireSeconds = 0; - entry.lastAccessed = (int) (System.currentTimeMillis() / 1000); + entry.lastAccessed = System.currentTimeMillis(); return false; } } @@ -523,7 +523,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } else { entry.expireSeconds = 0; entry.mapValue.put(field, value); - entry.lastAccessed = (int) (System.currentTimeMillis() / 1000); + entry.lastAccessed = System.currentTimeMillis(); } } @@ -539,7 +539,7 @@ public final class CacheMemorySource extends AbstractCacheSource { return true; } else { entry.expireSeconds = 0; - entry.lastAccessed = (int) (System.currentTimeMillis() / 1000); + entry.lastAccessed = System.currentTimeMillis(); return false; } } @@ -648,14 +648,14 @@ public final class CacheMemorySource extends AbstractCacheSource { return; } CacheEntry entry = container.get(key); - if (entry == null) { + if (entry == null || entry.isExpired()) { entry = new CacheEntry(cacheType, expireSeconds, key, value, null, null, null); container.putIfAbsent(key, entry); } else { if (expireSeconds > 0) { entry.expireSeconds = expireSeconds; } - entry.lastAccessed = (int) (System.currentTimeMillis() / 1000); + entry.lastAccessed = System.currentTimeMillis(); entry.objectValue = value; } } @@ -665,15 +665,13 @@ public final class CacheMemorySource extends AbstractCacheSource { return false; } CacheEntry entry = container.get(key); - if (entry == null) { + if (entry == null || entry.isExpired()) { entry = new CacheEntry(cacheType, expireSeconds, key, value, null, null, null); - container.putIfAbsent(key, entry); + container.put(key, entry); return true; } else { - if (expireSeconds > 0) { - entry.expireSeconds = expireSeconds; - } - entry.lastAccessed = (int) (System.currentTimeMillis() / 1000); + entry.expireSeconds = expireSeconds > 0 ? expireSeconds : 0; + entry.lastAccessed = System.currentTimeMillis(); return false; } } @@ -1297,6 +1295,52 @@ public final class CacheMemorySource extends AbstractCacheSource { }, getExecutor()); } + @Override + public CompletableFuture zincrbyAsync(String key, CacheScoredValue value) { + return supplyAsync(() -> { + CacheEntry entry = container.get(key); + if (entry == null || entry.isExpired() || entry.csetValue == null) { + containerLock.lock(); + try { + entry = container.get(key); + if (entry == null || entry.isExpired()) { + appendSetItem(CacheEntryType.SET_SORTED, key, List.of(new CacheScoredValue.NumberScoredValue(value.getScore().doubleValue(), value.getValue()))); + } + } finally { + containerLock.unlock(); + } + return (T) value.getScore(); + } + entry.lock.lock(); + try { + Set sets = entry.csetValue; + CacheScoredValue.NumberScoredValue old = sets.stream().filter(v -> Objects.equals(v.getValue(), value.getValue())).findAny().orElse(null); + if (old == null) { + sets.add(new CacheScoredValue.NumberScoredValue(value.getScore().doubleValue(), value.getValue())); + return (T) value.getScore(); + } else { + Number ic = value.getScore(); + if (ic instanceof Integer) { + old.setScore((double) (old.getScore().intValue() + ic.intValue())); + } else if (ic instanceof Long) { + old.setScore((double) (old.getScore().longValue() + ic.longValue())); + } else if (ic instanceof Float) { + old.setScore((double) (old.getScore().floatValue() + ic.floatValue())); + } else if (ic instanceof Double) { + old.setScore(old.getScore().doubleValue() + ic.doubleValue()); + } else if (ic instanceof AtomicInteger) { + ((AtomicInteger) old.getScore()).addAndGet(((AtomicInteger) ic).get()); + } else if (ic instanceof AtomicLong) { + ((AtomicLong) old.getScore()).addAndGet(((AtomicLong) ic).get()); + } + return (T) old.getScore(); + } + } finally { + entry.lock.unlock(); + } + }, getExecutor()); + } + @Override public CompletableFuture zcardAsync(String key) { return supplyAsync(() -> { @@ -1369,6 +1413,9 @@ public final class CacheMemorySource extends AbstractCacheSource { } private T formatScore(Class scoreType, Number score) { + if (score == null) { + return null; + } if (scoreType == int.class || scoreType == Integer.class) { return (T) (Number) score.intValue(); } else if (scoreType == long.class || scoreType == Long.class) { @@ -1393,7 +1440,7 @@ public final class CacheMemorySource extends AbstractCacheSource { return null; } Set sets = entry.csetValue; - return (T) sets.stream().filter(v -> Objects.equals(member, v.getValue())).findAny().map(v -> v.getScore()).orElse(null); + return formatScore(scoreType, sets.stream().filter(v -> Objects.equals(member, v.getValue())).findAny().map(v -> v.getScore()).orElse(null)); }, getExecutor()); } @@ -1498,13 +1545,13 @@ public final class CacheMemorySource extends AbstractCacheSource { //<=0表示永久保存 int expireSeconds; - volatile int lastAccessed; //最后刷新时间 + volatile long lastAccessed; //最后刷新时间 T objectValue; ConcurrentHashMap mapValue; - final ReentrantLock mapLock = new ReentrantLock(); + final ReentrantLock lock = new ReentrantLock(); Set csetValue; @@ -1515,11 +1562,11 @@ public final class CacheMemorySource extends AbstractCacheSource { } public CacheEntry(CacheEntryType cacheType, int expireSeconds, String key, T objectValue, Set csetValue, ConcurrentLinkedDeque listValue, ConcurrentHashMap mapValue) { - this(cacheType, expireSeconds, (int) (System.currentTimeMillis() / 1000), key, objectValue, csetValue, listValue, mapValue); + this(cacheType, expireSeconds, System.currentTimeMillis(), key, objectValue, csetValue, listValue, mapValue); } @ConstructorParameters({"cacheType", "expireSeconds", "lastAccessed", "key", "objectValue", "csetValue", "listValue", "mapValue"}) - public CacheEntry(CacheEntryType cacheType, int expireSeconds, int lastAccessed, String key, T objectValue, Set csetValue, ConcurrentLinkedDeque listValue, ConcurrentHashMap mapValue) { + public CacheEntry(CacheEntryType cacheType, int expireSeconds, long lastAccessed, String key, T objectValue, Set csetValue, ConcurrentLinkedDeque listValue, ConcurrentHashMap mapValue) { this.cacheType = cacheType; this.expireSeconds = expireSeconds; this.lastAccessed = lastAccessed; @@ -1552,7 +1599,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @ConvertColumn(ignore = true) public boolean isExpired() { - return (expireSeconds > 0 && lastAccessed + expireSeconds < (System.currentTimeMillis() / 1000)); + return expireSeconds > 0 && (lastAccessed + expireSeconds * 1000) < System.currentTimeMillis(); } public CacheEntryType getCacheType() { @@ -1563,7 +1610,7 @@ public final class CacheMemorySource extends AbstractCacheSource { return expireSeconds; } - public int getLastAccessed() { + public long getLastAccessed() { return lastAccessed; } diff --git a/src/main/java/org/redkale/source/CacheScoredValue.java b/src/main/java/org/redkale/source/CacheScoredValue.java index 9dc1ea3e8..a55b81549 100644 --- a/src/main/java/org/redkale/source/CacheScoredValue.java +++ b/src/main/java/org/redkale/source/CacheScoredValue.java @@ -59,7 +59,7 @@ public interface CacheScoredValue extends Serializable, Compar return value; } - public void setScore(Long score) { + public void setScore(Number score) { this.score = score; } diff --git a/src/main/java/org/redkale/source/CacheSource.java b/src/main/java/org/redkale/source/CacheSource.java index aa32c07b5..a476b1583 100644 --- a/src/main/java/org/redkale/source/CacheSource.java +++ b/src/main/java/org/redkale/source/CacheSource.java @@ -724,6 +724,22 @@ public interface CacheSource extends Resourcable { zadd(key, CacheScoredValue.create(score, member)); } + default T zincrby(String key, CacheScoredValue value) { + return (T) zincrbyAsync(key, value).join(); + } + + default int zincrby(String key, int score, String member) { + return zincrby(key, CacheScoredValue.create(score, member)); + } + + default long zincrby(String key, long score, String member) { + return zincrby(key, CacheScoredValue.create(score, member)); + } + + default double zincrby(String key, double score, String member) { + return zincrby(key, CacheScoredValue.create(score, member)); + } + default long zrem(String key, String... members) { return zremAsync(key, members).join(); } @@ -1307,6 +1323,20 @@ public interface CacheSource extends Resourcable { return zaddAsync(key, CacheScoredValue.create(score, member)); } + public CompletableFuture zincrbyAsync(String key, CacheScoredValue value); + + default CompletableFuture zincrbyAsync(String key, int score, String member) { + return zincrbyAsync(key, CacheScoredValue.create(score, member)); + } + + default CompletableFuture zincrbyAsync(String key, long score, String member) { + return zincrbyAsync(key, CacheScoredValue.create(score, member)); + } + + default CompletableFuture zincrbyAsync(String key, double score, String member) { + return zincrbyAsync(key, CacheScoredValue.create(score, member)); + } + public CompletableFuture zremAsync(String key, String... members); public CompletableFuture> zmscoreAsync(String key, Class scoreType, String... members);