diff --git a/src/main/java/org/redkale/caching/CacheFactory.java b/src/main/java/org/redkale/caching/CacheFactory.java index 6ca060aea..fe068e2d2 100644 --- a/src/main/java/org/redkale/caching/CacheFactory.java +++ b/src/main/java/org/redkale/caching/CacheFactory.java @@ -3,6 +3,7 @@ */ package org.redkale.caching; +import java.lang.reflect.Type; import org.redkale.source.CacheMemorySource; import org.redkale.source.CacheSource; @@ -24,4 +25,16 @@ public class CacheFactory { public static CacheFactory create(CacheSource remoteSource) { return new CacheFactory(remoteSource); } + + protected long hdelLocal(String map, String key) { + return localSource.hdel(map, key); + } + + protected void hsetLocal(final String map, final String key, final Type type, final T value) { + localSource.hset(map, key, type, value); + } + + protected T hgetLocal(final String map, final String key, final Type type) { + return localSource.hget(map, key, type); + } } diff --git a/src/main/java/org/redkale/source/CacheMemorySource.java b/src/main/java/org/redkale/source/CacheMemorySource.java index fdcf20514..71a599663 100644 --- a/src/main/java/org/redkale/source/CacheMemorySource.java +++ b/src/main/java/org/redkale/source/CacheMemorySource.java @@ -288,85 +288,112 @@ public final class CacheMemorySource extends AbstractCacheSource { } //------------------------ 字符串 String ------------------------ + @Override + public void mset(Serializable... keyVals) { + if (keyVals.length % 2 != 0) { + throw new SourceException("key value must be paired"); + } + for (int i = 0; i < keyVals.length; i += 2) { + String key = keyVals[i].toString(); + Object val = keyVals[i + 1]; + set0(key.toString(), 0, null, val); + } + } + @Override public CompletableFuture msetAsync(Serializable... keyVals) { - return runFuture(() -> { - if (keyVals.length % 2 != 0) { - throw new SourceException("key value must be paired"); - } - for (int i = 0; i < keyVals.length; i += 2) { - String key = keyVals[i].toString(); - Object val = keyVals[i + 1]; - set0(key.toString(), 0, null, val); - } - }); + return runFuture(() -> mset(keyVals)); + } + + @Override + public void mset(Map map) { + map.forEach((key, val) -> set0(key.toString(), 0, null, val)); } @Override public CompletableFuture msetAsync(Map map) { - return runFuture(() -> map.forEach((key, val) -> set0(key.toString(), 0, null, val))); + return runFuture(() -> mset(map)); + } + + @Override + public void set(String key, Convert convert, Type type, T value) { + set0(key, 0, type, value); } @Override public CompletableFuture setAsync(String key, Convert convert, Type type, T value) { - return runFuture(() -> set0(key, 0, type, value)); + return runFuture(() -> set(key, convert, type, value)); + } + + @Override + public boolean setnx(String key, Convert convert, Type type, T value) { + return setnxex(key, 0, convert, type, value); } @Override public CompletableFuture setnxAsync(String key, Convert convert, Type type, T value) { - return setnxexAsync(key, 0, convert, type, value); + return supplyFuture(() -> setnx(key, convert, type, value)); + } + + @Override + public boolean setnxex(String key, int expireSeconds, Convert convert, Type type, T value) { + CacheEntry entry = find(key); + if (entry == null) { + containerLock.lock(); + try { + entry = find(key, CacheEntryType.OBJECT); + if (entry == null) { + entry = new CacheEntry(CacheEntryType.OBJECT, key); + container.put(key, entry); + entry.objectValue = value; + entry.expireSeconds(expireSeconds); + entry.lastAccessed = System.currentTimeMillis(); + return true; + } + return false; + } finally { + containerLock.unlock(); + } + } + return false; } @Override public CompletableFuture setnxexAsync(String key, int expireSeconds, Convert convert, Type type, T value) { - return supplyFuture(() -> { - CacheEntry entry = find(key); - if (entry == null) { - containerLock.lock(); - try { - entry = find(key, CacheEntryType.OBJECT); - if (entry == null) { - entry = new CacheEntry(CacheEntryType.OBJECT, key); - container.put(key, entry); - entry.objectValue = value; - entry.expireSeconds(expireSeconds); - entry.lastAccessed = System.currentTimeMillis(); - return true; - } - return false; - } finally { - containerLock.unlock(); - } - } - return false; - }); + return supplyFuture(() -> setnxex(key, expireSeconds, convert, type, value)); + } + + @Override + public T getSet(String key, Convert convert, Type type, T value) { + CacheEntry entry = find(key, CacheEntryType.OBJECT); + T old = entry == null ? null : (T) entry.objectValue; + set0(key, 0, type, value); + return convertValue(type, old); } @Override public CompletableFuture getSetAsync(String key, Convert convert, Type type, T value) { - return supplyFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.OBJECT); - T old = entry == null ? null : (T) entry.objectValue; - set0(key, 0, type, value); - return convertValue(type, old); - }); + return supplyFuture(() -> getSet(key, convert, type, value)); + } + + @Override + public T getDel(String key, Type type) { + CacheEntry entry = find(key, CacheEntryType.OBJECT); + if (entry == null) { + return null; + } + containerLock.lock(); + try { + container.remove(key); + } finally { + containerLock.unlock(); + } + return convertValue(type, entry.objectValue); } @Override public CompletableFuture getDelAsync(String key, Type type) { - return supplyFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.OBJECT); - if (entry == null) { - return null; - } - containerLock.lock(); - try { - container.remove(key); - } finally { - containerLock.unlock(); - } - return convertValue(type, entry.objectValue); - }); + return supplyFuture(() -> getDel(key, type)); } private void set0(String key, int expireSeconds, Type type, Object value) { @@ -393,216 +420,275 @@ public final class CacheMemorySource extends AbstractCacheSource { } } + @Override + public void setex(String key, int expireSeconds, Convert convert, Type type, T value) { + set0(key, expireSeconds, type, value); + } + @Override public CompletableFuture setexAsync(String key, int expireSeconds, Convert convert, Type type, T value) { - return runFuture(() -> set0(key, expireSeconds, type, value)); + return runFuture(() -> setex(key, expireSeconds, convert, type, value)); + } + + @Override + public void expire(String key, int expireSeconds) { + CacheEntry entry = find(key); + if (entry == null) { + return; + } + entry.lock(); + try { + entry.expireSeconds(expireSeconds); + } finally { + entry.unlock(); + } } @Override public CompletableFuture expireAsync(String key, int expireSeconds) { - return runFuture(() -> { - CacheEntry entry = find(key); - if (entry == null) { - return; + return runFuture(() -> expire(key, expireSeconds)); + } + + @Override + public boolean persist(final String key) { + CacheEntry entry = find(key); + if (entry == null) { + return false; + } + entry.lock(); + try { + if (entry.expireMills > 0) { + entry.expireMills = 0; + return true; + } else { + return false; } - entry.lock(); - try { - entry.expireSeconds(expireSeconds); - } finally { - entry.unlock(); - } - }); + } finally { + entry.unlock(); + } } @Override public CompletableFuture persistAsync(final String key) { - return supplyFuture(() -> { - CacheEntry entry = find(key); - if (entry == null) { + return supplyFuture(() -> persist(key)); + } + + @Override + public boolean rename(String oldKey, String newKey) { + if (oldKey == null || newKey == null) { + return false; + } + containerLock.lock(); + try { + CacheEntry oldEntry = find(oldKey); + if (oldEntry == null) { return false; } - entry.lock(); - try { - if (entry.expireMills > 0) { - entry.expireMills = 0; - return true; - } else { - return false; - } - } finally { - entry.unlock(); - } - }); + oldEntry.key = newKey; + container.put(newKey, oldEntry); + container.remove(oldKey); + return true; + } finally { + containerLock.unlock(); + } } @Override public CompletableFuture renameAsync(String oldKey, String newKey) { - return supplyFuture(() -> { - if (oldKey == null || newKey == null) { + return supplyFuture(() -> rename(oldKey, newKey)); + } + + @Override + public boolean renamenx(String oldKey, String newKey) { + if (oldKey == null || newKey == null) { + return false; + } + containerLock.lock(); + try { + CacheEntry newEntry = find(newKey); + if (newEntry != null) { return false; } - containerLock.lock(); - try { - CacheEntry oldEntry = find(oldKey); - if (oldEntry == null) { - return false; - } - oldEntry.key = newKey; - container.put(newKey, oldEntry); - container.remove(oldKey); - return true; - } finally { - containerLock.unlock(); + CacheEntry oldEntry = find(oldKey); + if (oldEntry == null) { + return false; } - }); + oldEntry.key = newKey; + container.put(newKey, oldEntry); + container.remove(oldKey); + return true; + } finally { + containerLock.unlock(); + } } @Override public CompletableFuture renamenxAsync(String oldKey, String newKey) { - return supplyFuture(() -> { - if (oldKey == null || newKey == null) { - return false; + return supplyFuture(() -> renamenx(oldKey, newKey)); + } + + @Override + public long del(final String... keys) { + if (keys == null) { + return 0L; + } + long count = 0; + containerLock.lock(); + try { + for (String key : keys) { + count += container.remove(key) == null ? 0 : 1; } - containerLock.lock(); - try { - CacheEntry newEntry = find(newKey); - if (newEntry != null) { - return false; - } - CacheEntry oldEntry = find(oldKey); - if (oldEntry == null) { - return false; - } - oldEntry.key = newKey; - container.put(newKey, oldEntry); - container.remove(oldKey); - return true; - } finally { - containerLock.unlock(); - } - }); + } finally { + containerLock.unlock(); + } + return count; } @Override public CompletableFuture delAsync(final String... keys) { - return supplyFuture(() -> { - if (keys == null) { - return 0L; - } - long count = 0; - containerLock.lock(); - try { - for (String key : keys) { - count += container.remove(key) == null ? 0 : 1; - } - } finally { - containerLock.unlock(); - } - return count; - }); + return supplyFuture(() -> del(keys)); + } + + @Override + public long incr(final String key) { + return incrby(key, 1); } @Override public CompletableFuture incrAsync(final String key) { - return incrbyAsync(key, 1); + return supplyFuture(() -> incr(key)); + } + + @Override + public long incrby(final String key, long num) { + CacheEntry entry = find(key); + if (entry == null) { + containerLock.lock(); + try { + entry = find(key, CacheEntryType.ATOMIC); + if (entry == null) { + entry = new CacheEntry(CacheEntryType.ATOMIC, key); + entry.objectValue = new AtomicLong(); + container.put(key, entry); + } + } finally { + containerLock.unlock(); + } + } + entry.lock(); + try { + if (entry.cacheType != CacheEntryType.ATOMIC) { + entry.objectValue = new AtomicLong(Long.parseLong(entry.objectValue.toString())); + entry.cacheType = CacheEntryType.ATOMIC; + } + return ((AtomicLong) entry.objectValue).addAndGet(num); + } finally { + entry.unlock(); + } } @Override public CompletableFuture incrbyAsync(final String key, long num) { - return supplyFuture(() -> { - CacheEntry entry = find(key); - if (entry == null) { - containerLock.lock(); - try { - entry = find(key, CacheEntryType.ATOMIC); - if (entry == null) { - entry = new CacheEntry(CacheEntryType.ATOMIC, key); - entry.objectValue = new AtomicLong(); - container.put(key, entry); - } - } finally { - containerLock.unlock(); - } - } - entry.lock(); + return supplyFuture(() -> incrby(key, num)); + } + + @Override + public double incrbyFloat(final String key, double num) { + CacheEntry entry = find(key, CacheEntryType.DOUBLE); + if (entry == null) { + containerLock.lock(); try { - if (entry.cacheType != CacheEntryType.ATOMIC) { - entry.objectValue = new AtomicLong(Long.parseLong(entry.objectValue.toString())); - entry.cacheType = CacheEntryType.ATOMIC; + entry = find(key, CacheEntryType.DOUBLE); + if (entry == null) { + entry = new CacheEntry(CacheEntryType.DOUBLE, key); + entry.objectValue = new AtomicLong(); + container.put(key, entry); } - return ((AtomicLong) entry.objectValue).addAndGet(num); } finally { - entry.unlock(); + containerLock.unlock(); } - }); + } + entry.lock(); + try { + if (entry.cacheType != CacheEntryType.DOUBLE) { + entry.objectValue = new AtomicLong(Long.parseLong(entry.objectValue.toString())); + entry.cacheType = CacheEntryType.DOUBLE; + } + Long v = ((AtomicLong) entry.objectValue).addAndGet(Double.doubleToLongBits(num)); + return Double.longBitsToDouble(v.longValue()); + } finally { + entry.unlock(); + } } @Override public CompletableFuture incrbyFloatAsync(final String key, double num) { - return supplyFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.DOUBLE); - if (entry == null) { - containerLock.lock(); - try { - entry = find(key, CacheEntryType.DOUBLE); - if (entry == null) { - entry = new CacheEntry(CacheEntryType.DOUBLE, key); - entry.objectValue = new AtomicLong(); - container.put(key, entry); - } - } finally { - containerLock.unlock(); - } - } - entry.lock(); - try { - if (entry.cacheType != CacheEntryType.DOUBLE) { - entry.objectValue = new AtomicLong(Long.parseLong(entry.objectValue.toString())); - entry.cacheType = CacheEntryType.DOUBLE; - } - Long v = ((AtomicLong) entry.objectValue).addAndGet(Double.doubleToLongBits(num)); - return Double.longBitsToDouble(v.longValue()); - } finally { - entry.unlock(); - } - }); + return supplyFuture(() -> incrbyFloat(key, num)); + } + + @Override + public long decr(final String key) { + return incrby(key, -1); } @Override public CompletableFuture decrAsync(final String key) { - return incrbyAsync(key, -1); + return supplyFuture(() -> decr(key)); + } + + @Override + public long decrby(final String key, long num) { + return incrby(key, -num); } @Override public CompletableFuture decrbyAsync(final String key, long num) { - return incrbyAsync(key, -num); + return supplyFuture(() -> decrby(key, num)); + } + + @Override + public List mget(final Type componentType, final String... keys) { + List list = new ArrayList<>(); + for (String key : keys) { + list.add(get0(key, 0, componentType)); + } + return list; } @Override public CompletableFuture> mgetAsync(final Type componentType, final String... keys) { - return supplyFuture(() -> { - List list = new ArrayList<>(); - for (String key : keys) { - list.add(get0(key, 0, componentType)); - } - return list; - }); + return supplyFuture(() -> mget(componentType, keys)); } //----------- hxxx -------------- + @Override + public boolean exists(String key) { + return find(key) != null; + } + @Override public CompletableFuture existsAsync(String key) { - return supplyFuture(() -> find(key) != null); + return supplyFuture(() -> exists(key)); + } + + @Override + public T get(final String key, final Type type) { + return get0(key, 0, type); } @Override public CompletableFuture getAsync(final String key, final Type type) { - return supplyFuture(() -> get0(key, 0, type)); + return supplyFuture(() -> get(key, type)); + } + + @Override + public T getex(final String key, final int expireSeconds, final Type type) { + return get0(key, expireSeconds, type); } @Override public CompletableFuture getexAsync(final String key, final int expireSeconds, final Type type) { - return supplyFuture(() -> get0(key, expireSeconds, type)); + return supplyFuture(() -> getex(key, expireSeconds, type)); } private T get0(final String key, final int expireSeconds, final Type type) { @@ -638,55 +724,70 @@ public final class CacheMemorySource extends AbstractCacheSource { //------------------------ 哈希表 Hash ------------------------ @Override - public CompletableFuture hdelAsync(final String key, String... fields) { - return supplyFuture(() -> { - long count = 0; - CacheEntry entry = find(key, CacheEntryType.MAP); - if (entry == null) { - return 0L; - } - Map map = entry.mapValue; - entry.lock(); - try { - for (String field : fields) { - if (map.remove(field) != null) { - count++; - } + public long hdel(final String key, String... fields) { + long count = 0; + CacheEntry entry = find(key, CacheEntryType.MAP); + if (entry == null) { + return 0L; + } + Map map = entry.mapValue; + entry.lock(); + try { + for (String field : fields) { + if (map.remove(field) != null) { + count++; } - } finally { - entry.unlock(); } - return count; - }); + } finally { + entry.unlock(); + } + return count; + } + + @Override + public CompletableFuture hdelAsync(final String key, String... fields) { + return supplyFuture(() -> hdel(key, fields)); + } + + @Override + public List hkeys(final String key) { + List list = new ArrayList<>(); + CacheEntry entry = find(key, CacheEntryType.MAP); + if (entry == null) { + return list; + } + list.addAll(entry.mapValue.keySet()); + return list; } @Override public CompletableFuture> hkeysAsync(final String key) { - return supplyFuture(() -> { - List list = new ArrayList<>(); - CacheEntry entry = find(key, CacheEntryType.MAP); - if (entry == null) { - return list; - } - list.addAll(entry.mapValue.keySet()); - return list; - }); + return supplyFuture(() -> hkeys(key)); + } + + @Override + public long hlen(final String key) { + CacheEntry entry = find(key, CacheEntryType.MAP); + return entry == null ? 0L : (long) entry.mapValue.keySet().size(); } @Override public CompletableFuture hlenAsync(final String key) { - return supplyFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.MAP); - return entry == null ? 0L : (long) entry.mapValue.keySet().size(); - }); + return supplyFuture(() -> hlen(key)); + } + + @Override + public long hincr(final String key, String field) { + return hincrby(key, field, 1); } @Override public CompletableFuture hincrAsync(final String key, String field) { - return hincrbyAsync(key, field, 1); + return supplyFuture(() -> hincr(key, field)); } - private long hincrby0(final String key, String field, long num) { + @Override + public long hincrby(final String key, String field, long num) { CacheEntry entry = find(key, CacheEntryType.MAP); if (entry == null) { containerLock.lock(); @@ -716,173 +817,227 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture hincrbyAsync(final String key, String field, long num) { - return supplyFuture(() -> hincrby0(key, field, num)); + return supplyFuture(() -> hincrby(key, field, num)); + } + + @Override + public double hincrbyFloat(final String key, String field, double num) { + return Double.longBitsToDouble(hincrby(key, field, Double.doubleToLongBits(num))); } @Override public CompletableFuture hincrbyFloatAsync(final String key, String field, double num) { - return supplyFuture(() -> Double.longBitsToDouble(hincrby0(key, field, Double.doubleToLongBits(num)))); + return supplyFuture(() -> hincrbyFloat(key, field, num)); + } + + @Override + public long hdecr(final String key, String field) { + return hincrby(key, field, -1); } @Override public CompletableFuture hdecrAsync(final String key, String field) { - return hincrbyAsync(key, field, -1); + return supplyFuture(() -> hdecr(key, field)); + } + + @Override + public long hdecrby(final String key, String field, long num) { + return hincrby(key, field, -num); } @Override public CompletableFuture hdecrbyAsync(final String key, String field, long num) { - return hincrbyAsync(key, field, -num); + return supplyFuture(() -> hdecrby(key, field, num)); + } + + @Override + public boolean hexists(final String key, String field) { + CacheEntry entry = find(key, CacheEntryType.MAP); + return entry != null && entry.mapValue.contains(field); } @Override public CompletableFuture hexistsAsync(final String key, String field) { - return supplyFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.MAP); - return entry != null && entry.mapValue.contains(field); - }); + return supplyFuture(() -> hexists(key, field)); + } + + //需要给CacheFactory使用 + @Override + public void hset(final String key, final String field, final Convert convert, final Type type, final T value) { + hset0(key, field, type, value); } @Override public CompletableFuture hsetAsync(final String key, final String field, final Convert convert, final Type type, final T value) { - return runFuture(() -> hset0(key, field, type, value)); + return runFuture(() -> hset(key, field, type, value)); + } + + @Override + public boolean hsetnx(final String key, final String field, final Convert convert, final Type type, final T value) { + if (value == null) { + return false; + } + CacheEntry entry = find(key, CacheEntryType.MAP); + if (entry == null) { + containerLock.lock(); + try { + entry = find(key, CacheEntryType.MAP); + if (entry == null) { + entry = new CacheEntry(CacheEntryType.MAP, key); + container.put(key, entry); + } + } finally { + containerLock.unlock(); + } + } + entry.lock(); + try { + boolean rs = entry.mapValue.putIfAbsent(field, convertValue(type, value)) == null; + entry.lastAccessed = System.currentTimeMillis(); + return rs; + } finally { + entry.unlock(); + } } @Override public CompletableFuture hsetnxAsync(final String key, final String field, final Convert convert, final Type type, final T value) { - return supplyFuture(() -> { - if (value == null) { - return false; - } - CacheEntry entry = find(key, CacheEntryType.MAP); - if (entry == null) { - containerLock.lock(); - try { - entry = find(key, CacheEntryType.MAP); - if (entry == null) { - entry = new CacheEntry(CacheEntryType.MAP, key); - container.put(key, entry); - } - } finally { - containerLock.unlock(); - } - } - entry.lock(); - try { - boolean rs = entry.mapValue.putIfAbsent(field, convertValue(type, value)) == null; - entry.lastAccessed = System.currentTimeMillis(); - return rs; - } finally { - entry.unlock(); - } - }); + return supplyFuture(() -> hsetnx(key, field, convert, type, value)); + } + + @Override + public void hmset(final String key, final Serializable... values) { + for (int i = 0; i < values.length; i += 2) { + hset0(key, (String) values[i], null, values[i + 1]); + } } @Override public CompletableFuture hmsetAsync(final String key, final Serializable... values) { - return runFuture(() -> { - for (int i = 0; i < values.length; i += 2) { - hset0(key, (String) values[i], null, values[i + 1]); - } - }); + return runFuture(() -> hmset(key, values)); + } + + @Override + public void hmset(final String key, final Map map) { + map.forEach((k, v) -> hset0(key, (String) k, null, v)); } @Override public CompletableFuture hmsetAsync(final String key, final Map map) { - return runFuture(() -> map.forEach((k, v) -> hset0(key, (String) k, null, v))); + return runFuture(() -> hmset(key, map)); + } + + @Override + public List hmget(final String key, final Type type, final String... fields) { + CacheEntry entry = find(key, CacheEntryType.MAP); + if (entry == null) { + return null; + } + Map map = entry.mapValue; + List rs = new ArrayList<>(fields.length); + for (String field : fields) { + rs.add(convertValue(type, map.get(field))); + } + return rs; } @Override public CompletableFuture> hmgetAsync(final String key, final Type type, final String... fields) { - return supplyFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.MAP); - if (entry == null) { - return null; - } - Map map = entry.mapValue; - List rs = new ArrayList<>(fields.length); - for (String field : fields) { - rs.add(convertValue(type, map.get(field))); - } - return rs; - }); + return supplyFuture(() -> hmget(key, type, fields)); + } + + @Override + public Map hgetall(final String key, final Type type) { + CacheEntry entry = find(key, CacheEntryType.MAP); + if (entry == null) { + return new LinkedHashMap(); + } else { + Map map = new LinkedHashMap(); + entry.mapValue.forEach((k, v) -> map.put(k, convertValue(type, v))); + return map; + } } @Override public CompletableFuture> hgetallAsync(final String key, final Type type) { - return supplyFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.MAP); - if (entry == null) { - return new LinkedHashMap(); - } else { - Map map = new LinkedHashMap(); - entry.mapValue.forEach((k, v) -> map.put(k, convertValue(type, v))); - return map; - } - }); + return supplyFuture(() -> hgetall(key, type)); + } + + @Override + public List hvals(final String key, final Type type) { + CacheEntry entry = find(key, CacheEntryType.MAP); + if (entry == null) { + return new ArrayList(); + } else { + Stream stream = entry.mapValue.values().stream().map(v -> convertValue(type, v)); + return new ArrayList(stream.collect(Collectors.toList())); + } } @Override public CompletableFuture> hvalsAsync(final String key, final Type type) { - return supplyFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.MAP); - if (entry == null) { - return new ArrayList(); - } else { - Stream stream = entry.mapValue.values().stream().map(v -> convertValue(type, v)); - return new ArrayList(stream.collect(Collectors.toList())); - } - }); + return supplyFuture(() -> hvals(key, type)); + } + + @Override + public Map hscan(final String key, final Type type, AtomicLong cursor, int limit, String pattern) { + if (key == null) { + return new HashMap(); + } + CacheEntry entry = find(key, CacheEntryType.MAP); + if (entry == null) { + return new HashMap(); + } + if (Utility.isEmpty(pattern)) { + return new LinkedHashMap(entry.mapValue); + } else { + Predicate regx = Pattern.compile(pattern.replace("*", ".*")).asPredicate(); + Set> set = entry.mapValue.entrySet(); + return set.stream().filter(en -> regx.test(en.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, en -> convertValue(type, en.getValue()))); + } } @Override public CompletableFuture> hscanAsync(final String key, final Type type, AtomicLong cursor, int limit, String pattern) { - return supplyFuture(() -> { - if (key == null) { - return new HashMap(); - } - CacheEntry entry = find(key, CacheEntryType.MAP); - if (entry == null) { - return new HashMap(); - } - if (Utility.isEmpty(pattern)) { - return new LinkedHashMap(entry.mapValue); - } else { - Predicate regx = Pattern.compile(pattern.replace("*", ".*")).asPredicate(); - Set> set = entry.mapValue.entrySet(); - return set.stream().filter(en -> regx.test(en.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, en -> convertValue(type, en.getValue()))); - } - }); + return supplyFuture(() -> hscan(key, type, cursor, limit, pattern)); + } + + //需要给CacheFactory使用 + @Override + public T hget(final String key, final String field, final Type type) { + if (key == null || field == null) { + return null; + } + CacheEntry entry = find(key, CacheEntryType.MAP); + if (entry == null) { + return null; + } + Object obj = entry.mapValue.get(field); + return obj == null ? null : convertValue(type, obj); } @Override public CompletableFuture hgetAsync(final String key, final String field, final Type type) { - return supplyFuture(() -> { - if (key == null || field == null) { - return null; - } - CacheEntry entry = find(key, CacheEntryType.MAP); - if (entry == null) { - return null; - } - Object obj = entry.mapValue.get(field); - return obj == null ? null : convertValue(type, obj); - }); + return supplyFuture(() -> hget(key, field, type)); + } + + @Override + public long hstrlen(final String key, final String field) { + if (key == null || field == null) { + return 0L; + } + CacheEntry entry = find(key, CacheEntryType.MAP); + if (entry == null) { + return 0L; + } + Object obj = entry.mapValue.get(field); + return obj == null ? 0L : (long) obj.toString().length(); } @Override public CompletableFuture hstrlenAsync(final String key, final String field) { - return supplyFuture(() -> { - if (key == null || field == null) { - return 0L; - } - CacheEntry entry = find(key, CacheEntryType.MAP); - if (entry == null) { - return 0L; - } - Object obj = entry.mapValue.get(field); - return obj == null ? 0L : (long) obj.toString().length(); - }); + return supplyFuture(() -> hstrlen(key, field)); } private void hset0(String key, String field, Type type, Object value) { @@ -912,369 +1067,430 @@ public final class CacheMemorySource extends AbstractCacheSource { } //------------------------ 列表 List ------------------------ + @Override + public List lrange(final String key, final Type componentType, int start, int stop) { + return get(key, componentType); + } + @Override public CompletableFuture> lrangeAsync(final String key, final Type componentType, int start, int stop) { - return getAsync(key, componentType); + return supplyFuture(() -> lrange(key, componentType, start, stop)); + } + + @Override + public Map> lranges(final Type componentType, final String... keys) { + Map> map = new HashMap<>(); + for (String key : keys) { + List s = (List) get(key, componentType); + if (s != null) { + map.put(key, s); + } + } + return map; } @Override public CompletableFuture>> lrangesAsync(final Type componentType, final String... keys) { - return supplyFuture(() -> { - Map> map = new HashMap<>(); - for (String key : keys) { - List s = (List) get(key, componentType); - if (s != null) { - map.put(key, s); - } - } - return map; - }); + return supplyFuture(() -> lranges(componentType, keys)); + } + + @Override + public long llen(final String key) { + CacheEntry entry = find(key, CacheEntryType.LIST); + return entry == null ? 0L : (long) entry.listValue.size(); } @Override public CompletableFuture llenAsync(final String key) { - return supplyFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.LIST); - return entry == null ? 0L : (long) entry.listValue.size(); - }); + return supplyFuture(() -> llen(key)); + } + + @Override + public T lindex(String key, Type componentType, int index) { + CacheEntry entry = find(key, CacheEntryType.LIST); + if (entry == null) { + return null; + } + List list = new ArrayList(entry.listValue); + int pos = index >= 0 ? index : list.size() + index; + return pos >= list.size() ? null : convertValue(componentType, list.get(pos)); } @Override public CompletableFuture lindexAsync(String key, Type componentType, int index) { - return supplyFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.LIST); - if (entry == null) { - return null; - } - List list = new ArrayList(entry.listValue); - int pos = index >= 0 ? index : list.size() + index; - return pos >= list.size() ? null : convertValue(componentType, list.get(pos)); - }); + return supplyFuture(() -> lindex(key, componentType, index)); + } + + @Override + public long linsertBefore(String key, Type componentType, T pivot, T value) { + return linsert(key, componentType, true, pivot, value); } @Override public CompletableFuture linsertBeforeAsync(String key, Type componentType, T pivot, T value) { - return linsertAsync(key, componentType, true, pivot, value); + return supplyFuture(() -> linsertBefore(key, componentType, pivot, value)); + } + + @Override + public long linsertAfter(String key, Type componentType, T pivot, T value) { + return linsert(key, componentType, false, pivot, value); } @Override public CompletableFuture linsertAfterAsync(String key, Type componentType, T pivot, T value) { - return linsertAsync(key, componentType, false, pivot, value); + return supplyFuture(() -> linsertAfter(key, componentType, pivot, value)); } - protected CompletableFuture linsertAsync(String key, Type componentType, boolean before, T pivot, T value) { - return supplyFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.LIST); - if (entry == null) { - return 0L; + protected long linsert(String key, Type componentType, boolean before, T pivot, T value) { + CacheEntry entry = find(key, CacheEntryType.LIST); + if (entry == null) { + return 0L; + } + entry.lock(); + try { + List list = new ArrayList<>(entry.listValue); + int pos = list.indexOf(pivot); + if (pos < 0) { + return -1L; } - entry.lock(); - try { - List list = new ArrayList<>(entry.listValue); - int pos = list.indexOf(pivot); - if (pos < 0) { - return -1L; - } - List newList = new ArrayList<>(); - if (before) { - if (pos == 0) { - newList.add(value); - newList.addAll(list); - } else { - newList.addAll(list.subList(0, pos)); - newList.add(value); - newList.addAll(list.subList(pos, list.size())); - } + List newList = new ArrayList<>(); + if (before) { + if (pos == 0) { + newList.add(value); + newList.addAll(list); } else { - if (pos == list.size() - 1) { - newList.addAll(list); - newList.add(value); - } else { - newList.addAll(list.subList(0, pos + 1)); - newList.add(value); - newList.addAll(list.subList(pos + 1, list.size())); - } + newList.addAll(list.subList(0, pos)); + newList.add(value); + newList.addAll(list.subList(pos, list.size())); + } + } else { + if (pos == list.size() - 1) { + newList.addAll(list); + newList.add(value); + } else { + newList.addAll(list.subList(0, pos + 1)); + newList.add(value); + newList.addAll(list.subList(pos + 1, list.size())); } - entry.listValue.clear(); - entry.listValue.addAll(newList); - return 1L; - } finally { - entry.unlock(); } - }); + entry.listValue.clear(); + entry.listValue.addAll(newList); + return 1L; + } finally { + entry.unlock(); + } + } + + @Override + public void lpush(final String key, final Type componentType, T... values) { + CacheEntry entry = find(key, CacheEntryType.LIST); + if (entry == null) { + containerLock.lock(); + try { + entry = find(key, CacheEntryType.LIST); + if (entry == null) { + entry = new CacheEntry(CacheEntryType.LIST, key); + container.put(key, entry); + } + } finally { + containerLock.unlock(); + } + } + entry.lock(); + try { + for (T val : values) { + entry.listValue.addFirst(val); + } + } finally { + entry.unlock(); + } } @Override public CompletableFuture lpushAsync(final String key, final Type componentType, T... values) { - return runFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.LIST); - if (entry == null) { - containerLock.lock(); - try { - entry = find(key, CacheEntryType.LIST); - if (entry == null) { - entry = new CacheEntry(CacheEntryType.LIST, key); - container.put(key, entry); - } - } finally { - containerLock.unlock(); - } + return runFuture(() -> lpush(key, componentType, values)); + } + + @Override + public void lpushx(final String key, final Type componentType, T... values) { + CacheEntry entry = find(key, CacheEntryType.LIST); + if (entry == null) { + return; + } + entry.lock(); + try { + ConcurrentLinkedDeque list = entry.listValue; + for (T val : values) { + list.addFirst(val); } - entry.lock(); - try { - for (T val : values) { - entry.listValue.addFirst(val); - } - } finally { - entry.unlock(); - } - }); + } finally { + entry.unlock(); + } } @Override public CompletableFuture lpushxAsync(final String key, final Type componentType, T... values) { - return runFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.LIST); - if (entry == null) { - return; - } - entry.lock(); - try { - ConcurrentLinkedDeque list = entry.listValue; - for (T val : values) { - list.addFirst(val); - } - } finally { - entry.unlock(); - } - }); + return runFuture(() -> lpushx(key, componentType, values)); + } + + @Override + public T lpop(final String key, final Type componentType) { + CacheEntry entry = find(key, CacheEntryType.LIST); + if (entry == null) { + return null; + } + entry.lock(); + try { + return convertValue(componentType, entry.listValue.pollFirst()); + } finally { + entry.unlock(); + } } @Override public CompletableFuture lpopAsync(final String key, final Type componentType) { - return supplyFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.LIST); - if (entry == null) { - return null; + return supplyFuture(() -> lpop(key, componentType)); + } + + @Override + public void ltrim(final String key, int start, int stop) { + CacheEntry entry = find(key, CacheEntryType.LIST); + if (entry == null) { + return; + } + entry.lock(); + try { + ConcurrentLinkedDeque list = entry.listValue; + Iterator it = list.iterator(); + int index = -1; + int end = stop >= 0 ? stop : list.size() + stop; + while (it.hasNext()) { + ++index; + if (index > end) { + break; + } else if (index >= start) { + it.remove(); + } } - entry.lock(); - try { - return convertValue(componentType, entry.listValue.pollFirst()); - } finally { - entry.unlock(); - } - }); + } finally { + entry.unlock(); + } } @Override public CompletableFuture ltrimAsync(final String key, int start, int stop) { - return runFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.LIST); - if (entry == null) { - return; - } - entry.lock(); - try { - ConcurrentLinkedDeque list = entry.listValue; - Iterator it = list.iterator(); - int index = -1; - int end = stop >= 0 ? stop : list.size() + stop; - while (it.hasNext()) { - ++index; - if (index > end) { - break; - } else if (index >= start) { - it.remove(); - } - } - } finally { - entry.unlock(); - } - }); + return runFuture(() -> ltrim(key, start, stop)); + } + + @Override + public T rpoplpush(final String key, final String key2, final Type componentType) { + T val = rpop(key, componentType); + lpush(key2, componentType, val); + return val; } @Override public CompletableFuture rpoplpushAsync(final String key, final String key2, final Type componentType) { - return supplyFuture(() -> { - T val = rpop(key, componentType); - lpush(key2, componentType, val); - return val; - }); + return supplyFuture(() -> rpoplpush(key, key2, componentType)); + } + + @Override + public T rpop(final String key, final Type componentType) { + CacheEntry entry = find(key, CacheEntryType.LIST); + if (entry == null) { + return null; + } + entry.lock(); + try { + return convertValue(componentType, entry.listValue.pollLast()); + } finally { + entry.unlock(); + } } @Override public CompletableFuture rpopAsync(final String key, final Type componentType) { - return supplyFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.LIST); - if (entry == null) { - return null; - } - entry.lock(); - try { - return convertValue(componentType, entry.listValue.pollLast()); - } finally { - entry.unlock(); - } - }); + return supplyFuture(() -> rpop(key, componentType)); + } + + @Override + public void rpushx(final String key, final Type componentType, final T... values) { + CacheEntry entry = find(key, CacheEntryType.LIST); + if (entry == null) { + return; + } + entry.lock(); + try { + ConcurrentLinkedDeque list = entry.listValue; + list.addAll(List.of(values)); + } finally { + entry.unlock(); + } } @Override public CompletableFuture rpushxAsync(final String key, final Type componentType, final T... values) { - return runFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.LIST); - if (entry == null) { - return; - } - entry.lock(); + return runFuture(() -> rpushx(key, componentType, values)); + } + + @Override + public void rpush(final String key, final Type componentType, final T... values) { + CacheEntry entry = find(key, CacheEntryType.LIST); + if (entry == null) { + containerLock.lock(); try { - ConcurrentLinkedDeque list = entry.listValue; - list.addAll(List.of(values)); + entry = find(key, CacheEntryType.LIST); + if (entry == null) { + entry = new CacheEntry(CacheEntryType.LIST, key); + container.put(key, entry); + } } finally { - entry.unlock(); + containerLock.unlock(); } - }); + } + entry.lock(); + try { + ConcurrentLinkedDeque list = entry.listValue; + list.addAll(List.of(values)); + } finally { + entry.unlock(); + } } @Override public CompletableFuture rpushAsync(final String key, final Type componentType, final T... values) { - return runFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.LIST); - if (entry == null) { - containerLock.lock(); - try { - entry = find(key, CacheEntryType.LIST); - if (entry == null) { - entry = new CacheEntry(CacheEntryType.LIST, key); - container.put(key, entry); - } - } finally { - containerLock.unlock(); - } - } - entry.lock(); - try { - ConcurrentLinkedDeque list = entry.listValue; - list.addAll(List.of(values)); - } finally { - entry.unlock(); - } - }); + return runFuture(() -> rpush(key, componentType, values)); + } + + @Override + public long lrem(final String key, final Type componentType, T value) { + CacheEntry entry = find(key, CacheEntryType.LIST); + if (entry == null) { + return 0L; + } + entry.lock(); + try { + return entry.listValue.remove(value) ? 1L : 0L; + } finally { + entry.unlock(); + } } @Override public CompletableFuture lremAsync(final String key, final Type componentType, T value) { - return supplyFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.LIST); - if (entry == null) { - return 0L; - } - entry.lock(); - try { - return entry.listValue.remove(value) ? 1L : 0L; - } finally { - entry.unlock(); - } - }); + return supplyFuture(() -> lrem(key, componentType, value)); } //------------------------ 集合 Set ------------------------ @Override - public CompletableFuture> srandmemberAsync(String key, Type componentType, int count) { - return supplyFuture(() -> { - List list = new ArrayList<>(); - CacheEntry entry = find(key, CacheEntryType.SSET); - if (entry == null) { - return list; - } - List vals = new ArrayList<>(entry.setValue); - if (count < 0) { //可以重复 - for (int i = 0; i < Math.abs(count); i++) { - int index = ThreadLocalRandom.current().nextInt(vals.size()); - T val = vals.get(index); - list.add(convertValue(componentType, val)); - } - } else { //不可以重复 - if (count >= vals.size()) { - return vals; - } - return vals.subList(0, count); - } + public List srandmember(String key, Type componentType, int count) { + List list = new ArrayList<>(); + CacheEntry entry = find(key, CacheEntryType.SSET); + if (entry == null) { return list; - }); + } + List vals = new ArrayList<>(entry.setValue); + if (count < 0) { //可以重复 + for (int i = 0; i < Math.abs(count); i++) { + int index = ThreadLocalRandom.current().nextInt(vals.size()); + T val = vals.get(index); + list.add(convertValue(componentType, val)); + } + } else { //不可以重复 + if (count >= vals.size()) { + return vals; + } + return vals.subList(0, count); + } + return list; } @Override - public CompletableFuture smoveAsync(String key, String key2, Type componentType, T member) { - return supplyFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.SSET); - if (entry == null) { - return false; - } - boolean rs = false; - entry.lock(); - try { - rs = entry.setValue.remove(member); - } finally { - entry.unlock(); - } - if (rs) { - CacheEntry entry2 = find(key2, CacheEntryType.SSET); - if (entry2 == null) { - containerLock.lock(); - try { - entry2 = find(key2, CacheEntryType.SSET); - if (entry2 == null) { - entry2 = new CacheEntry(CacheEntryType.SSET, key); - container.put(key2, entry2); - } - } finally { - containerLock.unlock(); - } - } - entry2.lock(); - try { - entry2.setValue.add(member); - } finally { - entry2.unlock(); - } - } - return rs; - }); + public CompletableFuture> srandmemberAsync(String key, Type componentType, int count) { + return supplyFuture(() -> srandmember(key, componentType, count)); } @Override - public CompletableFuture> sdiffAsync(final String key, final Type componentType, final String... key2s) { - return supplyFuture(() -> { - return sdiff0(key, key2s); - }); - } - - @Override - public CompletableFuture sdiffstoreAsync(final String key, final String srcKey, final String... srcKey2s) { - return supplyFuture(() -> { - Set rs = sdiff0(srcKey, srcKey2s); - CacheEntry entry = find(key, CacheEntryType.SSET); - if (entry == null) { + public boolean smove(String key, String key2, Type componentType, T member) { + CacheEntry entry = find(key, CacheEntryType.SSET); + if (entry == null) { + return false; + } + boolean rs = false; + entry.lock(); + try { + rs = entry.setValue.remove(member); + } finally { + entry.unlock(); + } + if (rs) { + CacheEntry entry2 = find(key2, CacheEntryType.SSET); + if (entry2 == null) { containerLock.lock(); try { - entry = find(key, CacheEntryType.SSET); - if (entry == null) { - entry = new CacheEntry(CacheEntryType.SSET, key); - container.put(key, entry); + entry2 = find(key2, CacheEntryType.SSET); + if (entry2 == null) { + entry2 = new CacheEntry(CacheEntryType.SSET, key); + container.put(key2, entry2); } } finally { containerLock.unlock(); } } - entry.lock(); + entry2.lock(); try { - entry.setValue.clear(); - entry.setValue.addAll(rs); + entry2.setValue.add(member); } finally { - entry.unlock(); + entry2.unlock(); } - return (long) rs.size(); - }); + } + return rs; + } + + @Override + public CompletableFuture smoveAsync(String key, String key2, Type componentType, T member) { + return supplyFuture(() -> smove(key, key2, componentType, member)); + } + + @Override + public Set sdiff(final String key, final Type componentType, final String... key2s) { + return sdiff0(key, key2s); + } + + @Override + public CompletableFuture> sdiffAsync(final String key, final Type componentType, final String... key2s) { + return supplyFuture(() -> sdiff(key, componentType, key2s)); + } + + @Override + public long sdiffstore(final String key, final String srcKey, final String... srcKey2s) { + Set rs = sdiff0(srcKey, srcKey2s); + CacheEntry entry = find(key, CacheEntryType.SSET); + if (entry == null) { + containerLock.lock(); + try { + entry = find(key, CacheEntryType.SSET); + if (entry == null) { + entry = new CacheEntry(CacheEntryType.SSET, key); + container.put(key, entry); + } + } finally { + containerLock.unlock(); + } + } + entry.lock(); + try { + entry.setValue.clear(); + entry.setValue.addAll(rs); + } finally { + entry.unlock(); + } + return rs.size(); + } + + @Override + public CompletableFuture sdiffstoreAsync(final String key, final String srcKey, final String... srcKey2s) { + return supplyFuture(() -> sdiffstore(key, srcKey, srcKey2s)); } private Set sdiff0(final String key, final String... key2s) { @@ -1293,37 +1509,45 @@ public final class CacheMemorySource extends AbstractCacheSource { return rs; } + @Override + public Set sinter(final String key, final Type componentType, final String... key2s) { + return sinter0(key, key2s); + } + @Override public CompletableFuture> sinterAsync(final String key, final Type componentType, final String... key2s) { - return supplyFuture(() -> sinter0(key, key2s)); + return supplyFuture(() -> sinter(key, componentType, key2s)); + } + + @Override + public long sinterstore(final String key, final String srcKey, final String... srcKey2s) { + Set rs = sinter0(srcKey, srcKey2s); + CacheEntry entry = find(key, CacheEntryType.SSET); + if (entry == null) { + containerLock.lock(); + try { + entry = find(key, CacheEntryType.SSET); + if (entry == null) { + entry = new CacheEntry(CacheEntryType.SSET, key); + container.put(key, entry); + } + } finally { + containerLock.unlock(); + } + } + entry.lock(); + try { + entry.setValue.clear(); + entry.setValue.addAll(rs); + } finally { + entry.unlock(); + } + return rs.size(); } @Override public CompletableFuture sinterstoreAsync(final String key, final String srcKey, final String... srcKey2s) { - return supplyFuture(() -> { - Set rs = sinter0(srcKey, srcKey2s); - CacheEntry entry = find(key, CacheEntryType.SSET); - if (entry == null) { - containerLock.lock(); - try { - entry = find(key, CacheEntryType.SSET); - if (entry == null) { - entry = new CacheEntry(CacheEntryType.SSET, key); - container.put(key, entry); - } - } finally { - containerLock.unlock(); - } - } - entry.lock(); - try { - entry.setValue.clear(); - entry.setValue.addAll(rs); - } finally { - entry.unlock(); - } - return (long) rs.size(); - }); + return supplyFuture(() -> sinterstore(key, srcKey, srcKey2s)); } private Set sinter0(final String key, final String... key2s) { @@ -1351,38 +1575,46 @@ public final class CacheMemorySource extends AbstractCacheSource { return rs; } + @Override + public Set sunion(final String key, final Type componentType, final String... key2s) { + return sunion0(key, key2s); + } + @Override public CompletableFuture> sunionAsync(final String key, final Type componentType, final String... key2s) { - return supplyFuture(() -> sunion0(key, key2s)); + return supplyFuture(() -> sunion(key, componentType, key2s)); + } + + @Override + public long sunionstore(final String key, final String srcKey, final String... srcKey2s) { + Set rs = sunion0(srcKey, srcKey2s); + + CacheEntry entry = find(key, CacheEntryType.SSET); + if (entry == null) { + containerLock.lock(); + try { + entry = find(key, CacheEntryType.SSET); + if (entry == null) { + entry = new CacheEntry(CacheEntryType.SSET, key); + container.put(key, entry); + } + } finally { + containerLock.unlock(); + } + } + entry.lock(); + try { + entry.setValue.clear(); + entry.setValue.addAll(rs); + } finally { + entry.unlock(); + } + return rs.size(); } @Override public CompletableFuture sunionstoreAsync(final String key, final String srcKey, final String... srcKey2s) { - return supplyFuture(() -> { - Set rs = sunion0(srcKey, srcKey2s); - - CacheEntry entry = find(key, CacheEntryType.SSET); - if (entry == null) { - containerLock.lock(); - try { - entry = find(key, CacheEntryType.SSET); - if (entry == null) { - entry = new CacheEntry(CacheEntryType.SSET, key); - container.put(key, entry); - } - } finally { - containerLock.unlock(); - } - } - entry.lock(); - try { - entry.setValue.clear(); - entry.setValue.addAll(rs); - } finally { - entry.unlock(); - } - return (long) rs.size(); - }); + return supplyFuture(() -> sunionstore(key, srcKey, srcKey2s)); } private Set sunion0(final String key, final String... key2s) { @@ -1400,397 +1632,454 @@ public final class CacheMemorySource extends AbstractCacheSource { return rs; } + @Override + public Set smembers(final String key, final Type componentType) { + CacheEntry entry = find(key, CacheEntryType.SSET); + if (entry == null) { + return new LinkedHashSet<>(); + } + return new LinkedHashSet<>(entry.setValue); + } + @Override public CompletableFuture> smembersAsync(final String key, final Type componentType) { - return supplyFuture(() -> { + return supplyFuture(() -> smembers(key, componentType)); + } + + @Override + public Map> smembers(final Type componentType, final String... keys) { + Map> map = new HashMap<>(); + for (String key : keys) { CacheEntry entry = find(key, CacheEntryType.SSET); - if (entry == null) { - return new LinkedHashSet<>(); + if (entry != null) { + map.put(key, new LinkedHashSet<>(entry.setValue)); } - return new LinkedHashSet<>(entry.setValue); - }); + } + return map; } @Override public CompletableFuture>> smembersAsync(final Type componentType, final String... keys) { - return supplyFuture(() -> { - Map> map = new HashMap<>(); - for (String key : keys) { - CacheEntry entry = find(key, CacheEntryType.SSET); - if (entry != null) { - map.put(key, new LinkedHashSet<>(entry.setValue)); - } + return supplyFuture(() -> smembers(componentType, keys)); + } + + @Override + public List smismembers(final String key, final String... members) { + CacheEntry entry = find(key, CacheEntryType.SSET); + List rs = new ArrayList<>(); + if (entry == null) { + for (String member : members) { + rs.add(false); } - return map; - }); + return rs; + } + Set set = entry.setValue; + for (String member : members) { + rs.add(set.contains(member)); + } + return rs; } @Override public CompletableFuture> smismembersAsync(final String key, final String... members) { - return supplyFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.SSET); - List rs = new ArrayList<>(); - if (entry == null) { - for (String member : members) { - rs.add(false); + return supplyFuture(() -> smismembers(key, members)); + } + + @Override + public void sadd(final String key, final Type componentType, T... values) { + CacheEntry entry = find(key, CacheEntryType.SSET); + if (entry == null) { + containerLock.lock(); + try { + entry = find(key, CacheEntryType.SSET); + if (entry == null) { + entry = new CacheEntry(CacheEntryType.SSET, key); + container.put(key, entry); } - return rs; + } finally { + containerLock.unlock(); } + } + entry.lock(); + try { Set set = entry.setValue; - for (String member : members) { - rs.add(set.contains(member)); - } - return rs; - }); + set.addAll(List.of(values)); + } finally { + entry.unlock(); + } } @Override public CompletableFuture saddAsync(final String key, final Type componentType, T... values) { - return runFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.SSET); - if (entry == null) { - containerLock.lock(); - try { - entry = find(key, CacheEntryType.SSET); - if (entry == null) { - entry = new CacheEntry(CacheEntryType.SSET, key); - container.put(key, entry); - } - } finally { - containerLock.unlock(); - } - } - entry.lock(); - try { - Set set = entry.setValue; - set.addAll(List.of(values)); - } finally { - entry.unlock(); - } - }); + return runFuture(() -> sadd(key, componentType, values)); + } + + @Override + public long scard(final String key) { + CacheEntry entry = find(key, CacheEntryType.SSET); + return entry == null ? 0L : (long) entry.setValue.size(); } @Override public CompletableFuture scardAsync(final String key) { - return supplyFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.SSET); - return entry == null ? 0L : (long) entry.setValue.size(); - }); + return supplyFuture(() -> scard(key)); + } + + @Override + public boolean sismember(final String key, final Type type, final T value) { + CacheEntry entry = find(key, CacheEntryType.SSET); + return entry != null && entry.setValue.contains(value); } @Override public CompletableFuture sismemberAsync(final String key, final Type type, final T value) { - return supplyFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.SSET); - return entry != null && entry.setValue.contains(value); - }); + return supplyFuture(() -> sismember(key, type, value)); + } + + @Override + public T spop(final String key, final Type componentType) { + CacheEntry entry = find(key, CacheEntryType.SSET); + if (entry == null) { + return null; + } + entry.lock(); + try { + final Set cset = entry.setValue; + if (cset.isEmpty()) { + return null; + } + Iterator it = cset.iterator(); + Object del = null; + if (it.hasNext()) { + del = it.next(); + } + if (del != null) { + cset.remove(del); + return (T) del; + } + return null; + } finally { + entry.unlock(); + } } @Override public CompletableFuture spopAsync(final String key, final Type componentType) { - return supplyFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.SSET); - if (entry == null) { - return null; + return supplyFuture(() -> spop(key, componentType)); + } + + @Override + public Set spop(final String key, final int count, final Type componentType) { + CacheEntry entry = find(key, CacheEntryType.SSET); + if (entry == null) { + return new LinkedHashSet<>(); + } + entry.lock(); + try { + final Set cset = entry.setValue; + if (cset.isEmpty()) { + return new LinkedHashSet<>(); } - entry.lock(); - try { - final Set cset = entry.setValue; - if (cset.isEmpty()) { - return null; + Iterator it = cset.iterator(); + Set list = new LinkedHashSet<>(); + int index = 0; + while (it.hasNext()) { + list.add(convertValue(componentType, it.next())); + if (++index >= count) { + break; } - Iterator it = cset.iterator(); - Object del = null; - if (it.hasNext()) { - del = it.next(); - } - if (del != null) { - cset.remove(del); - return (T) del; - } - return null; - } finally { - entry.unlock(); } - }); + cset.removeAll(list); + return list; + } finally { + entry.unlock(); + } } @Override public CompletableFuture> spopAsync(final String key, final int count, final Type componentType) { - return supplyFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.SSET); - if (entry == null) { + return supplyFuture(() -> spop(key, count, componentType)); + } + + @Override + public Set sscan(final String key, final Type componentType, AtomicLong cursor, int limit, String pattern) { + CacheEntry entry = find(key, CacheEntryType.SSET); + if (entry == null) { + return new LinkedHashSet<>(); + } + entry.lock(); + try { + final Set cset = entry.setValue; + if (cset.isEmpty()) { return new LinkedHashSet<>(); } - entry.lock(); - try { - final Set cset = entry.setValue; - if (cset.isEmpty()) { - return new LinkedHashSet<>(); - } - Iterator it = cset.iterator(); - Set list = new LinkedHashSet<>(); - int index = 0; - while (it.hasNext()) { - list.add(convertValue(componentType, it.next())); - if (++index >= count) { - break; - } - } - cset.removeAll(list); - return list; - } finally { - entry.unlock(); + Iterator it = cset.iterator(); + Set list = new LinkedHashSet<>(); + while (it.hasNext()) { + list.add((T) convertValue(componentType, it.next())); } - }); + return list; + } finally { + entry.unlock(); + } } @Override public CompletableFuture> sscanAsync(final String key, final Type componentType, AtomicLong cursor, int limit, String pattern) { - return supplyFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.SSET); - if (entry == null) { - return new LinkedHashSet<>(); - } - entry.lock(); - try { - final Set cset = entry.setValue; - if (cset.isEmpty()) { - return new LinkedHashSet<>(); - } - Iterator it = cset.iterator(); - Set list = new LinkedHashSet<>(); - while (it.hasNext()) { - list.add((T) convertValue(componentType, it.next())); - } - return list; - } finally { - entry.unlock(); - } - }); + return supplyFuture(() -> sscan(key, componentType, cursor, limit, pattern)); + } + + @Override + public long srem(String key, Type type, T... values) { + CacheEntry entry = find(key, CacheEntryType.SSET); + if (entry == null) { + return 0L; + } + return entry.setValue.removeAll(Arrays.asList(values)) ? 1L : 0L; } @Override public CompletableFuture sremAsync(String key, Type type, T... values) { - return supplyFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.SSET); - if (entry == null) { - return 0L; - } - return entry.setValue.removeAll(Arrays.asList(values)) ? 1L : 0L; - }); + return supplyFuture(() -> srem(key, type, values)); } //------------------------ 有序集合 Sorted Set ------------------------ @Override - public CompletableFuture zaddAsync(String key, CacheScoredValue... values) { - return runFuture(() -> { - List list = new ArrayList<>(); - for (CacheScoredValue v : values) { - list.add(new CacheScoredValue(v)); - } - CacheEntry entry = find(key, CacheEntryType.ZSET); - if (entry == null) { - containerLock.lock(); - try { - entry = find(key, CacheEntryType.ZSET); - if (entry == null) { - entry = new CacheEntry(CacheEntryType.ZSET, key); - container.put(key, entry); - } - } finally { - containerLock.unlock(); - } - } - entry.lock(); + public void zadd(String key, CacheScoredValue... values) { + List list = new ArrayList<>(); + for (CacheScoredValue v : values) { + list.add(new CacheScoredValue(v)); + } + CacheEntry entry = find(key, CacheEntryType.ZSET); + if (entry == null) { + containerLock.lock(); try { - entry.setValue.addAll(list); + entry = find(key, CacheEntryType.ZSET); + if (entry == null) { + entry = new CacheEntry(CacheEntryType.ZSET, key); + container.put(key, entry); + } } finally { - entry.unlock(); + containerLock.unlock(); } - }); + } + entry.lock(); + try { + entry.setValue.addAll(list); + } finally { + entry.unlock(); + } + } + + @Override + public CompletableFuture zaddAsync(String key, CacheScoredValue... values) { + return runFuture(() -> zadd(key, values)); + } + + @Override + public T zincrby(String key, CacheScoredValue value) { + CacheEntry entry = find(key, CacheEntryType.ZSET); + if (entry == null) { + containerLock.lock(); + try { + entry = find(key, CacheEntryType.ZSET); + if (entry == null) { + entry = new CacheEntry(CacheEntryType.ZSET, key); + container.put(key, entry); + } + } finally { + containerLock.unlock(); + } + } + entry.lock(); + try { + Set sets = entry.setValue; + CacheScoredValue old = sets.stream().filter(v -> Objects.equals(v.getValue(), value.getValue())).findAny().orElse(null); + if (old == null) { + sets.add(new CacheScoredValue(value.getScore().doubleValue(), value.getValue())); + return (T) value.getScore(); + } else { + Number ic = value.getScore(); + if (ic instanceof Integer) { + old.setScore((double) (old.getScore().intValue() + ic.intValue())); + } else if (ic instanceof Long) { + old.setScore((double) (old.getScore().longValue() + ic.longValue())); + } else if (ic instanceof Float) { + old.setScore((double) (old.getScore().floatValue() + ic.floatValue())); + } else if (ic instanceof Double) { + old.setScore(old.getScore().doubleValue() + ic.doubleValue()); + } else if (ic instanceof AtomicInteger) { + ((AtomicInteger) old.getScore()).addAndGet(((AtomicInteger) ic).get()); + } else if (ic instanceof AtomicLong) { + ((AtomicLong) old.getScore()).addAndGet(((AtomicLong) ic).get()); + } + return (T) old.getScore(); + } + } finally { + entry.unlock(); + } } @Override public CompletableFuture zincrbyAsync(String key, CacheScoredValue value) { - return supplyFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.ZSET); - if (entry == null) { - containerLock.lock(); - try { - entry = find(key, CacheEntryType.ZSET); - if (entry == null) { - entry = new CacheEntry(CacheEntryType.ZSET, key); - container.put(key, entry); - } - } finally { - containerLock.unlock(); - } - } - entry.lock(); - try { - Set sets = entry.setValue; - CacheScoredValue old = sets.stream().filter(v -> Objects.equals(v.getValue(), value.getValue())).findAny().orElse(null); - if (old == null) { - sets.add(new CacheScoredValue(value.getScore().doubleValue(), value.getValue())); - return (T) value.getScore(); - } else { - Number ic = value.getScore(); - if (ic instanceof Integer) { - old.setScore((double) (old.getScore().intValue() + ic.intValue())); - } else if (ic instanceof Long) { - old.setScore((double) (old.getScore().longValue() + ic.longValue())); - } else if (ic instanceof Float) { - old.setScore((double) (old.getScore().floatValue() + ic.floatValue())); - } else if (ic instanceof Double) { - old.setScore(old.getScore().doubleValue() + ic.doubleValue()); - } else if (ic instanceof AtomicInteger) { - ((AtomicInteger) old.getScore()).addAndGet(((AtomicInteger) ic).get()); - } else if (ic instanceof AtomicLong) { - ((AtomicLong) old.getScore()).addAndGet(((AtomicLong) ic).get()); - } - return (T) old.getScore(); - } - } finally { - entry.unlock(); - } - }); + return supplyFuture(() -> zincrby(key, value)); + } + + @Override + public long zcard(String key) { + CacheEntry entry = find(key, CacheEntryType.ZSET); + if (entry == null) { + return 0L; + } + return entry.setValue.size(); } @Override public CompletableFuture zcardAsync(String key) { - return supplyFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.ZSET); - if (entry == null) { - return 0L; + return supplyFuture(() -> zcard(key)); + } + + @Override + public Long zrank(String key, String member) { + CacheEntry entry = find(key, CacheEntryType.ZSET); + if (entry == null) { + return null; + } + List list = new ArrayList<>(entry.setValue); + Collections.sort(list); + long c = 0; + for (CacheScoredValue v : list) { + if (Objects.equals(v.getValue(), member)) { + return c; } - return (long) entry.setValue.size(); - }); + c++; + } + return null; } @Override public CompletableFuture zrankAsync(String key, String member) { - return supplyFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.ZSET); - if (entry == null) { - return null; - } - List list = new ArrayList<>(entry.setValue); - Collections.sort(list); - long c = 0; - for (CacheScoredValue v : list) { - if (Objects.equals(v.getValue(), member)) { - return c; - } - c++; - } + return supplyFuture(() -> zrank(key, member)); + } + + @Override + public Long zrevrank(String key, String member) { + CacheEntry entry = find(key, CacheEntryType.ZSET); + if (entry == null) { return null; - }); + } + List list = new ArrayList<>(entry.setValue); + Collections.sort(list, Collections.reverseOrder()); + long c = 0; + for (CacheScoredValue v : list) { + if (Objects.equals(v.getValue(), member)) { + return c; + } + c++; + } + return null; } @Override public CompletableFuture zrevrankAsync(String key, String member) { - return supplyFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.ZSET); - if (entry == null) { - return null; + return supplyFuture(() -> zrevrank(key, member)); + } + + @Override + public List zrange(String key, int start, int stop) { + CacheEntry entry = find(key, CacheEntryType.ZSET); + if (entry == null) { + return new ArrayList<>(); + } + List list = new ArrayList<>(); + Set sets = entry.setValue; + long c = 0; + for (CacheScoredValue v : sets) { + if (c >= start && (stop < 0 || c <= stop)) { + list.add(v.getValue()); } - List list = new ArrayList<>(entry.setValue); - Collections.sort(list, Collections.reverseOrder()); - long c = 0; - for (CacheScoredValue v : list) { - if (Objects.equals(v.getValue(), member)) { - return c; - } - c++; - } - return null; - }); + c++; + } + return list; } @Override public CompletableFuture> zrangeAsync(String key, int start, int stop) { - return supplyFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.ZSET); - if (entry == null) { - return new ArrayList<>(); - } - List list = new ArrayList<>(); - Set sets = entry.setValue; - long c = 0; - for (CacheScoredValue v : sets) { - if (c >= start && (stop < 0 || c <= stop)) { - list.add(v.getValue()); - } - c++; - } - return list; - }); + return supplyFuture(() -> zrange(key, start, stop)); + } + + @Override + public List zscan(String key, Type scoreType, AtomicLong cursor, int limit, String pattern) { + CacheEntry entry = find(key, CacheEntryType.ZSET); + if (entry == null) { + return new ArrayList(); + } + Set sets = entry.setValue; + if (Utility.isEmpty(pattern)) { + return sets.stream().collect(Collectors.toList()); + } else { + Predicate regx = Pattern.compile(pattern.replace("*", ".*")).asPredicate(); + return sets.stream().filter(en -> regx.test(en.getValue())).collect(Collectors.toList()); + } } @Override public CompletableFuture> zscanAsync(String key, Type scoreType, AtomicLong cursor, int limit, String pattern) { - return supplyFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.ZSET); - if (entry == null) { - return new ArrayList(); + return supplyFuture(() -> zscan(key, scoreType, cursor, limit, pattern)); + } + + @Override + public long zrem(String key, String... members) { + CacheEntry entry = find(key, CacheEntryType.ZSET); + if (entry == null) { + return 0L; + } + Set sets = entry.setValue; + long c = 0; + Set keys = Set.of(members); + Iterator it = sets.iterator(); + Set dels = new HashSet<>(); + while (it.hasNext()) { + CacheScoredValue v = it.next(); + if (keys.contains(v.getValue())) { + c++; + dels.add(v); } - Set sets = entry.setValue; - if (Utility.isEmpty(pattern)) { - return sets.stream().collect(Collectors.toList()); - } else { - Predicate regx = Pattern.compile(pattern.replace("*", ".*")).asPredicate(); - return sets.stream().filter(en -> regx.test(en.getValue())).collect(Collectors.toList()); - } - }); + } + sets.removeAll(dels); + return c; } @Override public CompletableFuture zremAsync(String key, String... members) { - return supplyFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.ZSET); - if (entry == null) { - return 0L; + return supplyFuture(() -> zrem(key, members)); + } + + @Override + public List zmscore(String key, Class scoreType, String... members) { + List list = new ArrayList<>(); + CacheEntry entry = find(key, CacheEntryType.ZSET); + if (entry == null) { + for (int i = 0; i < members.length; i++) { + list.add(null); } - Set sets = entry.setValue; - long c = 0; - Set keys = Set.of(members); - Iterator it = sets.iterator(); - Set dels = new HashSet<>(); - while (it.hasNext()) { - CacheScoredValue v = it.next(); - if (keys.contains(v.getValue())) { - c++; - dels.add(v); - } - } - sets.removeAll(dels); - return c; + return list; + } + Set keys = Set.of(members); + Set sets = entry.setValue; + Map map = new HashMap<>(); + sets.stream().filter(v -> keys.contains(v.getValue())).forEach(v -> { + map.put(v.getValue(), formatScore(scoreType, v.getScore())); }); + for (String m : members) { + list.add(map.get(m)); + } + return list; } @Override public CompletableFuture> zmscoreAsync(String key, Class scoreType, String... members) { - return supplyFuture(() -> { - List list = new ArrayList<>(); - CacheEntry entry = find(key, CacheEntryType.ZSET); - if (entry == null) { - for (int i = 0; i < members.length; i++) { - list.add(null); - } - return list; - } - Set keys = Set.of(members); - Set sets = entry.setValue; - Map map = new HashMap<>(); - sets.stream().filter(v -> keys.contains(v.getValue())).forEach(v -> { - map.put(v.getValue(), formatScore(scoreType, v.getScore())); - }); - for (String m : members) { - list.add(map.get(m)); - } - return list; - }); + return supplyFuture(() -> zmscore(key, scoreType, members)); } private T formatScore(Class scoreType, Number score) { @@ -1810,45 +2099,72 @@ public final class CacheMemorySource extends AbstractCacheSource { } } + @Override + public T zscore(String key, Class scoreType, String member) { + CacheEntry entry = find(key, CacheEntryType.ZSET); + if (entry == null) { + return null; + } + Set sets = entry.setValue; + return formatScore(scoreType, sets.stream().filter(v -> Objects.equals(member, v.getValue())).findAny().map(v -> v.getScore()).orElse(null)); + } + @Override public CompletableFuture zscoreAsync(String key, Class scoreType, String member) { - return supplyFuture(() -> { - CacheEntry entry = find(key, CacheEntryType.ZSET); - if (entry == null) { - return null; - } - Set sets = entry.setValue; - return formatScore(scoreType, sets.stream().filter(v -> Objects.equals(member, v.getValue())).findAny().map(v -> v.getScore()).orElse(null)); - }); + return supplyFuture(() -> zscore(key, scoreType, member)); + } + + @Override + public long dbsize() { + return container.size(); } @Override public CompletableFuture dbsizeAsync() { - return supplyFuture(() -> (long) container.size()); + return supplyFuture(this::dbsize); + } + + @Override + public void flushdb() { + container.clear(); } @Override public CompletableFuture flushdbAsync() { - return runFuture(container::clear); + return runFuture(this::flushdb); + } + + @Override + public void flushall() { + container.clear(); } @Override public CompletableFuture flushallAsync() { - return runFuture(container::clear); + return runFuture(this::flushall); + } + + @Override + public List keys(String pattern) { + List rs = new ArrayList<>(); + Predicate filter = isEmpty(pattern) ? x -> true : Pattern.compile(pattern).asPredicate(); + container.forEach((k, v) -> { + if (filter.test(k) && !v.isExpired()) { + rs.add(k); + } + }); + return rs; } @Override public CompletableFuture> keysAsync(String pattern) { - return supplyFuture(() -> { - List rs = new ArrayList<>(); - Predicate filter = isEmpty(pattern) ? x -> true : Pattern.compile(pattern).asPredicate(); - container.forEach((k, v) -> { - if (filter.test(k) && !v.isExpired()) { - rs.add(k); - } - }); - return rs; - }); + return supplyFuture(() -> keys(pattern)); + } + + @Override + public List scan(AtomicLong cursor, int limit, String pattern) { + cursor.set(0); + return keys(pattern); } @Override @@ -1860,17 +2176,20 @@ public final class CacheMemorySource extends AbstractCacheSource { } @Override - public CompletableFuture> keysStartsWithAsync(String startsWith) { - return supplyFuture(() -> { - List rs = new ArrayList<>(); - Predicate filter = isEmpty(startsWith) ? x -> true : x -> x.startsWith(startsWith); - container.forEach((k, v) -> { - if (filter.test(k) && !v.isExpired()) { - rs.add(k); - } - }); - return rs; + public List keysStartsWith(String startsWith) { + List rs = new ArrayList<>(); + Predicate filter = isEmpty(startsWith) ? x -> true : x -> x.startsWith(startsWith); + container.forEach((k, v) -> { + if (filter.test(k) && !v.isExpired()) { + rs.add(k); + } }); + return rs; + } + + @Override + public CompletableFuture> keysStartsWithAsync(String startsWith) { + return supplyFuture(() -> keysStartsWith(startsWith)); } protected CacheEntry find(String key) {