diff --git a/src/main/java/org/redkale/source/AbstractCacheSource.java b/src/main/java/org/redkale/source/AbstractCacheSource.java index e6afdbe4e..a54776a28 100644 --- a/src/main/java/org/redkale/source/AbstractCacheSource.java +++ b/src/main/java/org/redkale/source/AbstractCacheSource.java @@ -118,4 +118,8 @@ public abstract class AbstractCacheSource extends AbstractService implements Cac protected CompletableFuture supplyAsync(Supplier supplier, Executor executor) { return CompletableFuture.supplyAsync(supplier, executor); } + + protected CompletableFuture runAsync(Runnable runner, Executor executor) { + return CompletableFuture.runAsync(runner, executor); + } } diff --git a/src/main/java/org/redkale/source/CacheMemorySource.java b/src/main/java/org/redkale/source/CacheMemorySource.java index 544bf0690..80094972d 100644 --- a/src/main/java/org/redkale/source/CacheMemorySource.java +++ b/src/main/java/org/redkale/source/CacheMemorySource.java @@ -562,22 +562,22 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture hsetAsync(final String key, final String field, final Type type, final T value) { - return CompletableFuture.runAsync(() -> hset(key, field, type, value), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> hset(key, field, type, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture hsetAsync(final String key, final String field, final Convert convert, final Type type, final T value) { - return CompletableFuture.runAsync(() -> hset(key, field, convert, type, value), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> hset(key, field, convert, type, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture hsetStringAsync(final String key, final String field, final String value) { - return CompletableFuture.runAsync(() -> hsetString(key, field, value), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> hsetString(key, field, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture hsetLongAsync(final String key, final String field, final long value) { - return CompletableFuture.runAsync(() -> hsetLong(key, field, value), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> hsetLong(key, field, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -602,12 +602,12 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture hmsetAsync(final String key, final Serializable... values) { - return CompletableFuture.runAsync(() -> hmset(key, values), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> hmset(key, values), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture hmsetAsync(final String key, final Map map) { - return CompletableFuture.runAsync(() -> hmset(key, map), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> hmset(key, map), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -951,22 +951,22 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture msetAsync(final Object... keyVals) { - return CompletableFuture.runAsync(() -> mset(keyVals), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> mset(keyVals), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture msetAsync(final Map map) { - return CompletableFuture.runAsync(() -> mset(map), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> mset(map), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture setAsync(String key, Type type, T value) { - return CompletableFuture.runAsync(() -> set(key, type, value), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> set(key, type, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture setAsync(String key, Convert convert, Type type, T value) { - return CompletableFuture.runAsync(() -> set(key, convert, type, value), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> set(key, convert, type, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -981,17 +981,17 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture getSetAsync(String key, Type type, T value) { - return CompletableFuture.runAsync(() -> getSet(key, type, value), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> getSet(key, type, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture getSetAsync(String key, Convert convert, Type type, T value) { - return CompletableFuture.runAsync(() -> getSet(key, convert, type, value), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> getSet(key, convert, type, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture setStringAsync(String key, String value) { - return CompletableFuture.runAsync(() -> setString(key, value), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> setString(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -1001,12 +1001,12 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture getSetStringAsync(String key, String value) { - return CompletableFuture.runAsync(() -> getSetString(key, value), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> getSetString(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture setLongAsync(String key, long value) { - return CompletableFuture.runAsync(() -> setLong(key, value), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> setLong(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -1071,22 +1071,22 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture setexAsync(String key, int expireSeconds, Type type, T value) { - return CompletableFuture.runAsync(() -> setex(key, expireSeconds, type, value), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> setex(key, expireSeconds, type, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture setexAsync(String key, int expireSeconds, Convert convert, Type type, T value) { - return CompletableFuture.runAsync(() -> setex(key, expireSeconds, convert, type, value), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> setex(key, expireSeconds, convert, type, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture setexStringAsync(String key, int expireSeconds, String value) { - return CompletableFuture.runAsync(() -> setexString(key, expireSeconds, value), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> setexString(key, expireSeconds, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture setexLongAsync(String key, int expireSeconds, long value) { - return CompletableFuture.runAsync(() -> setexLong(key, expireSeconds, value), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> setexLong(key, expireSeconds, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -1174,7 +1174,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture expireAsync(final String key, final int expireSeconds) { - return CompletableFuture.runAsync(() -> expire(key, expireSeconds), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> expire(key, expireSeconds), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -1454,26 +1454,168 @@ public final class CacheMemorySource extends AbstractCacheSource { return supplyAsync(() -> sismemberLong(key, value), getExecutor()); } - protected void appendListItem(CacheEntryType cacheType, String key, Object value) { + protected void appendListItem(CacheEntryType cacheType, String key, Object... values) { + appendListItem(cacheType, true, key, values); + } + + protected void appendListItem(CacheEntryType cacheType, boolean tail, String key, Object... values) { if (key == null) { return; } CacheEntry entry = container.get(key); if (entry == null || !entry.isListCacheType() || entry.listValue == null) { - ConcurrentLinkedQueue list = new ConcurrentLinkedQueue(); + ConcurrentLinkedDeque list = new ConcurrentLinkedDeque(); entry = new CacheEntry(cacheType, key, null, null, list, null); CacheEntry old = container.putIfAbsent(key, entry); if (old != null) { list = old.listValue; } if (list != null) { - list.add(value); + if (tail) { + list.addAll(List.of(values)); + } else { + for (Object v : values) { + list.addFirst(v); + } + } } } else { - entry.listValue.add(value); + if (tail) { + entry.listValue.addAll(List.of(values)); + } else { + for (Object v : values) { + entry.listValue.addFirst(v); + } + } } } + @Override + public void lpush(final String key, final Type componentType, T... values) { + for (T value : values) { + appendListItem(CacheEntryType.OBJECT_LIST, false, key, value); + } + } + + @Override + public CompletableFuture lpushAsync(final String key, final Type componentType, final T... values) { + return runAsync(() -> lpush(key, componentType, values), getExecutor()).whenComplete(futureCompleteConsumer); + } + + @Override + public void lpushx(final String key, final Type componentType, T value) { + if (container.containsKey(key)) { + appendListItem(CacheEntryType.OBJECT_LIST, false, key, value); + } + } + + @Override + public CompletableFuture lpushxAsync(final String key, final Type componentType, final T value) { + return runAsync(() -> lpushx(key, componentType, value), getExecutor()).whenComplete(futureCompleteConsumer); + } + + @Override + public T lpop(final String key, final Type componentType) { + if (key == null) { + return null; + } + CacheEntry entry = container.get(key); + if (entry == null || !entry.isListCacheType() || entry.listValue == null) { + return null; + } + if (entry.listValue.isEmpty()) { + return null; + } + Object obj = entry.listValue.pollFirst(); + if (obj != null && componentType == long.class) { + obj = ((Number) obj).longValue(); + } + return (T) obj; + } + + @Override + public void ltrim(final String key, int start, int stop) { + if (key == null) { + return; + } + CacheEntry entry = container.get(key); + if (entry == null || !entry.isListCacheType() || entry.listValue == null) { + return; + } + if (entry.listValue.isEmpty()) { + return; + } + Iterator it = entry.listValue.iterator(); + int index = -1; + int end = stop >= 0 ? stop : entry.listValue.size() + stop; + while (it.hasNext()) { + ++index; + if (index > end) { + break; + } else if (index >= start) { + it.remove(); + } + } + } + + @Override + public CompletableFuture ltrimAsync(final String key, int start, int stop) { + return runAsync(() -> ltrim(key, start, stop), getExecutor()).whenComplete(futureCompleteConsumer); + } + + @Override + public CompletableFuture lpopAsync(final String key, final Type componentType) { + return supplyAsync(() -> lpop(key, componentType), getExecutor()).whenComplete(futureCompleteConsumer); + } + + @Override + public T rpoplpush(final String list1, final String list2, final Type componentType) { + T val = rpop(list1, componentType); + lpush(list2, componentType, val); + return val; + } + + @Override + public CompletableFuture rpoplpushAsync(final String list1, final String list2, final Type componentType) { + return supplyAsync(() -> rpoplpush(list1, list2, componentType), getExecutor()).whenComplete(futureCompleteConsumer); + } + + @Override + public T rpop(final String key, final Type componentType) { + if (key == null) { + return null; + } + CacheEntry entry = container.get(key); + if (entry == null || !entry.isListCacheType() || entry.listValue == null) { + return null; + } + if (entry.listValue.isEmpty()) { + return null; + } + Object obj = entry.listValue.pollLast(); + if (obj != null && componentType == long.class) { + obj = ((Number) obj).longValue(); + } + return (T) obj; + } + + @Override + public CompletableFuture rpopAsync(final String key, final Type componentType) { + return supplyAsync(() -> rpop(key, componentType), getExecutor()).whenComplete(futureCompleteConsumer); + } + + @Override + public void rpushx(String key, Type componentType, T value) { + if (container.containsKey(key)) { + appendListItem(CacheEntryType.OBJECT_LIST, key, value); + } + } + + @Override + public CompletableFuture rpushxAsync(final String key, final Type componentType, final T value) { + return runAsync(() -> rpushx(key, componentType, value), getExecutor()).whenComplete(futureCompleteConsumer); + } + @Override public void rpush(String key, Type componentType, T value) { appendListItem(CacheEntryType.OBJECT_LIST, key, value); @@ -1491,17 +1633,17 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture rpushAsync(final String key, final Type componentType, final T value) { - return CompletableFuture.runAsync(() -> rpush(key, componentType, value), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> rpush(key, componentType, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture rpushStringAsync(final String key, final String value) { - return CompletableFuture.runAsync(() -> rpushString(key, value), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> rpushString(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture rpushLongAsync(final String key, final long value) { - return CompletableFuture.runAsync(() -> rpushLong(key, value), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> rpushLong(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -1694,17 +1836,17 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture saddAsync(final String key, final Type componentType, T value) { - return CompletableFuture.runAsync(() -> sadd(key, componentType, value), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> sadd(key, componentType, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture saddStringAsync(final String key, final String value) { - return CompletableFuture.runAsync(() -> saddString(key, value), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> saddString(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture saddLongAsync(final String key, final long value) { - return CompletableFuture.runAsync(() -> saddLong(key, value), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> saddLong(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -1813,7 +1955,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture setBytesAsync(final String key, byte[] value) { - return CompletableFuture.runAsync(() -> setBytes(key, value), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> setBytes(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -1833,7 +1975,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture setexBytesAsync(final String key, final int expireSeconds, byte[] value) { - return CompletableFuture.runAsync(() -> setexBytes(key, expireSeconds, value), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> setexBytes(key, expireSeconds, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -1848,7 +1990,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture flushdbAsync() { - return CompletableFuture.runAsync(() -> flushdb(), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> flushdb(), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -1858,7 +2000,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture flushallAsync() { - return CompletableFuture.runAsync(() -> flushall(), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> flushall(), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -1975,18 +2117,18 @@ public final class CacheMemorySource extends AbstractCacheSource { CopyOnWriteArraySet csetValue; - ConcurrentLinkedQueue listValue; + ConcurrentLinkedDeque listValue; - public CacheEntry(CacheEntryType cacheType, String key, T objectValue, CopyOnWriteArraySet csetValue, ConcurrentLinkedQueue listValue, ConcurrentHashMap mapValue) { + public CacheEntry(CacheEntryType cacheType, String key, T objectValue, CopyOnWriteArraySet csetValue, ConcurrentLinkedDeque listValue, ConcurrentHashMap mapValue) { this(cacheType, 0, key, objectValue, csetValue, listValue, mapValue); } - public CacheEntry(CacheEntryType cacheType, int expireSeconds, String key, T objectValue, CopyOnWriteArraySet csetValue, ConcurrentLinkedQueue listValue, ConcurrentHashMap mapValue) { + public CacheEntry(CacheEntryType cacheType, int expireSeconds, String key, T objectValue, CopyOnWriteArraySet csetValue, ConcurrentLinkedDeque listValue, ConcurrentHashMap mapValue) { this(cacheType, expireSeconds, (int) (System.currentTimeMillis() / 1000), key, objectValue, csetValue, listValue, mapValue); } @ConstructorParameters({"cacheType", "expireSeconds", "lastAccessed", "key", "objectValue", "csetValue", "listValue", "mapValue"}) - public CacheEntry(CacheEntryType cacheType, int expireSeconds, int lastAccessed, String key, T objectValue, CopyOnWriteArraySet csetValue, ConcurrentLinkedQueue listValue, ConcurrentHashMap mapValue) { + public CacheEntry(CacheEntryType cacheType, int expireSeconds, int lastAccessed, String key, T objectValue, CopyOnWriteArraySet csetValue, ConcurrentLinkedDeque listValue, ConcurrentHashMap mapValue) { this.cacheType = cacheType; this.expireSeconds = expireSeconds; this.lastAccessed = lastAccessed; @@ -2046,7 +2188,7 @@ public final class CacheMemorySource extends AbstractCacheSource { return csetValue; } - public ConcurrentLinkedQueue getListValue() { + public ConcurrentLinkedDeque getListValue() { return listValue; } diff --git a/src/main/java/org/redkale/source/CacheSource.java b/src/main/java/org/redkale/source/CacheSource.java index ef2cd40c6..89a35ed62 100644 --- a/src/main/java/org/redkale/source/CacheSource.java +++ b/src/main/java/org/redkale/source/CacheSource.java @@ -281,6 +281,68 @@ public interface CacheSource extends Resourcable { return lrange(key, Long.class, 0, -1); } + public void ltrim(final String key, int start, int stop); + + public T lpop(final String key, final Type componentType); + + default String lpopString(final String key) { + return lpop(key, String.class); + } + + default Long lpopLong(final String key) { + return lpop(key, Long.class); + } + + public void lpush(final String key, final Type componentType, T... values); + + default void lpushString(final String key, String... values) { + lpush(key, String.class, values); + } + + default void lpushLong(final String key, Long... values) { + lpush(key, Long.class, values); + } + + public void lpushx(final String key, final Type componentType, T value); + + default void lpushxString(final String key, String value) { + lpushx(key, String.class, value); + } + + default void lpushxLong(final String key, Long value) { + lpushx(key, Long.class, value); + } + + public void rpushx(final String key, final Type componentType, T value); + + default void rpushxxString(final String key, String value) { + rpushx(key, String.class, value); + } + + default void rpushxLong(final String key, Long value) { + rpushx(key, Long.class, value); + } + + public T rpop(final String key, final Type componentType); + + default String rpopString(final String key) { + return rpop(key, String.class); + } + + default Long rpopLong(final String key) { + return rpop(key, Long.class); + } + + public T rpoplpush(final String list1, final String list2, final Type componentType); + + default String rpoplpushString(final String list1, final String list2) { + return rpoplpush(list1, list2, String.class); + } + + default Long rpoplpushLong(final String list1, final String list2) { + return rpoplpush(list1, list2, Long.class); + } + public int lrem(final String key, final Type componentType, final T value); default int lremString(final String key, final String value) { @@ -642,6 +704,68 @@ public interface CacheSource extends Resourcable { return lrangeAsync(key, Long.class, 0, -1); } + public CompletableFuture ltrimAsync(final String key, int start, int stop); + + public CompletableFuture lpopAsync(final String key, final Type componentType); + + default CompletableFuture lpopStringAsync(final String key) { + return lpopAsync(key, String.class); + } + + default CompletableFuture lpopLongAsync(final String key) { + return lpopAsync(key, Long.class); + } + + public CompletableFuture lpushAsync(final String key, final Type componentType, T... values); + + default CompletableFuture lpushStringAsync(final String key, String... values) { + return lpushAsync(key, String.class, values); + } + + default CompletableFuture lpushLongAsync(final String key, Long... values) { + return lpushAsync(key, Long.class, values); + } + + public CompletableFuture lpushxAsync(final String key, final Type componentType, T value); + + default CompletableFuture lpushxStringAsync(final String key, String value) { + return lpushxAsync(key, String.class, value); + } + + default CompletableFuture lpushxLongAsync(final String key, Long value) { + return lpushxAsync(key, Long.class, value); + } + + public CompletableFuture rpushxAsync(final String key, final Type componentType, T value); + + default CompletableFuture rpushxxStringAsync(final String key, String value) { + return rpushxAsync(key, String.class, value); + } + + default CompletableFuture rpushxLongAsync(final String key, Long value) { + return rpushxAsync(key, Long.class, value); + } + + public CompletableFuture rpopAsync(final String key, final Type componentType); + + default CompletableFuture rpopStringAsync(final String key) { + return rpopAsync(key, String.class); + } + + default CompletableFuture rpopLongAsync(final String key) { + return rpopAsync(key, Long.class); + } + + public CompletableFuture rpoplpushAsync(final String list1, final String list2, final Type componentType); + + default CompletableFuture rpoplpushStringAsync(final String list1, final String list2) { + return rpoplpushAsync(list1, list2, String.class); + } + + default CompletableFuture rpoplpushLongAsync(final String list1, final String list2) { + return rpoplpushAsync(list1, list2, Long.class); + } + public CompletableFuture lremAsync(final String key, final Type componentType, final T value); default CompletableFuture lremStringAsync(final String key, final String value) {