From a8ee396ffa856a1a652e2731f941d78cd097fc55 Mon Sep 17 00:00:00 2001 From: redkale Date: Fri, 28 Jul 2023 13:09:07 +0800 Subject: [PATCH] =?UTF-8?q?CacheMemorySource=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/redkale/source/CacheMemorySource.java | 2179 +++++++++-------- 1 file changed, 1112 insertions(+), 1067 deletions(-) diff --git a/src/main/java/org/redkale/source/CacheMemorySource.java b/src/main/java/org/redkale/source/CacheMemorySource.java index fd89639f2..db4a1b239 100644 --- a/src/main/java/org/redkale/source/CacheMemorySource.java +++ b/src/main/java/org/redkale/source/CacheMemorySource.java @@ -16,7 +16,6 @@ import java.util.logging.*; import java.util.regex.Pattern; import java.util.stream.Collectors; import org.redkale.annotation.AutoLoad; -import org.redkale.annotation.ConstructorParameters; import org.redkale.annotation.*; import org.redkale.annotation.ResourceListener; import org.redkale.annotation.ResourceType; @@ -24,6 +23,7 @@ import org.redkale.convert.*; import org.redkale.convert.json.*; import org.redkale.service.Local; import org.redkale.util.*; +import static org.redkale.util.Utility.isEmpty; /** * CacheSource的默认实现--内存缓存, 此实现只可用于调试,不可用于生产环境 @@ -54,7 +54,7 @@ public final class CacheMemorySource extends AbstractCacheSource { private final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); - protected final ConcurrentHashMap> container = new ConcurrentHashMap<>(); + protected final ConcurrentHashMap container = new ConcurrentHashMap<>(); protected final ReentrantLock containerLock = new ReentrantLock(); @@ -120,7 +120,7 @@ public final class CacheMemorySource extends AbstractCacheSource { keys.clear(); long now = System.currentTimeMillis(); container.forEach((k, x) -> { - if (x.expireSeconds > 0 && (now > (x.lastAccessed + x.expireSeconds))) { + if (x.expireMills > 0 && (now > (x.lastAccessed + x.expireMills))) { keys.add(x.key); } }); @@ -160,442 +160,6 @@ public final class CacheMemorySource extends AbstractCacheSource { return CompletableFuture.completedFuture(true); } - //----------- hxxx -------------- - @Override - public CompletableFuture hdelAsync(final String key, String... fields) { - return supplyFuture(() -> { - long count = 0; - CacheEntry entry = container.get(key); - if (entry == null || entry.mapValue == null) { - return 0L; - } - for (String field : fields) { - if (entry.mapValue.remove(field) != null) { - count++; - } - } - return count; - }); - } - - @Override - public CompletableFuture> hkeysAsync(final String key) { - return supplyFuture(() -> { - List list = new ArrayList<>(); - CacheEntry entry = container.get(key); - if (entry == null || entry.mapValue == null) { - return list; - } - list.addAll(entry.mapValue.keySet()); - return list; - }); - } - - @Override - public CompletableFuture hlenAsync(final String key) { - return supplyFuture(() -> { - CacheEntry entry = container.get(key); - if (entry == null || entry.mapValue == null) { - return 0L; - } - return (long) entry.mapValue.keySet().size(); - }); - } - - @Override - public CompletableFuture hincrAsync(final String key, String field) { - return hincrbyAsync(key, field, 1); - } - - @Override - public CompletableFuture hincrbyAsync(final String key, String field, long num) { - return supplyFuture(() -> { - CacheEntry entry = container.get(key); - if (entry == null) { - containerLock.lock(); - try { - entry = container.get(key); - if (entry == null) { - ConcurrentHashMap map = new ConcurrentHashMap(); - map.put(field, new AtomicLong()); - entry = new CacheEntry(CacheEntryType.MAP, key, new AtomicLong(), null, null, map); - container.put(key, entry); - } - } finally { - containerLock.unlock(); - } - } - Serializable val = (Serializable) entry.mapValue.computeIfAbsent(field, f -> new AtomicLong()); - if (!(val instanceof AtomicLong)) { - entry.lock.lock(); - try { - if (!(val instanceof AtomicLong)) { - if (val == null) { - val = new AtomicLong(); - } else { - val = new AtomicLong(((Number) val).longValue()); - } - entry.mapValue.put(field, val); - } - } finally { - entry.lock.unlock(); - } - } - return ((AtomicLong) entry.mapValue.get(field)).addAndGet(num); - }); - } - - @Override - public CompletableFuture hincrbyFloatAsync(final String key, String field, double num) { - return supplyFuture(() -> { - CacheEntry entry = container.get(key); - if (entry == null) { - containerLock.lock(); - try { - entry = container.get(key); - if (entry == null) { - ConcurrentHashMap map = new ConcurrentHashMap(); - map.put(field, new AtomicLong()); - entry = new CacheEntry(CacheEntryType.MAP, key, new AtomicLong(), null, null, map); - container.put(key, entry); - } - } finally { - containerLock.unlock(); - } - } - Serializable val = (Serializable) entry.mapValue.computeIfAbsent(field, f -> new AtomicLong()); - if (!(val instanceof AtomicLong)) { - entry.lock.lock(); - try { - if (!(val instanceof AtomicLong)) { - if (val == null) { - val = new AtomicLong(); - } else { - val = new AtomicLong(((Number) val).longValue()); - } - entry.mapValue.put(field, val); - } - } finally { - entry.lock.unlock(); - } - } - return Double.longBitsToDouble(((AtomicLong) entry.mapValue.get(field)).addAndGet(Double.doubleToLongBits(num))); - }); - } - - @Override - public CompletableFuture hdecrAsync(final String key, String field) { - return hincrbyAsync(key, field, -1); - } - - @Override - public CompletableFuture hdecrbyAsync(final String key, String field, long num) { - return hincrbyAsync(key, field, -num); - } - - @Override - public CompletableFuture hexistsAsync(final String key, String field) { - return supplyFuture(() -> { - if (key == null) { - return false; - } - CacheEntry entry = container.get(key); - if (entry == null || entry.isExpired() || entry.mapValue == null) { - return false; - } - return entry.mapValue.contains(field); - }); - } - - @Override - public CompletableFuture hsetAsync(final String key, final String field, final Convert convert, final Type type, final T value) { - return runFuture(() -> { - hset(CacheEntryType.MAP, key, field, value); - }); - } - - @Override - public CompletableFuture hsetnxAsync(final String key, final String field, final Convert convert, final Type type, final T value) { - return supplyFuture(() -> { - return hsetnx(CacheEntryType.MAP, key, field, value); - }); - } - - @Override - public CompletableFuture hmsetAsync(final String key, final Serializable... values) { - return runFuture(() -> { - for (int i = 0; i < values.length; i += 2) { - hset(CacheEntryType.MAP, key, (String) values[i], values[i + 1]); - } - }); - } - - @Override - public CompletableFuture hmsetAsync(final String key, final Map map) { - return runFuture(() -> { - map.forEach((k, v) -> hset(CacheEntryType.MAP, key, (String) k, v)); - }); - } - - @Override - public CompletableFuture> hmgetAsync(final String key, final Type type, final String... fields) { - return supplyFuture(() -> { - if (key == null) { - return null; - } - CacheEntry entry = container.get(key); - if (entry == null || entry.isExpired() || entry.mapValue == null) { - return null; - } - List rs = new ArrayList<>(fields.length); - for (int i = 0; i < fields.length; i++) { - Serializable val = (Serializable) entry.mapValue.get(fields[i]); - if (type == String.class) { - rs.add(val == null ? null : (T) String.valueOf(val)); - } else { - rs.add((T) val); - } - } - return rs; - }); - } - - @Override - public CompletableFuture> hgetallAsync(final String key, final Type type) { - return supplyFuture(() -> { - return hgetall(CacheEntryType.MAP, key, type); - }); - } - - @Override - public CompletableFuture> hvalsAsync(final String key, final Type type) { - return supplyFuture(() -> { - return hvals(CacheEntryType.MAP, key, type); - }); - } - - @Override - public CompletableFuture> hscanAsync(final String key, final Type type, AtomicLong cursor, int limit, String pattern) { - return supplyFuture(() -> { - if (key == null) { - return new HashMap(); - } - CacheEntry entry = container.get(key); - if (entry == null || entry.isExpired() || entry.mapValue == null) { - return new HashMap(); - } - if (Utility.isEmpty(pattern)) { - return new HashMap(entry.mapValue); - } else { - Predicate regx = Pattern.compile(pattern.replace("*", ".*")).asPredicate(); - Set> set = entry.mapValue.entrySet(); - return set.stream().filter(en -> regx.test(en.getKey())).collect(Collectors.toMap(en -> en.getKey(), en -> en.getValue())); - } - }); - } - - @Override - public CompletableFuture hgetAsync(final String key, final String field, final Type type) { - return supplyFuture(() -> { - if (key == null) { - return null; - } - CacheEntry entry = container.get(key); - if (entry == null || entry.isExpired() || entry.mapValue == null) { - return null; - } - Object obj = entry.mapValue.get(field); - if (obj == null) { - return null; - } - if (type == long.class || type == Long.class) { - return (T) (obj instanceof Long ? obj : Long.parseLong(obj.toString())); - } - return (T) obj; - }); - } - - @Override - public CompletableFuture hstrlenAsync(final String key, final String field) { - return supplyFuture(() -> { - if (key == null || field == null) { - return 0L; - } - CacheEntry entry = container.get(key); - if (entry == null || entry.isExpired() || entry.mapValue == null) { - return 0L; - } - Object obj = entry.mapValue.get(field); - if (obj == null) { - return 0L; - } - return (long) obj.toString().length(); - }); - } - - //----------- hxxx -------------- - @Override - public CompletableFuture existsAsync(String key) { - return supplyFuture(() -> { - if (key == null) { - return false; - } - CacheEntry entry = container.get(key); - if (entry == null) { - return false; - } - return !entry.isExpired(); - }); - } - - @Override - public CompletableFuture getAsync(final String key, final Type type) { - return supplyFuture(() -> { - if (key == null) { - return null; - } - CacheEntry entry = container.get(key); - if (entry == null || entry.isExpired()) { - return null; - } - if (entry.isListCacheType()) { - return (T) (entry.listValue == null ? null : new ArrayList(entry.listValue)); - } - if (entry.isSetCacheType()) { - return (T) (entry.csetValue == null ? null : new LinkedHashSet<>(entry.csetValue)); - } - if (entry.cacheType == CacheEntryType.DOUBLE) { - return (T) (Double) Double.longBitsToDouble(((AtomicLong) entry.objectValue).intValue()); - } - Object obj = entry.objectValue; - if (obj != null && obj.getClass() != type) { - return (T) JsonConvert.root().convertFrom(type, JsonConvert.root().convertToBytes(obj)); - } - return (T) obj; - }); - } - - //----------- hxxx -------------- - @Override - public CompletableFuture getexAsync(final String key, final int expireSeconds, final Type type) { - return supplyFuture(() -> { - if (key == null) { - return null; - } - CacheEntry entry = container.get(key); - if (entry == null || entry.isExpired()) { - return null; - } - entry.lastAccessed = System.currentTimeMillis(); - entry.expireSeconds = expireSeconds; - if (entry.isListCacheType()) { - return (T) (entry.listValue == null ? null : new ArrayList(entry.listValue)); - } - if (entry.isSetCacheType()) { - return (T) (entry.csetValue == null ? null : new HashSet(entry.csetValue)); - } - return (T) entry.objectValue; - }); - } - - protected void set(CacheEntryType cacheType, String key, Object value) { - if (key == null) { - return; - } - CacheEntry entry = container.get(key); - if (entry == null) { - entry = new CacheEntry(cacheType, key, value, null, null, null); - container.put(key, entry); - } else { - entry.expireSeconds = 0; - entry.objectValue = value; - entry.lastAccessed = System.currentTimeMillis(); - } - } - - protected boolean setnx(CacheEntryType cacheType, String key, Object value) { - if (key == null) { - return false; - } - CacheEntry entry = container.get(key); - if (entry == null) { - entry = new CacheEntry(cacheType, key, value, null, null, null); - container.putIfAbsent(key, entry); - return true; - } else { - entry.expireSeconds = 0; - entry.lastAccessed = System.currentTimeMillis(); - return false; - } - } - - protected void hset(CacheEntryType cacheType, String key, String field, Object value) { - if (key == null || value == null) { - return; - } - CacheEntry entry = container.get(key); - if (entry == null) { - entry = new CacheEntry(CacheEntryType.MAP, key, value, null, null, new ConcurrentHashMap<>()); - container.put(key, entry); - entry.mapValue.put(field, value); - } else { - entry.expireSeconds = 0; - entry.mapValue.put(field, value); - entry.lastAccessed = System.currentTimeMillis(); - } - } - - protected boolean hsetnx(CacheEntryType cacheType, String key, String field, Object value) { - if (key == null) { - return false; - } - CacheEntry entry = container.get(key); - if (entry == null) { - entry = new CacheEntry(CacheEntryType.MAP, key, value, null, null, new ConcurrentHashMap<>()); - container.putIfAbsent(key, entry); - entry.mapValue.putIfAbsent(field, value); - return true; - } else { - entry.expireSeconds = 0; - entry.lastAccessed = System.currentTimeMillis(); - return false; - } - } - - protected Map hgetall(CacheEntryType cacheType, String key, final Type type) { - if (key == null) { - return new LinkedHashMap(); - } - CacheEntry entry = container.get(key); - if (entry == null) { - return new LinkedHashMap(); - } else if (type == long.class || type == Long.class) { - Map map = new LinkedHashMap(); - entry.mapValue.forEach((k, v) -> { - map.put(k, v instanceof Long ? v : (v == null ? null : Long.parseLong(v.toString()))); - }); - return map; - } else { - return new LinkedHashMap(entry.mapValue); - } - } - - protected List hvals(CacheEntryType cacheType, String key, final Type type) { - if (key == null) { - return new ArrayList(); - } - CacheEntry entry = container.get(key); - if (entry == null) { - return new ArrayList(); - } else { - if (type == long.class || type == Long.class) { - return entry.mapValue.values().stream().map(v -> v instanceof Long ? v : (v == null ? null : Long.parseLong(v.toString()))).toList(); - } else { - return new ArrayList(entry.mapValue.values()); - } - } - } - @Override public CompletableFuture msetAsync(Serializable... keyVals) { return runFuture(() -> { @@ -605,13 +169,7 @@ public final class CacheMemorySource extends AbstractCacheSource { for (int i = 0; i < keyVals.length; i += 2) { String key = keyVals[i].toString(); Object val = keyVals[i + 1]; - if (val instanceof String) { - set(CacheEntryType.STRING, key, val); - } else if (val instanceof Number) { - set(CacheEntryType.LONG, key, ((Number) val).longValue()); - } else { - set(CacheEntryType.OBJECT, key, val); - } + set0(key.toString(), 0, null, val); } }); } @@ -620,13 +178,7 @@ public final class CacheMemorySource extends AbstractCacheSource { public CompletableFuture msetAsync(Map map) { return runFuture(() -> { map.forEach((key, val) -> { - if (val instanceof String) { - set(CacheEntryType.STRING, (String) key, val); - } else if (val instanceof Number) { - set(CacheEntryType.LONG, (String) key, ((Number) val).longValue()); - } else { - set(CacheEntryType.OBJECT, (String) key, val); - } + set0(key.toString(), 0, null, val); }); }); } @@ -634,29 +186,46 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture setAsync(String key, Convert convert, Type type, T value) { return runFuture(() -> { - set(findEntryType(type), key, value); + set0(key, 0, type, value); }); } @Override public CompletableFuture setnxAsync(String key, Convert convert, Type type, T value) { - return supplyFuture(() -> { - return setnx(findEntryType(type), key, value); - }); + return setnxexAsync(key, 0, convert, type, value); } @Override public CompletableFuture setnxexAsync(String key, int expireSeconds, Convert convert, Type type, T value) { return supplyFuture(() -> { - return setnxex(findEntryType(type), expireSeconds, key, value); + CacheEntry entry = find(key); + if (entry == null) { + containerLock.lock(); + try { + entry = find(key, CacheEntryType.OBJECT); + if (entry == null) { + entry = new CacheEntry(CacheEntryType.OBJECT, key); + container.put(key, entry); + entry.objectValue = value; + entry.expireSeconds(expireSeconds); + entry.lastAccessed = System.currentTimeMillis(); + return true; + } + return false; + } finally { + containerLock.unlock(); + } + } + return false; }); } @Override public CompletableFuture getSetAsync(String key, Convert convert, Type type, T value) { return supplyFuture(() -> { - T old = get(key, type); - set(findEntryType(type), key, value); + CacheEntry entry = find(key, CacheEntryType.OBJECT); + T old = entry == null ? null : (T) entry.objectValue; + set0(key, 0, type, value); return old; }); } @@ -664,80 +233,84 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture getDelAsync(String key, Type type) { return supplyFuture(() -> { - CacheEntry entry = container.remove(key); - return entry == null ? null : (T) entry.objectValue; + CacheEntry entry = find(key, CacheEntryType.OBJECT); + if (entry == null) { + return null; + } + containerLock.lock(); + try { + container.remove(key); + } finally { + containerLock.unlock(); + } + return (T) entry.objectValue; }); } - protected void set(CacheEntryType cacheType, int expireSeconds, String key, Object value) { - if (key == null) { - return; - } - CacheEntry entry = container.get(key); - if (entry == null || entry.isExpired()) { - entry = new CacheEntry(cacheType, expireSeconds, key, value, null, null, null); - container.putIfAbsent(key, entry); - } else { - if (expireSeconds > 0) { - entry.expireSeconds = expireSeconds; + private void set0(String key, int expireSeconds, Type type, Object value) { + CacheEntry entry = find(key, CacheEntryType.OBJECT); + if (entry == null) { + containerLock.lock(); + try { + entry = find(key, CacheEntryType.OBJECT); + if (entry == null) { + entry = new CacheEntry(CacheEntryType.OBJECT, key); + container.put(key, entry); + } + } finally { + containerLock.unlock(); } - entry.lastAccessed = System.currentTimeMillis(); - entry.objectValue = value; } - } - - protected boolean setnxex(CacheEntryType cacheType, int expireSeconds, String key, Object value) { - if (key == null) { - return false; - } - CacheEntry entry = container.get(key); - if (entry == null || entry.isExpired()) { - entry = new CacheEntry(cacheType, expireSeconds, key, value, null, null, null); - container.put(key, entry); - return true; - } else { - entry.expireSeconds = expireSeconds > 0 ? expireSeconds : 0; + entry.lock(); + try { + entry.objectValue = formatValue(type, value); + entry.expireSeconds(expireSeconds); entry.lastAccessed = System.currentTimeMillis(); - return false; + } finally { + entry.unlock(); } } @Override public CompletableFuture setexAsync(String key, int expireSeconds, Convert convert, Type type, T value) { return runFuture(() -> { - set(findEntryType(type), expireSeconds, key, value); + set0(key, expireSeconds, type, value); }); } @Override public CompletableFuture expireAsync(String key, int expireSeconds) { return runFuture(() -> { - if (key == null) { - return; - } - CacheEntry entry = container.get(key); + CacheEntry entry = find(key); if (entry == null) { return; } - entry.expireSeconds = expireSeconds; + entry.lock(); + try { + entry.expireSeconds(expireSeconds); + } finally { + entry.unlock(); + } }); } @Override public CompletableFuture persistAsync(final String key) { return supplyFuture(() -> { - if (key == null) { - return false; - } - CacheEntry entry = container.get(key); + CacheEntry entry = find(key); if (entry == null) { return false; } - if (entry.expireSeconds > 0) { - entry.expireSeconds = 0; - return true; - } else { - return false; + entry.lock(); + try { + if (entry.expireMills > 0) { + entry.expireMills = 0; + return true; + } else { + return false; + } + } finally { + entry.unlock(); } }); } @@ -748,14 +321,19 @@ public final class CacheMemorySource extends AbstractCacheSource { if (oldKey == null || newKey == null) { return false; } - CacheEntry entry = container.get(oldKey); - if (entry == null) { - return false; + containerLock.lock(); + try { + CacheEntry oldEntry = find(oldKey); + if (oldEntry == null) { + return false; + } + oldEntry.key = newKey; + container.put(newKey, oldEntry); + container.remove(oldKey); + return true; + } finally { + containerLock.unlock(); } - entry.key = newKey; - container.put(newKey, entry); - container.remove(oldKey); - return true; }); } @@ -765,17 +343,23 @@ public final class CacheMemorySource extends AbstractCacheSource { if (oldKey == null || newKey == null) { return false; } - if (container.containsKey(newKey)) { - return false; + containerLock.lock(); + try { + CacheEntry newEntry = find(newKey); + if (newEntry != null) { + return false; + } + CacheEntry oldEntry = find(oldKey); + if (oldEntry == null) { + return false; + } + oldEntry.key = newKey; + container.put(newKey, oldEntry); + container.remove(oldKey); + return true; + } finally { + containerLock.unlock(); } - CacheEntry entry = container.get(oldKey); - if (entry == null) { - return false; - } - entry.key = newKey; - container.put(newKey, entry); - container.remove(oldKey); - return true; }); } @@ -786,8 +370,13 @@ public final class CacheMemorySource extends AbstractCacheSource { return 0L; } long count = 0; - for (String key : keys) { - count += container.remove(key) == null ? 0 : 1; + containerLock.lock(); + try { + for (String key : keys) { + count += container.remove(key) == null ? 0 : 1; + } + } finally { + containerLock.unlock(); } return count; }); @@ -801,43 +390,44 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture incrbyAsync(final String key, long num) { return supplyFuture(() -> { - CacheEntry entry = container.get(key); + CacheEntry entry = find(key); if (entry == null) { containerLock.lock(); try { - entry = container.get(key); + entry = find(key, CacheEntryType.ATOMIC); if (entry == null) { - entry = new CacheEntry(CacheEntryType.ATOMIC, key, new AtomicLong(), null, null, null); + entry = new CacheEntry(CacheEntryType.ATOMIC, key); + entry.objectValue = new AtomicLong(); container.put(key, entry); } } finally { containerLock.unlock(); } } - if (!(entry.objectValue instanceof AtomicLong)) { - containerLock.lock(); - try { - if (!(entry.objectValue instanceof AtomicLong)) { - entry.objectValue = new AtomicLong(Long.parseLong(entry.objectValue.toString())); - } - } finally { - containerLock.unlock(); + entry.lock(); + try { + if (entry.cacheType != CacheEntryType.ATOMIC) { + entry.cacheType = CacheEntryType.ATOMIC; + entry.objectValue = new AtomicLong(Long.parseLong(entry.objectValue.toString())); } + return ((AtomicLong) entry.objectValue).addAndGet(num); + } finally { + entry.unlock(); } - return ((AtomicLong) entry.objectValue).addAndGet(num); }); } @Override public CompletableFuture incrbyFloatAsync(final String key, double num) { return supplyFuture(() -> { - CacheEntry entry = container.get(key); + CacheEntry entry = find(key, CacheEntryType.DOUBLE); if (entry == null) { containerLock.lock(); try { - entry = container.get(key); + entry = find(key, CacheEntryType.DOUBLE); if (entry == null) { - entry = new CacheEntry(CacheEntryType.DOUBLE, key, new AtomicLong(), null, null, null); + entry = new CacheEntry(CacheEntryType.DOUBLE, key); + entry.objectValue = new AtomicLong(); container.put(key, entry); } } finally { @@ -860,197 +450,379 @@ public final class CacheMemorySource extends AbstractCacheSource { } @Override - public CompletableFuture> srandmemberAsync(String key, Type componentType, int count) { + public CompletableFuture> mgetAsync(final Type componentType, final String... keys) { return supplyFuture(() -> { List list = new ArrayList<>(); - CacheEntry entry = container.get(key); - if (entry == null || entry.csetValue == null) { - return list; - } - List vals = new ArrayList<>(entry.csetValue); - if (count < 0) { //可以重复 - for (int i = 0; i < Math.abs(count); i++) { - int index = ThreadLocalRandom.current().nextInt(vals.size()); - T val = vals.get(index); - list.add(val); - } - } else { //不可以重复 - if (count >= vals.size()) { - return vals; - } - return vals.subList(0, count); + for (String key : keys) { + list.add(get0(key, 0, componentType)); } return list; }); } + //----------- hxxx -------------- @Override - public CompletableFuture smoveAsync(String key, String key2, Type componentType, T member) { + public CompletableFuture existsAsync(String key) { + return supplyFuture(() -> find(key) != null); + } + + @Override + public CompletableFuture getAsync(final String key, final Type type) { + return supplyFuture(() -> get0(key, 0, type)); + } + + @Override + public CompletableFuture getexAsync(final String key, final int expireSeconds, final Type type) { + return supplyFuture(() -> get0(key, expireSeconds, type)); + } + + private T get0(final String key, final int expireSeconds, final Type type) { + CacheEntry entry = find(key); + if (entry == null) { + return null; + } + if (expireSeconds > 0) { + entry.expireSeconds(expireSeconds); + } + // OBJECT, ATOMIC, DOUBLE, SSET, ZSET, LIST, MAP; + switch (entry.cacheType) { + case ATOMIC: + return formatValue(type, (AtomicLong) entry.objectValue); + case DOUBLE: + return formatValue(type, Double.longBitsToDouble(((AtomicLong) entry.objectValue).intValue())); + case SSET: + return (T) new LinkedHashSet(entry.setValue); + case ZSET: + return (T) new LinkedHashSet(entry.setValue); + case LIST: + return (T) new ArrayList(entry.listValue); + case MAP: + return (T) new LinkedHashMap<>(entry.mapValue); + default: + Object obj = entry.objectValue; + if (obj != null && obj.getClass() != type) { + return (T) JsonConvert.root().convertFrom(type, JsonConvert.root().convertToBytes(obj)); + } + return (T) obj; + } + } + +// private void set0(CacheEntryType cacheType, String key, Object value) { +// if (key == null) { +// return; +// } +// CacheEntry entry = container.get(key); +// if (entry == null) { +// entry = new CacheEntry(cacheType, key, value, null, null, null); +// container.put(key, entry); +// } else { +// entry.expireSeconds = 0; +// entry.objectValue = value; +// entry.lastAccessed = System.currentTimeMillis(); +// } +// } +// +// private boolean setnx0(CacheEntryType cacheType, String key, Object value) { +// if (key == null) { +// return false; +// } +// CacheEntry entry = container.get(key); +// if (entry == null) { +// entry = new CacheEntry(cacheType, key, value, null, null, null); +// container.putIfAbsent(key, entry); +// return true; +// } else { +// entry.expireSeconds = 0; +// entry.lastAccessed = System.currentTimeMillis(); +// return false; +// } +// } + //------------------------ 哈希表 Hash ------------------------ + @Override + public CompletableFuture hdelAsync(final String key, String... fields) { return supplyFuture(() -> { - CacheEntry entry = container.get(key); - if (entry == null || entry.csetValue == null) { + long count = 0; + CacheEntry entry = find(key, CacheEntryType.MAP); + if (entry == null) { + return 0L; + } + Map map = entry.mapValue; + entry.lock(); + try { + for (String field : fields) { + if (map.remove(field) != null) { + count++; + } + } + } finally { + entry.unlock(); + } + return count; + }); + } + + @Override + public CompletableFuture> hkeysAsync(final String key) { + return supplyFuture(() -> { + List list = new ArrayList<>(); + CacheEntry entry = find(key, CacheEntryType.MAP); + if (entry == null) { + return list; + } + list.addAll(entry.mapValue.keySet()); + return list; + }); + } + + @Override + public CompletableFuture hlenAsync(final String key) { + return supplyFuture(() -> { + CacheEntry entry = find(key, CacheEntryType.MAP); + if (entry == null) { + return 0L; + } + return (long) entry.mapValue.keySet().size(); + }); + } + + @Override + public CompletableFuture hincrAsync(final String key, String field) { + return hincrbyAsync(key, field, 1); + } + + private long hincrby0(final String key, String field, long num) { + CacheEntry entry = find(key, CacheEntryType.MAP); + if (entry == null) { + containerLock.lock(); + try { + entry = find(key, CacheEntryType.MAP); + if (entry == null) { + entry = new CacheEntry(CacheEntryType.MAP, key); + entry.mapValue = new ConcurrentHashMap(); + container.put(key, entry); + } + } finally { + containerLock.unlock(); + } + } + entry.lock(); + try { + Map map = entry.mapValue; + Serializable val = (Serializable) map.computeIfAbsent(field, f -> new AtomicLong()); + if (!(val instanceof AtomicLong)) { + val = new AtomicLong(((Number) val).longValue()); + map.put(field, val); + } + return ((AtomicLong) val).addAndGet(num); + } finally { + entry.unlock(); + } + } + + @Override + public CompletableFuture hincrbyAsync(final String key, String field, long num) { + return supplyFuture(() -> hincrby0(key, field, num)); + } + + @Override + public CompletableFuture hincrbyFloatAsync(final String key, String field, double num) { + return supplyFuture(() -> Double.longBitsToDouble(hincrby0(key, field, Double.doubleToLongBits(num)))); + } + + @Override + public CompletableFuture hdecrAsync(final String key, String field) { + return hincrbyAsync(key, field, -1); + } + + @Override + public CompletableFuture hdecrbyAsync(final String key, String field, long num) { + return hincrbyAsync(key, field, -num); + } + + @Override + public CompletableFuture hexistsAsync(final String key, String field) { + return supplyFuture(() -> { + CacheEntry entry = find(key, CacheEntryType.MAP); + return entry == null ? false : entry.mapValue.contains(field); + }); + } + + @Override + public CompletableFuture hsetAsync(final String key, final String field, final Convert convert, final Type type, final T value) { + return runFuture(() -> hset0(key, field, type, value)); + } + + @Override + public CompletableFuture hsetnxAsync(final String key, final String field, final Convert convert, final Type type, final T value) { + return supplyFuture(() -> { + if (value == null) { return false; } - boolean rs = entry.csetValue.remove(member); - if (rs) { - CacheEntry entry2 = container.get(key2); - if (entry2 == null || entry2.csetValue == null) { - appendSetItem(componentType == String.class ? CacheEntryType.SET_STRING : CacheEntryType.SET_OBJECT, key2, List.of(member)); - } else { - entry2.csetValue.add(member); - } - } - return rs; - }); - } - - @Override - public CompletableFuture> sdiffAsync(final String key, final Type componentType, final String... key2s) { - return supplyFuture(() -> { - Set rs = new HashSet<>(); - CacheEntry entry = container.get(key); - if (entry == null || entry.csetValue == null) { - return rs; - } - rs.addAll(entry.csetValue); - for (String k : key2s) { - CacheEntry en2 = container.get(k); - if (en2 != null && en2.csetValue != null) { - en2.csetValue.forEach(v -> rs.remove(v)); - } - } - return rs; - }); - } - - @Override - public CompletableFuture sdiffstoreAsync(final String key, final String srcKey, final String... srcKey2s) { - return supplyFuture(() -> { - Set rs = sdiff(srcKey, Object.class, srcKey2s); - if (container.containsKey(key)) { - Set set = container.get(srcKey).csetValue; - set.clear(); - set.addAll(rs); - } else { - appendSetItem(CacheEntryType.SET_OBJECT, key, rs); - } - return (long) rs.size(); - }); - } - - @Override - public CompletableFuture> sinterAsync(final String key, final Type componentType, final String... key2s) { - return supplyFuture(() -> { - Set rs = new HashSet<>(); - CacheEntry entry = container.get(key); - if (entry == null || entry.csetValue == null) { - return rs; - } - rs.addAll(entry.csetValue); - for (String k : key2s) { - CacheEntry en2 = container.get(k); - if (en2 != null && en2.csetValue != null) { - Set removes = new HashSet<>(); - for (T v : rs) { - if (!en2.csetValue.contains(v)) { - removes.add(v); - } + CacheEntry entry = find(key, CacheEntryType.MAP); + if (entry == null) { + containerLock.lock(); + try { + entry = find(key, CacheEntryType.MAP); + if (entry == null) { + entry = new CacheEntry(CacheEntryType.MAP, key); + container.put(key, entry); } - rs.removeAll(removes); - } else { - rs.clear(); - return rs; + } finally { + containerLock.unlock(); } } - return rs; - }); - } - - @Override - public CompletableFuture sinterstoreAsync(final String key, final String srcKey, final String... srcKey2s) { - return supplyFuture(() -> { - Set rs = sinter(srcKey, Object.class, srcKey2s); - if (container.containsKey(key)) { - Set set = container.get(srcKey).csetValue; - set.clear(); - set.addAll(rs); - } else { - appendSetItem(CacheEntryType.SET_OBJECT, key, rs); - } - return (long) rs.size(); - }); - } - - @Override - public CompletableFuture> sunionAsync(final String key, final Type componentType, final String... key2s) { - return supplyFuture(() -> { - Set rs = new HashSet<>(); - CacheEntry entry = container.get(key); - if (entry == null || entry.csetValue == null) { + entry.lock(); + try { + boolean rs = entry.mapValue.putIfAbsent(field, formatValue(type, value)) == null; + entry.lastAccessed = System.currentTimeMillis(); return rs; + } finally { + entry.unlock(); } - rs.addAll(entry.csetValue); - for (String k : key2s) { - CacheEntry en2 = container.get(k); - if (en2 != null && en2.csetValue != null) { - rs.addAll(en2.csetValue); - } + }); + } + + @Override + public CompletableFuture hmsetAsync(final String key, final Serializable... values) { + return runFuture(() -> { + for (int i = 0; i < values.length; i += 2) { + hset0(key, (String) values[i], null, values[i + 1]); + } + }); + } + + @Override + public CompletableFuture hmsetAsync(final String key, final Map map) { + return runFuture(() -> { + map.forEach((k, v) -> hset0(key, (String) k, null, v)); + }); + } + + @Override + public CompletableFuture> hmgetAsync(final String key, final Type type, final String... fields) { + return supplyFuture(() -> { + CacheEntry entry = find(key, CacheEntryType.MAP); + if (entry == null) { + return null; + } + List rs = new ArrayList<>(fields.length); + for (String field : fields) { + rs.add(formatValue(type, entry.mapValue.get(field))); } return rs; }); } @Override - public CompletableFuture sunionstoreAsync(final String key, final String srcKey, final String... srcKey2s) { + public CompletableFuture> hgetallAsync(final String key, final Type type) { return supplyFuture(() -> { - Set rs = sunion(srcKey, Object.class, srcKey2s); - if (container.containsKey(key)) { - Set set = container.get(srcKey).csetValue; - set.clear(); - set.addAll(rs); + CacheEntry entry = find(key, CacheEntryType.MAP); + if (entry == null) { + return new LinkedHashMap(); } else { - appendSetItem(CacheEntryType.SET_OBJECT, key, rs); + Map map = new LinkedHashMap(); + entry.mapValue.forEach((k, v) -> { + map.put(k, formatValue(type, v)); + }); + return map; } - return (long) rs.size(); }); } @Override - public CompletableFuture> smembersAsync(final String key, final Type componentType) { - return getAsync(key, componentType); + public CompletableFuture> hvalsAsync(final String key, final Type type) { + return supplyFuture(() -> { + CacheEntry entry = find(key, CacheEntryType.MAP); + if (entry == null) { + return new ArrayList(); + } else { + return new ArrayList(entry.mapValue.values().stream().map(v -> formatValue(type, v)).toList()); + } + }); } + @Override + public CompletableFuture> hscanAsync(final String key, final Type type, AtomicLong cursor, int limit, String pattern) { + return supplyFuture(() -> { + if (key == null) { + return new HashMap(); + } + CacheEntry entry = find(key, CacheEntryType.MAP); + if (entry == null) { + return new HashMap(); + } + if (Utility.isEmpty(pattern)) { + return new LinkedHashMap(entry.mapValue); + } else { + Predicate regx = Pattern.compile(pattern.replace("*", ".*")).asPredicate(); + Set> set = entry.mapValue.entrySet(); + return (Map) set.stream().filter(en -> regx.test(en.getKey())).collect(Collectors.toMap(en -> en.getKey(), en -> en.getValue())); + } + }); + } + + @Override + public CompletableFuture hgetAsync(final String key, final String field, final Type type) { + return supplyFuture(() -> { + if (key == null || field == null) { + return null; + } + CacheEntry entry = find(key, CacheEntryType.MAP); + if (entry == null) { + return null; + } + Object obj = entry.mapValue.get(field); + return obj == null ? null : formatValue(type, obj); + }); + } + + @Override + public CompletableFuture hstrlenAsync(final String key, final String field) { + return supplyFuture(() -> { + if (key == null || field == null) { + return 0L; + } + CacheEntry entry = find(key, CacheEntryType.MAP); + if (entry == null) { + return 0L; + } + Object obj = entry.mapValue.get(field); + return obj == null ? 0L : (long) obj.toString().length(); + }); + } + + private void hset0(String key, String field, Type type, Object value) { + if (value == null) { + return; + } + CacheEntry entry = find(key, CacheEntryType.MAP); + if (entry == null) { + containerLock.lock(); + try { + entry = find(key, CacheEntryType.MAP); + if (entry == null) { + entry = new CacheEntry(CacheEntryType.MAP, key); + container.put(key, entry); + } + } finally { + containerLock.unlock(); + } + } + entry.lock(); + try { + entry.mapValue.put(field, formatValue(type, value)); + entry.lastAccessed = System.currentTimeMillis(); + } finally { + entry.unlock(); + } + } + + //------------------------ 列表 List ------------------------ @Override public CompletableFuture> lrangeAsync(final String key, final Type componentType, int start, int stop) { return getAsync(key, componentType); } - @Override - public CompletableFuture>> smembersAsync(final Type componentType, final String... keys) { - return supplyFuture(() -> { - Map> map = new HashMap<>(); - for (String key : keys) { - Set s = (Set) get(key, componentType); - if (s != null) { - map.put(key, s); - } - } - return map; - }); - } - - @Override - public CompletableFuture> smismembersAsync(final String key, final String... members) { - return supplyFuture(() -> { - Set s = (Set) get(key, Object.class); - List rs = new ArrayList<>(); - for (String member : members) { - rs.add(s != null && s.contains(member)); - } - return rs; - }); - } - @Override public CompletableFuture>> lrangesAsync(final Type componentType, final String... keys) { return supplyFuture(() -> { @@ -1065,42 +837,24 @@ public final class CacheMemorySource extends AbstractCacheSource { }); } - @Override - public CompletableFuture> mgetAsync(final Type componentType, final String... keys) { - return supplyFuture(() -> { - List list = new ArrayList<>(); - for (String key : keys) { - Object v = get(key, componentType); - if (v != null) { - if (componentType == String.class) { - v = v.toString(); - } else if (componentType == long.class || componentType == Long.class) { - v = (Object) ((Number) v).longValue(); - } - } - list.add((T) v); - } - return list; - }); - } - @Override public CompletableFuture llenAsync(final String key) { return supplyFuture(() -> { - Collection collection = (Collection) get(key, Object.class); - return collection == null ? 0L : collection.size(); + CacheEntry entry = find(key, CacheEntryType.LIST); + return entry == null ? 0L : (long) entry.listValue.size(); }); } @Override public CompletableFuture lindexAsync(String key, Type componentType, int index) { return supplyFuture(() -> { - List list = (List) get(key, Object.class); - if (list == null || list.isEmpty()) { + CacheEntry entry = find(key, CacheEntryType.LIST); + if (entry == null) { return null; } + List list = new ArrayList(entry.listValue); int pos = index >= 0 ? index : list.size() + index; - return pos >= list.size() ? null : list.get(pos); + return pos >= list.size() ? null : (T) list.get(pos); }); } @@ -1116,8 +870,8 @@ public final class CacheMemorySource extends AbstractCacheSource { protected CompletableFuture linsertAsync(String key, Type componentType, boolean before, T pivot, T value) { return supplyFuture(() -> { - CacheEntry entry = container.get(key); - if (entry == null || !entry.isListCacheType() || entry.listValue == null) { + CacheEntry entry = find(key, CacheEntryType.LIST); + if (entry == null) { return 0L; } entry.lock(); @@ -1156,70 +910,29 @@ public final class CacheMemorySource extends AbstractCacheSource { }); } - @Override - public CompletableFuture saddAsync(final String key, final Type componentType, T... values) { - return runFuture(() -> { - appendSetItem(componentType == String.class ? CacheEntryType.SET_STRING : CacheEntryType.SET_OBJECT, key, List.of(values)); - }); - } - - @Override - public CompletableFuture scardAsync(final String key) { - return supplyFuture(() -> { - Collection collection = (Collection) get(key, Object.class); - return collection == null ? 0L : collection.size(); - }); - } - - @Override - public CompletableFuture sismemberAsync(final String key, final Type type, final T value) { - return supplyFuture(() -> { - Collection list = get(key, type); - return list != null && list.contains(value); - }); - } - - protected void appendListItem(CacheEntryType cacheType, String key, Object... values) { - appendListItem(cacheType, true, key, values); - } - - protected void appendListItem(CacheEntryType cacheType, boolean tail, String key, Object... values) { - if (key == null) { - return; - } - CacheEntry entry = container.get(key); - if (entry == null || !entry.isListCacheType() || entry.listValue == null) { - ConcurrentLinkedDeque list = new ConcurrentLinkedDeque(); - entry = new CacheEntry(cacheType, key, null, null, list, null); - CacheEntry old = container.putIfAbsent(key, entry); - if (old != null) { - list = old.listValue; - } - if (list != null) { - if (tail) { - list.addAll(List.of(values)); - } else { - for (Object v : values) { - list.addFirst(v); - } - } - } - } else { - if (tail) { - entry.listValue.addAll(List.of(values)); - } else { - for (Object v : values) { - entry.listValue.addFirst(v); - } - } - } - } - @Override public CompletableFuture lpushAsync(final String key, final Type componentType, T... values) { return runFuture(() -> { - for (T value : values) { - appendListItem(CacheEntryType.LIST_OBJECT, false, key, value); + CacheEntry entry = find(key, CacheEntryType.LIST); + if (entry == null) { + containerLock.lock(); + try { + entry = find(key, CacheEntryType.LIST); + if (entry == null) { + entry = new CacheEntry(CacheEntryType.LIST, key); + container.put(key, entry); + } + } finally { + containerLock.unlock(); + } + } + entry.lock(); + try { + for (T val : values) { + entry.listValue.addFirst(val); + } + } finally { + entry.unlock(); } }); } @@ -1227,10 +940,18 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture lpushxAsync(final String key, final Type componentType, T... values) { return runFuture(() -> { - if (container.containsKey(key)) { - for (T value : values) { - appendListItem(CacheEntryType.LIST_OBJECT, false, key, value); + CacheEntry entry = find(key, CacheEntryType.LIST); + if (entry == null) { + return; + } + entry.lock(); + try { + ConcurrentLinkedDeque list = entry.listValue; + for (T val : values) { + list.addFirst(val); } + } finally { + entry.unlock(); } }); } @@ -1238,47 +959,42 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture lpopAsync(final String key, final Type componentType) { return supplyFuture(() -> { - if (key == null) { + CacheEntry entry = find(key, CacheEntryType.LIST); + if (entry == null) { return null; } - CacheEntry entry = container.get(key); - if (entry == null || !entry.isListCacheType() || entry.listValue == null) { - return null; + entry.lock(); + try { + return formatValue(componentType, entry.listValue.pollFirst()); + } finally { + entry.unlock(); } - if (entry.listValue.isEmpty()) { - return null; - } - Object obj = entry.listValue.pollFirst(); - if (obj != null && componentType == long.class) { - obj = ((Number) obj).longValue(); - } - return (T) obj; }); } @Override public CompletableFuture ltrimAsync(final String key, int start, int stop) { return runFuture(() -> { - if (key == null) { + CacheEntry entry = find(key, CacheEntryType.LIST); + if (entry == null) { return; } - CacheEntry entry = container.get(key); - if (entry == null || !entry.isListCacheType() || entry.listValue == null) { - return; - } - if (entry.listValue.isEmpty()) { - return; - } - Iterator it = entry.listValue.iterator(); - int index = -1; - int end = stop >= 0 ? stop : entry.listValue.size() + stop; - while (it.hasNext()) { - ++index; - if (index > end) { - break; - } else if (index >= start) { - it.remove(); + entry.lock(); + try { + ConcurrentLinkedDeque list = entry.listValue; + Iterator it = list.iterator(); + int index = -1; + int end = stop >= 0 ? stop : list.size() + stop; + while (it.hasNext()) { + ++index; + if (index > end) { + break; + } else if (index >= start) { + it.remove(); + } } + } finally { + entry.unlock(); } }); } @@ -1295,31 +1011,34 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture rpopAsync(final String key, final Type componentType) { return supplyFuture(() -> { - if (key == null) { + CacheEntry entry = find(key, CacheEntryType.LIST); + if (entry == null) { return null; } - CacheEntry entry = container.get(key); - if (entry == null || !entry.isListCacheType() || entry.listValue == null) { - return null; + entry.lock(); + try { + return formatValue(componentType, entry.listValue.pollLast()); + } finally { + entry.unlock(); } - if (entry.listValue.isEmpty()) { - return null; - } - Object obj = entry.listValue.pollLast(); - if (obj != null && componentType == long.class) { - obj = ((Number) obj).longValue(); - } - return (T) obj; }); } @Override public CompletableFuture rpushxAsync(final String key, final Type componentType, final T... values) { return runFuture(() -> { - if (container.containsKey(key)) { - for (T value : values) { - appendListItem(CacheEntryType.LIST_OBJECT, key, value); + CacheEntry entry = find(key, CacheEntryType.LIST); + if (entry == null) { + return; + } + entry.lock(); + try { + ConcurrentLinkedDeque list = entry.listValue; + for (T val : values) { + list.add(val); } + } finally { + entry.unlock(); } }); } @@ -1327,131 +1046,458 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture rpushAsync(final String key, final Type componentType, final T... values) { return runFuture(() -> { - appendListItem(CacheEntryType.LIST_OBJECT, key, values); + CacheEntry entry = find(key, CacheEntryType.LIST); + if (entry == null) { + containerLock.lock(); + try { + entry = find(key, CacheEntryType.LIST); + if (entry == null) { + entry = new CacheEntry(CacheEntryType.LIST, key); + container.put(key, entry); + } + } finally { + containerLock.unlock(); + } + } + entry.lock(); + try { + ConcurrentLinkedDeque list = entry.listValue; + for (T val : values) { + list.add(val); + } + } finally { + entry.unlock(); + } }); } @Override public CompletableFuture lremAsync(final String key, final Type componentType, T value) { return supplyFuture(() -> { - if (key == null) { + CacheEntry entry = find(key, CacheEntryType.LIST); + if (entry == null) { return 0L; } - CacheEntry entry = container.get(key); - if (entry == null || entry.listValue == null) { - return 0L; + entry.lock(); + try { + return entry.listValue.remove(value) ? 1L : 0L; + } finally { + entry.unlock(); } - return entry.listValue.remove(value) ? 1L : 0L; + }); + } + + //------------------------ 集合 Set ------------------------ + @Override + public CompletableFuture> srandmemberAsync(String key, Type componentType, int count) { + return supplyFuture(() -> { + List list = new ArrayList<>(); + CacheEntry entry = find(key, CacheEntryType.SSET); + if (entry == null) { + return list; + } + List vals = new ArrayList<>(entry.setValue); + if (count < 0) { //可以重复 + for (int i = 0; i < Math.abs(count); i++) { + int index = ThreadLocalRandom.current().nextInt(vals.size()); + T val = vals.get(index); + list.add(val); + } + } else { //不可以重复 + if (count >= vals.size()) { + return vals; + } + return vals.subList(0, count); + } + return list; + }); + } + + @Override + public CompletableFuture smoveAsync(String key, String key2, Type componentType, T member) { + return supplyFuture(() -> { + CacheEntry entry = find(key, CacheEntryType.SSET); + if (entry == null) { + return false; + } + boolean rs = false; + entry.lock(); + try { + rs = entry.setValue.remove(member); + } finally { + entry.unlock(); + } + if (rs) { + CacheEntry entry2 = find(key2, CacheEntryType.SSET); + if (entry2 == null) { + containerLock.lock(); + try { + entry2 = find(key2, CacheEntryType.SSET); + if (entry2 == null) { + entry2 = new CacheEntry(CacheEntryType.SSET, key); + container.put(key2, entry2); + } + } finally { + containerLock.unlock(); + } + } + entry2.lock(); + try { + entry2.setValue.add(member); + } finally { + entry2.unlock(); + } + } + return rs; + }); + } + + @Override + public CompletableFuture> sdiffAsync(final String key, final Type componentType, final String... key2s) { + return supplyFuture(() -> { + return sdiff0(key, key2s); + }); + } + + @Override + public CompletableFuture sdiffstoreAsync(final String key, final String srcKey, final String... srcKey2s) { + return supplyFuture(() -> { + Set rs = sdiff0(srcKey, srcKey2s); + CacheEntry entry = find(key, CacheEntryType.SSET); + if (entry == null) { + containerLock.lock(); + try { + entry = find(key, CacheEntryType.SSET); + if (entry == null) { + entry = new CacheEntry(CacheEntryType.SSET, key); + container.put(key, entry); + } + } finally { + containerLock.unlock(); + } + } + entry.lock(); + try { + entry.setValue.clear(); + entry.setValue.addAll(rs); + } finally { + entry.unlock(); + } + return (long) rs.size(); + }); + } + + private Set sdiff0(final String key, final String... key2s) { + Set rs = new HashSet<>(); + CacheEntry entry = find(key, CacheEntryType.SSET); + if (entry == null) { + return rs; + } + rs.addAll(entry.setValue); + for (String k : key2s) { + CacheEntry en2 = find(k, CacheEntryType.SSET); + if (en2 != null) { + en2.setValue.forEach(v -> rs.remove(v)); + } + } + return rs; + } + + @Override + public CompletableFuture> sinterAsync(final String key, final Type componentType, final String... key2s) { + return supplyFuture(() -> { + return sinter0(key, key2s); + }); + } + + @Override + public CompletableFuture sinterstoreAsync(final String key, final String srcKey, final String... srcKey2s) { + return supplyFuture(() -> { + Set rs = sinter0(srcKey, srcKey2s); + CacheEntry entry = find(key, CacheEntryType.SSET); + if (entry == null) { + containerLock.lock(); + try { + entry = find(key, CacheEntryType.SSET); + if (entry == null) { + entry = new CacheEntry(CacheEntryType.SSET, key); + container.put(key, entry); + } + } finally { + containerLock.unlock(); + } + } + entry.lock(); + try { + entry.setValue.clear(); + entry.setValue.addAll(rs); + } finally { + entry.unlock(); + } + return (long) rs.size(); + }); + } + + private Set sinter0(final String key, final String... key2s) { + Set rs = new HashSet<>(); + CacheEntry entry = find(key, CacheEntryType.SSET); + if (entry == null) { + return rs; + } + rs.addAll(entry.setValue); + for (String k : key2s) { + CacheEntry en2 = find(k, CacheEntryType.SSET); + if (en2 != null) { + Set removes = new HashSet<>(); + for (T v : rs) { + if (!en2.setValue.contains(v)) { + removes.add(v); + } + } + rs.removeAll(removes); + } else { + rs.clear(); + return rs; + } + } + return rs; + } + + @Override + public CompletableFuture> sunionAsync(final String key, final Type componentType, final String... key2s) { + return supplyFuture(() -> { + return sunion0(key, key2s); + }); + } + + @Override + public CompletableFuture sunionstoreAsync(final String key, final String srcKey, final String... srcKey2s) { + return supplyFuture(() -> { + Set rs = sunion0(srcKey, srcKey2s); + + CacheEntry entry = find(key, CacheEntryType.SSET); + if (entry == null) { + containerLock.lock(); + try { + entry = find(key, CacheEntryType.SSET); + if (entry == null) { + entry = new CacheEntry(CacheEntryType.SSET, key); + container.put(key, entry); + } + } finally { + containerLock.unlock(); + } + } + entry.lock(); + try { + entry.setValue.clear(); + entry.setValue.addAll(rs); + } finally { + entry.unlock(); + } + return (long) rs.size(); + }); + } + + private Set sunion0(final String key, final String... key2s) { + Set rs = new HashSet<>(); + CacheEntry entry = find(key, CacheEntryType.SSET); + if (entry != null) { + rs.addAll(entry.setValue); + } + for (String k : key2s) { + CacheEntry en2 = find(k, CacheEntryType.SSET); + if (en2 != null) { + rs.addAll(en2.setValue); + } + } + return rs; + } + + @Override + public CompletableFuture> smembersAsync(final String key, final Type componentType) { + return supplyFuture(() -> { + CacheEntry entry = find(key, CacheEntryType.SSET); + if (entry == null) { + return new LinkedHashSet(); + } + return new LinkedHashSet<>(entry.setValue); + }); + } + + @Override + public CompletableFuture>> smembersAsync(final Type componentType, final String... keys) { + return supplyFuture(() -> { + Map> map = new HashMap<>(); + for (String key : keys) { + CacheEntry entry = find(key, CacheEntryType.SSET); + if (entry != null) { + map.put(key, new LinkedHashSet<>(entry.setValue)); + } + } + return map; + }); + } + + @Override + public CompletableFuture> smismembersAsync(final String key, final String... members) { + return supplyFuture(() -> { + CacheEntry entry = find(key, CacheEntryType.SSET); + List rs = new ArrayList<>(); + if (entry == null) { + for (String member : members) { + rs.add(false); + } + return rs; + } + Set set = entry.setValue; + for (String member : members) { + rs.add(set.contains(member)); + } + return rs; + }); + } + + @Override + public CompletableFuture saddAsync(final String key, final Type componentType, T... values) { + return runFuture(() -> { + CacheEntry entry = find(key, CacheEntryType.SSET); + if (entry == null) { + containerLock.lock(); + try { + entry = find(key, CacheEntryType.SSET); + if (entry == null) { + entry = new CacheEntry(CacheEntryType.SSET, key); + container.put(key, entry); + } + } finally { + containerLock.unlock(); + } + } + entry.lock(); + try { + Set set = entry.setValue; + for (T val : values) { + set.add(val); + } + } finally { + entry.unlock(); + } + }); + } + + @Override + public CompletableFuture scardAsync(final String key) { + return supplyFuture(() -> { + CacheEntry entry = find(key, CacheEntryType.SSET); + return entry == null ? 0L : (long) entry.setValue.size(); + }); + } + + @Override + public CompletableFuture sismemberAsync(final String key, final Type type, final T value) { + return supplyFuture(() -> { + CacheEntry entry = find(key, CacheEntryType.SSET); + return entry != null && entry.setValue.contains(value); }); } @Override public CompletableFuture spopAsync(final String key, final Type componentType) { return supplyFuture(() -> { - if (key == null) { + CacheEntry entry = find(key, CacheEntryType.SSET); + if (entry == null) { return null; } - CacheEntry entry = container.get(key); - if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) { - return null; - } - if (entry.csetValue.isEmpty()) { - return null; - } - Iterator it = entry.csetValue.iterator(); - Object del = null; - if (it.hasNext()) { - Object obj = it.next(); - if (obj != null && componentType == long.class) { - obj = ((Number) obj).longValue(); + entry.lock(); + try { + final Set cset = entry.setValue; + if (cset.isEmpty()) { + return null; } - del = obj; + Iterator it = cset.iterator(); + Object del = null; + if (it.hasNext()) { + del = it.next(); + } + if (del != null) { + cset.remove(del); + return (T) del; + } + return null; + } finally { + entry.unlock(); } - if (del != null) { - entry.csetValue.remove(del); - return (T) del; - } - return null; }); } @Override public CompletableFuture> spopAsync(final String key, final int count, final Type componentType) { return supplyFuture(() -> { - if (key == null) { + CacheEntry entry = find(key, CacheEntryType.SSET); + if (entry == null) { return new LinkedHashSet<>(); } - CacheEntry entry = container.get(key); - if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) { - return new LinkedHashSet<>(); - } - if (entry.csetValue.isEmpty()) { - return new LinkedHashSet<>(); - } - Iterator it = entry.csetValue.iterator(); - Set list = new LinkedHashSet<>(); - int index = 0; - while (it.hasNext()) { - Object obj = it.next(); - if (obj != null && componentType == long.class) { - obj = ((Number) obj).longValue(); + entry.lock(); + try { + final Set cset = entry.setValue; + if (cset.isEmpty()) { + return new LinkedHashSet<>(); } - list.add((T) obj); - if (++index >= count) { - break; + Iterator it = cset.iterator(); + Set list = new LinkedHashSet<>(); + int index = 0; + while (it.hasNext()) { + list.add(formatValue(componentType, it.next())); + if (++index >= count) { + break; + } } + cset.removeAll(list); + return list; + } finally { + entry.unlock(); } - entry.csetValue.removeAll(list); - return list; }); } @Override public CompletableFuture> sscanAsync(final String key, final Type componentType, AtomicLong cursor, int limit, String pattern) { return supplyFuture(() -> { - if (key == null) { - return new LinkedHashSet(); - } - CacheEntry entry = container.get(key); - if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) { + CacheEntry entry = find(key, CacheEntryType.SSET); + if (entry == null) { return new LinkedHashSet<>(); } - if (entry.csetValue.isEmpty()) { - return new LinkedHashSet<>(); - } - Iterator it = entry.csetValue.iterator(); - Set list = new LinkedHashSet<>(); - while (it.hasNext()) { - Object obj = it.next(); - if (obj != null && componentType == long.class) { - obj = ((Number) obj).longValue(); + entry.lock(); + try { + final Set cset = entry.setValue; + if (cset.isEmpty()) { + return new LinkedHashSet<>(); } - list.add((T) obj); + Iterator it = cset.iterator(); + Set list = new LinkedHashSet<>(); + while (it.hasNext()) { + list.add((T) formatValue(componentType, it.next())); + } + return list; + } finally { + entry.unlock(); } - return list; }); } - protected void appendSetItem(CacheEntryType cacheType, String key, Collection values) { - if (key == null) { - return; - } - CacheEntry entry = container.get(key); - if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) { - Set set = cacheType == CacheEntryType.SET_SORTED ? new ConcurrentSkipListSet<>() : new CopyOnWriteArraySet(); - entry = new CacheEntry(cacheType, key, null, set, null, null); - CacheEntry old = container.putIfAbsent(key, entry); - if (old != null) { - set = old.csetValue; + @Override + public CompletableFuture sremAsync(String key, Type type, T... values) { + return supplyFuture(() -> { + CacheEntry entry = find(key, CacheEntryType.SSET); + if (entry == null) { + return 0L; } - if (set != null) { - set.addAll(values); - } - } else { - entry.csetValue.addAll(values); - } + return entry.setValue.removeAll(Arrays.asList(values)) ? 1L : 0L; + }); } + //------------------------ 有序集合 Sorted Set ------------------------ @Override public CompletableFuture zaddAsync(String key, CacheScoredValue... values) { return runFuture(() -> { @@ -1459,29 +1505,47 @@ public final class CacheMemorySource extends AbstractCacheSource { for (CacheScoredValue v : values) { list.add(new CacheScoredValue.NumberScoredValue(v)); } - appendSetItem(CacheEntryType.SET_SORTED, key, list); + CacheEntry entry = find(key, CacheEntryType.ZSET); + if (entry == null) { + containerLock.lock(); + try { + entry = find(key, CacheEntryType.ZSET); + if (entry == null) { + entry = new CacheEntry(CacheEntryType.ZSET, key); + container.put(key, entry); + } + } finally { + containerLock.unlock(); + } + } + entry.lock(); + try { + entry.setValue.addAll(list); + } finally { + entry.unlock(); + } }); } @Override public CompletableFuture zincrbyAsync(String key, CacheScoredValue value) { return supplyFuture(() -> { - CacheEntry entry = container.get(key); - if (entry == null || entry.isExpired() || entry.csetValue == null) { + CacheEntry entry = find(key, CacheEntryType.ZSET); + if (entry == null) { containerLock.lock(); try { - entry = container.get(key); - if (entry == null || entry.isExpired()) { - appendSetItem(CacheEntryType.SET_SORTED, key, List.of(new CacheScoredValue.NumberScoredValue(value.getScore().doubleValue(), value.getValue()))); + entry = find(key, CacheEntryType.ZSET); + if (entry == null) { + entry = new CacheEntry(CacheEntryType.ZSET, key); + container.put(key, entry); } } finally { containerLock.unlock(); } - return (T) value.getScore(); } - entry.lock.lock(); + entry.lock(); try { - Set sets = entry.csetValue; + Set sets = entry.setValue; CacheScoredValue.NumberScoredValue old = sets.stream().filter(v -> Objects.equals(v.getValue(), value.getValue())).findAny().orElse(null); if (old == null) { sets.add(new CacheScoredValue.NumberScoredValue(value.getScore().doubleValue(), value.getValue())); @@ -1504,7 +1568,7 @@ public final class CacheMemorySource extends AbstractCacheSource { return (T) old.getScore(); } } finally { - entry.lock.unlock(); + entry.unlock(); } }); } @@ -1512,28 +1576,22 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture zcardAsync(String key) { return supplyFuture(() -> { - if (key == null) { + CacheEntry entry = find(key, CacheEntryType.ZSET); + if (entry == null) { return 0L; } - CacheEntry entry = container.get(key); - if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) { - return 0L; - } - return (long) entry.csetValue.size(); + return (long) entry.setValue.size(); }); } @Override public CompletableFuture zrankAsync(String key, String member) { return supplyFuture(() -> { - if (key == null) { + CacheEntry entry = find(key, CacheEntryType.ZSET); + if (entry == null) { return null; } - CacheEntry entry = container.get(key); - if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) { - return null; - } - List list = new ArrayList<>(entry.csetValue); + List list = new ArrayList<>(entry.setValue); Collections.sort(list); long c = 0; for (CacheScoredValue.NumberScoredValue v : list) { @@ -1549,14 +1607,11 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture zrevrankAsync(String key, String member) { return supplyFuture(() -> { - if (key == null) { + CacheEntry entry = find(key, CacheEntryType.ZSET); + if (entry == null) { return null; } - CacheEntry entry = container.get(key); - if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) { - return null; - } - List list = new ArrayList<>(entry.csetValue); + List list = new ArrayList<>(entry.setValue); Collections.sort(list, Collections.reverseOrder()); long c = 0; for (CacheScoredValue.NumberScoredValue v : list) { @@ -1572,15 +1627,12 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture> zrangeAsync(String key, int start, int stop) { return supplyFuture(() -> { - if (key == null) { - return new ArrayList<>(); - } - CacheEntry entry = container.get(key); - if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) { + CacheEntry entry = find(key, CacheEntryType.ZSET); + if (entry == null) { return new ArrayList<>(); } List list = new ArrayList<>(); - Set sets = entry.csetValue; + Set sets = entry.setValue; long c = 0; for (CacheScoredValue v : sets) { if (c >= start && (stop < 0 || c <= stop)) { @@ -1595,14 +1647,11 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture> zscanAsync(String key, Type scoreType, AtomicLong cursor, int limit, String pattern) { return supplyFuture(() -> { - if (key == null) { - return new ArrayList<>(); - } - CacheEntry entry = container.get(key); - if (entry == null || entry.isExpired() || entry.csetValue == null) { + CacheEntry entry = find(key, CacheEntryType.ZSET); + if (entry == null) { return new ArrayList(); } - Set sets = entry.csetValue; + Set sets = entry.setValue; if (Utility.isEmpty(pattern)) { return sets.stream().collect(Collectors.toList()); } else { @@ -1615,14 +1664,11 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture zremAsync(String key, String... members) { return supplyFuture(() -> { - if (key == null) { + CacheEntry entry = find(key, CacheEntryType.ZSET); + if (entry == null) { return 0L; } - CacheEntry entry = container.get(key); - if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) { - return 0L; - } - Set sets = entry.csetValue; + Set sets = entry.setValue; long c = 0; Set keys = Set.of(members); Iterator it = sets.iterator(); @@ -1643,21 +1689,15 @@ public final class CacheMemorySource extends AbstractCacheSource { public CompletableFuture> zmscoreAsync(String key, Class scoreType, String... members) { return supplyFuture(() -> { List list = new ArrayList<>(); - if (key == null) { - for (int i = 0; i < members.length; i++) { - list.add(null); - } - return list; - } - CacheEntry entry = container.get(key); - if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) { + CacheEntry entry = find(key, CacheEntryType.ZSET); + if (entry == null) { for (int i = 0; i < members.length; i++) { list.add(null); } return list; } Set keys = Set.of(members); - Set sets = entry.csetValue; + Set sets = entry.setValue; Map map = new HashMap<>(); sets.stream().filter(v -> keys.contains(v.getValue())).forEach(v -> { map.put(v.getValue(), formatScore(scoreType, v.getScore())); @@ -1689,30 +1729,35 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture zscoreAsync(String key, Class scoreType, String member) { return supplyFuture(() -> { - if (key == null) { + CacheEntry entry = find(key, CacheEntryType.ZSET); + if (entry == null) { return null; } - CacheEntry entry = container.get(key); - if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) { - return null; - } - Set sets = entry.csetValue; + Set sets = entry.setValue; return formatScore(scoreType, sets.stream().filter(v -> Objects.equals(member, v.getValue())).findAny().map(v -> v.getScore()).orElse(null)); }); } - @Override - public CompletableFuture sremAsync(String key, Type type, T... values) { - return supplyFuture(() -> { - if (key == null) { - return 0L; + private T formatValue(@Nullable Type componentType, Object obj) { + if (componentType == null || obj == null || componentType == obj.getClass()) { + return (T) obj; + } + if (componentType == String.class) { + obj = obj.toString(); + } else if (componentType == long.class || componentType == Long.class) { + if (obj instanceof Number) { + obj = ((Number) obj).longValue(); + } else { + obj = Long.parseLong(obj.toString()); } - CacheEntry entry = container.get(key); - if (entry == null || entry.csetValue == null) { - return 0L; + } else if (componentType == int.class || componentType == Integer.class) { + if (obj instanceof Number) { + obj = ((Number) obj).intValue(); + } else { + obj = Integer.parseInt(obj.toString()); } - return entry.csetValue.removeAll(List.of(values)) ? 1L : 0L; - }); + } + return (T) obj; } @Override @@ -1739,107 +1784,108 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture> keysAsync(String pattern) { return supplyFuture(() -> { - if (pattern == null || pattern.isEmpty()) { - return new ArrayList<>(container.keySet()); - } else { - List rs = new ArrayList<>(); - Predicate filter = Pattern.compile(pattern).asPredicate(); - container.keySet().stream().filter(filter).forEach(x -> rs.add(x)); - return rs; - } + List rs = new ArrayList<>(); + Predicate filter = isEmpty(pattern) ? x -> true : Pattern.compile(pattern).asPredicate(); + container.forEach((k, v) -> { + if (filter.test(k) && !v.isExpired()) { + rs.add(k); + } + }); + return rs; }); } @Override public CompletableFuture> scanAsync(AtomicLong cursor, int limit, String pattern) { - return supplyFuture(() -> { - if (pattern == null || pattern.isEmpty()) { - return new ArrayList<>(container.keySet()); - } else { - List rs = new ArrayList<>(); - Predicate filter = Pattern.compile(pattern).asPredicate(); - container.keySet().stream().filter(filter).forEach(x -> rs.add(x)); - return rs; - } - }); + return keysAsync(pattern); } @Override public CompletableFuture> keysStartsWithAsync(String startsWith) { - if (startsWith == null) { - return keysAsync(); - } return supplyFuture(() -> { List rs = new ArrayList<>(); - container.keySet().stream().filter(x -> x.startsWith(startsWith)).forEach(x -> rs.add(x)); + Predicate filter = isEmpty(startsWith) ? x -> true : x -> x.startsWith(startsWith); + container.forEach((k, v) -> { + if (filter.test(k) && !v.isExpired()) { + rs.add(k); + } + }); return rs; }); } - public static enum CacheNodeType { - LONG, STRING, OBJECT, - ATOMIC, DOUBLE, - SET_SORTED, - SET_OBJECT, - MAP; + protected CacheEntry find(String key) { + if (key == null) { + return null; + } + CacheEntry entry = container.get(key); + if (entry == null || entry.isExpired()) { + return null; + } + entry.lastAccessed = System.currentTimeMillis(); + return entry; + } + + protected CacheEntry find(String key, CacheEntryType cacheType) { + if (key == null) { + return null; + } + CacheEntry entry = container.get(key); + if (entry == null || entry.isExpired()) { + return null; + } + if (entry.cacheType != cacheType) { + throw new SourceException(key + " value is " + entry.cacheType + " type but need " + cacheType); + } + entry.lastAccessed = System.currentTimeMillis(); + return entry; } protected CacheEntryType findEntryType(Type type) { - if (type == String.class) { - return CacheEntryType.STRING; - } else if (type == long.class || type == Long.class) { - return CacheEntryType.LONG; - } else if (type == byte[].class) { - return CacheEntryType.BYTES; - } return CacheEntryType.OBJECT; } public static enum CacheEntryType { - LONG, STRING, OBJECT, BYTES, ATOMIC, MAP, DOUBLE, - SET_LONG, SET_STRING, SET_OBJECT, SET_SORTED, - LIST_LONG, LIST_STRING, LIST_OBJECT; + OBJECT, ATOMIC, DOUBLE, SSET, ZSET, LIST, MAP; } - public static final class CacheEntry { - - final CacheEntryType cacheType; - - String key; - - //<=0表示永久保存 - int expireSeconds; + public static final class CacheEntry { volatile long lastAccessed; //最后刷新时间 - T objectValue; + Object objectValue; - ConcurrentHashMap mapValue; + Set setValue; + + ConcurrentLinkedDeque listValue; + + ConcurrentHashMap mapValue; + + private CacheEntryType cacheType; + + private String key; + + private int expireMills; //<=0表示永久保存 private final ReentrantLock lock = new ReentrantLock(); - Set csetValue; - - ConcurrentLinkedDeque listValue; - - public CacheEntry(CacheEntryType cacheType, String key, T objectValue, Set csetValue, ConcurrentLinkedDeque listValue, ConcurrentHashMap mapValue) { - this(cacheType, 0, key, objectValue, csetValue, listValue, mapValue); - } - - public CacheEntry(CacheEntryType cacheType, int expireSeconds, String key, T objectValue, Set csetValue, ConcurrentLinkedDeque listValue, ConcurrentHashMap mapValue) { - this(cacheType, expireSeconds, System.currentTimeMillis(), key, objectValue, csetValue, listValue, mapValue); - } - - @ConstructorParameters({"cacheType", "expireSeconds", "lastAccessed", "key", "objectValue", "csetValue", "listValue", "mapValue"}) - public CacheEntry(CacheEntryType cacheType, int expireSeconds, long lastAccessed, String key, T objectValue, Set csetValue, ConcurrentLinkedDeque listValue, ConcurrentHashMap mapValue) { + public CacheEntry(CacheEntryType cacheType, String key) { this.cacheType = cacheType; - this.expireSeconds = expireSeconds; - this.lastAccessed = lastAccessed; this.key = key; - this.objectValue = objectValue; - this.csetValue = csetValue; - this.listValue = listValue; - this.mapValue = mapValue; + if (cacheType == CacheEntryType.SSET) { + this.setValue = new CopyOnWriteArraySet(); + } else if (cacheType == CacheEntryType.ZSET) { + this.setValue = new ConcurrentSkipListSet(); + } else if (cacheType == CacheEntryType.LIST) { + this.listValue = new ConcurrentLinkedDeque(); + } else if (cacheType == CacheEntryType.MAP) { + this.mapValue = new ConcurrentHashMap(); + } + } + + public CacheEntry expireSeconds(int expireSeconds) { + this.expireMills = expireSeconds > 0 ? expireSeconds * 1000 : 0; + return this; } @Override @@ -1847,24 +1893,23 @@ public final class CacheMemorySource extends AbstractCacheSource { return JsonFactory.root().getConvert().convertTo(this); } - @ConvertColumn(ignore = true) - public boolean isListCacheType() { - return cacheType == CacheEntryType.LIST_LONG || cacheType == CacheEntryType.LIST_STRING || cacheType == CacheEntryType.LIST_OBJECT; - } - - @ConvertColumn(ignore = true) - public boolean isSetCacheType() { - return cacheType == CacheEntryType.SET_LONG || cacheType == CacheEntryType.SET_STRING || cacheType == CacheEntryType.SET_OBJECT || cacheType == CacheEntryType.SET_SORTED; - } - - @ConvertColumn(ignore = true) - public boolean isMapCacheType() { - return cacheType == CacheEntryType.MAP; - } - +// @ConvertColumn(ignore = true) +// public boolean isListCacheType() { +// return cacheType == CacheEntryType.LIST; +// } +// +// @ConvertColumn(ignore = true) +// public boolean isSetCacheType() { +// return cacheType == CacheEntryType.SSET || cacheType == CacheEntryType.ZSET; +// } +// +// @ConvertColumn(ignore = true) +// public boolean isMapCacheType() { +// return cacheType == CacheEntryType.MAP; +// } @ConvertColumn(ignore = true) public boolean isExpired() { - return expireSeconds > 0 && (lastAccessed + expireSeconds * 1000) < System.currentTimeMillis(); + return expireMills > 0 && (lastAccessed + expireMills) < System.currentTimeMillis(); } public void lock() { @@ -1879,8 +1924,8 @@ public final class CacheMemorySource extends AbstractCacheSource { return cacheType; } - public int getExpireSeconds() { - return expireSeconds; + public int getExpireMills() { + return expireMills; } public long getLastAccessed() { @@ -1891,15 +1936,15 @@ public final class CacheMemorySource extends AbstractCacheSource { return key; } - public T getObjectValue() { + public Object getObjectValue() { return objectValue; } - public Set getCsetValue() { - return csetValue; + public Set getSetValue() { + return setValue; } - public ConcurrentLinkedDeque getListValue() { + public ConcurrentLinkedDeque getListValue() { return listValue; }