CacheSource增加incrbyFloat方法

This commit is contained in:
Redkale
2022-12-26 14:46:49 +08:00
parent ef6b90c2f6
commit 54ea96b820
2 changed files with 104 additions and 29 deletions

View File

@@ -67,6 +67,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
return "memory";
}
@Override
@ResourceListener
public void onResourceChange(ResourceEvent[] events) {
}
@@ -170,11 +171,11 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public long hincr(final String key, String field) {
return hincr(key, field, 1);
return hincrby(key, field, 1);
}
@Override
public long hincr(final String key, String field, long num) {
public long hincrby(final String key, String field, long num) {
CacheEntry entry = container.get(key);
if (entry == null) {
synchronized (container) {
@@ -204,13 +205,43 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
@Override
public long hdecr(final String key, String field) {
return hincr(key, field, -1);
public double hincrbyFloat(final String key, String field, double num) {
CacheEntry entry = container.get(key);
if (entry == null) {
synchronized (container) {
entry = container.get(key);
if (entry == null) {
ConcurrentHashMap<String, Serializable> map = new ConcurrentHashMap();
map.put(field, new AtomicLong());
entry = new CacheEntry(CacheEntryType.MAP, key, new AtomicLong(), null, null, map);
container.put(key, entry);
}
}
}
Serializable val = (Serializable) entry.mapValue.computeIfAbsent(field, f -> new AtomicLong());
if (!(val instanceof AtomicLong)) {
synchronized (entry.mapValue) {
if (!(val instanceof AtomicLong)) {
if (val == null) {
val = new AtomicLong();
} else {
val = new AtomicLong(((Number) val).longValue());
}
entry.mapValue.put(field, val);
}
}
}
return Double.longBitsToDouble(((AtomicLong) entry.mapValue.get(field)).addAndGet(Double.doubleToLongBits(num)));
}
@Override
public long hdecr(final String key, String field, long num) {
return hincr(key, field, -num);
public long hdecr(final String key, String field) {
return hincrby(key, field, -1);
}
@Override
public long hdecrby(final String key, String field, long num) {
return hincrby(key, field, -num);
}
@Override
@@ -358,7 +389,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
if (entry == null || entry.isExpired()) return null;
if (entry.isListCacheType()) return (T) (entry.listValue == null ? null : new ArrayList(entry.listValue));
if (entry.isSetCacheType()) return (T) (entry.csetValue == null ? null : new HashSet(entry.csetValue));
return (T) entry.objectValue;
return entry.cacheType == CacheEntryType.DOUBLE ? (T) (Double) Double.longBitsToDouble(((AtomicLong) entry.objectValue).intValue()) : (T) entry.objectValue;
}
@Override
@@ -413,8 +444,13 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
@Override
public CompletableFuture<Long> hincrAsync(final String key, String field, long num) {
return CompletableFuture.supplyAsync(() -> hincr(key, field, num), getExecutor());
public CompletableFuture<Long> hincrbyAsync(final String key, String field, long num) {
return CompletableFuture.supplyAsync(() -> hincrby(key, field, num), getExecutor());
}
@Override
public CompletableFuture<Double> hincrbyFloatAsync(final String key, String field, double num) {
return CompletableFuture.supplyAsync(() -> hincrbyFloat(key, field, num), getExecutor());
}
@Override
@@ -423,8 +459,8 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
@Override
public CompletableFuture<Long> hdecrAsync(final String key, String field, long num) {
return CompletableFuture.supplyAsync(() -> hdecr(key, field, num), getExecutor());
public CompletableFuture<Long> hdecrbyAsync(final String key, String field, long num) {
return CompletableFuture.supplyAsync(() -> hdecrby(key, field, num), getExecutor());
}
@Override
@@ -902,7 +938,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public long incr(final String key) {
return incr(key, 1);
return incrby(key, 1);
}
@Override
@@ -911,7 +947,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
@Override
public long incr(final String key, long num) {
public long incrby(final String key, long num) {
CacheEntry entry = container.get(key);
if (entry == null) {
synchronized (container) {
@@ -926,13 +962,34 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
@Override
public CompletableFuture<Long> incrAsync(final String key, long num) {
return CompletableFuture.supplyAsync(() -> incr(key, num), getExecutor()).whenComplete(futureCompleteConsumer);
public double incrbyFloat(final String key, double num) {
CacheEntry entry = container.get(key);
if (entry == null) {
synchronized (container) {
entry = container.get(key);
if (entry == null) {
entry = new CacheEntry(CacheEntryType.DOUBLE, key, new AtomicLong(), null, null, null);
container.put(key, entry);
}
}
}
Long v = ((AtomicLong) entry.objectValue).addAndGet(Double.doubleToLongBits(num));
return Double.longBitsToDouble(v.intValue());
}
@Override
public CompletableFuture<Long> incrbyAsync(final String key, long num) {
return CompletableFuture.supplyAsync(() -> incrby(key, num), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public CompletableFuture<Double> incrbyFloatAsync(final String key, double num) {
return CompletableFuture.supplyAsync(() -> incrbyFloat(key, num), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public long decr(final String key) {
return incr(key, -1);
return incrby(key, -1);
}
@Override
@@ -941,13 +998,13 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
@Override
public long decr(final String key, long num) {
return incr(key, -num);
public long decrby(final String key, long num) {
return incrby(key, -num);
}
@Override
public CompletableFuture<Long> decrAsync(final String key, long num) {
return CompletableFuture.supplyAsync(() -> decr(key, num), getExecutor()).whenComplete(futureCompleteConsumer);
public CompletableFuture<Long> decrbyAsync(final String key, long num) {
return CompletableFuture.supplyAsync(() -> decrby(key, num), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
@@ -1707,7 +1764,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
public static enum CacheEntryType {
LONG, STRING, OBJECT, BYTES, ATOMIC, MAP,
LONG, STRING, OBJECT, BYTES, ATOMIC, MAP, DOUBLE,
LONG_SET, STRING_SET, OBJECT_SET,
LONG_LIST, STRING_LIST, OBJECT_LIST;
}

View File

@@ -131,11 +131,13 @@ public interface CacheSource extends Resourcable {
public long incr(final String key);
public long incr(final String key, long num);
public long incrby(final String key, long num);
public long decr(final String key);
public long decr(final String key, long num);
public long decrby(final String key, long num);
public double incrbyFloat(final String key, double num);
//------------------------ hget ------------------------
public <T> T hget(final String key, final String field, final Type type);
@@ -175,11 +177,13 @@ public interface CacheSource extends Resourcable {
public long hincr(final String key, String field);
public long hincr(final String key, String field, long num);
public long hincrby(final String key, String field, long num);
public double hincrbyFloat(final String key, String field, double num);
public long hdecr(final String key, String field);
public long hdecr(final String key, String field, long num);
public long hdecrby(final String key, String field, long num);
public boolean hexists(final String key, String field);
@@ -412,11 +416,13 @@ public interface CacheSource extends Resourcable {
public CompletableFuture<Long> incrAsync(final String key);
public CompletableFuture<Long> incrAsync(final String key, long num);
public CompletableFuture<Long> incrbyAsync(final String key, long num);
public CompletableFuture<Long> decrAsync(final String key);
public CompletableFuture<Long> decrAsync(final String key, long num);
public CompletableFuture<Long> decrbyAsync(final String key, long num);
public CompletableFuture<Double> incrbyFloatAsync(final String key, double num);
//------------------------ hgetAsync ------------------------
public <T> CompletableFuture<T> hgetAsync(final String key, final String field, final Type type);
@@ -456,11 +462,13 @@ public interface CacheSource extends Resourcable {
public CompletableFuture<Long> hincrAsync(final String key, String field);
public CompletableFuture<Long> hincrAsync(final String key, String field, long num);
public CompletableFuture<Long> hincrbyAsync(final String key, String field, long num);
public CompletableFuture<Double> hincrbyFloatAsync(final String key, String field, double num);
public CompletableFuture<Long> hdecrAsync(final String key, String field);
public CompletableFuture<Long> hdecrAsync(final String key, String field, long num);
public CompletableFuture<Long> hdecrbyAsync(final String key, String field, long num);
public CompletableFuture<Boolean> hexistsAsync(final String key, String field);
@@ -992,4 +1000,14 @@ public interface CacheSource extends Resourcable {
default CompletableFuture<Map<String, Long>> getLongMapAsync(final String... keys) {
return mgetLongAsync(keys);
}
@Deprecated
default long incr(final String key, long num) {
return incrby(key, num);
}
@Deprecated
default long decr(final String key, long num) {
return decrby(key, num);
}
}