From 337a2a3038d2e324775a3c48cbdcffcd30e44842 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Tue, 14 Nov 2017 20:35:31 +0800 Subject: [PATCH] =?UTF-8?q?CacheSource=E5=A2=9E=E5=8A=A0getString/getLong?= =?UTF-8?q?=E4=B8=80=E7=B3=BB=E5=88=97=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/org/redkale/convert/ConvertFactory.java | 2 + .../convert/ext/AtomicLongSimpledCoder.java | 35 ++ src/org/redkale/source/CacheMemorySource.java | 453 ++++++++++++++++-- src/org/redkale/source/CacheSource.java | 112 ++++- 4 files changed, 548 insertions(+), 54 deletions(-) create mode 100644 src/org/redkale/convert/ext/AtomicLongSimpledCoder.java diff --git a/src/org/redkale/convert/ConvertFactory.java b/src/org/redkale/convert/ConvertFactory.java index 3800c2267..52f6d75a7 100644 --- a/src/org/redkale/convert/ConvertFactory.java +++ b/src/org/redkale/convert/ConvertFactory.java @@ -13,6 +13,7 @@ import java.nio.ByteBuffer; import java.nio.channels.CompletionHandler; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; import java.util.stream.Stream; import org.redkale.convert.ext.InetAddressSimpledCoder.InetSocketAddressSimpledCoder; @@ -88,6 +89,7 @@ public abstract class ConvertFactory { this.register(String.class, StringSimpledCoder.instance); this.register(CharSequence.class, CharSequenceSimpledCoder.instance); this.register(java.util.Date.class, DateSimpledCoder.instance); + this.register(AtomicLong.class, AtomicLongSimpledCoder.instance); this.register(BigInteger.class, BigIntegerSimpledCoder.instance); this.register(BigDecimal.class, BigDecimalSimpledCoder.instance); this.register(InetAddress.class, InetAddressSimpledCoder.instance); diff --git a/src/org/redkale/convert/ext/AtomicLongSimpledCoder.java b/src/org/redkale/convert/ext/AtomicLongSimpledCoder.java new file mode 100644 index 000000000..914b81262 --- /dev/null +++ b/src/org/redkale/convert/ext/AtomicLongSimpledCoder.java @@ -0,0 +1,35 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.convert.ext; + +import java.util.concurrent.atomic.AtomicLong; +import org.redkale.convert.*; + +/** + * AtomicLong 的SimpledCoder实现 + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * @param Reader输入的子类型 + * @param Writer输出的子类型 + */ +public final class AtomicLongSimpledCoder extends SimpledCoder { + + public static final AtomicLongSimpledCoder instance = new AtomicLongSimpledCoder(); + + @Override + public void convertTo(W out, AtomicLong value) { + out.writeLong(value == null ? 0 : value.get()); + } + + @Override + public AtomicLong convertFrom(R in) { + return new AtomicLong(in.readLong()); + } + +} diff --git a/src/org/redkale/source/CacheMemorySource.java b/src/org/redkale/source/CacheMemorySource.java index d6c089ead..a01335ebf 100644 --- a/src/org/redkale/source/CacheMemorySource.java +++ b/src/org/redkale/source/CacheMemorySource.java @@ -32,6 +32,15 @@ import org.redkale.util.*; @ResourceType(CacheSource.class) public class CacheMemorySource extends AbstractService implements CacheSource, Service, AutoCloseable, Resourcable { + private static final Type STRING_ENTRY_TYPE = new TypeToken>() { + }.getType(); + + private static final Type LONG_ENTRY_TYPE = new TypeToken>() { + }.getType(); + + private static final Type ATOMIC_ENTRY_TYPE = new TypeToken>() { + }.getType(); + @Resource(name = "APP_HOME") private File home; @@ -45,10 +54,6 @@ public class CacheMemorySource extends AbstractService impleme private Type objValueType; - private Type setValueType; - - private Type listValueType; - private ScheduledThreadPoolExecutor scheduler; private Consumer expireHandler; @@ -66,8 +71,6 @@ public class CacheMemorySource extends AbstractService impleme @Override public final void initValueType(Type valueType) { this.objValueType = valueType; - this.setValueType = TypeToken.createParameterizedType(null, CopyOnWriteArraySet.class, valueType); - this.listValueType = TypeToken.createParameterizedType(null, ConcurrentLinkedQueue.class, valueType); this.initTransient(this.objValueType == null); } @@ -84,6 +87,7 @@ public class CacheMemorySource extends AbstractService impleme @Override public void init(AnyValue conf) { if (this.convert == null) this.convert = this.defaultConvert; + if (this.convert == null) this.convert = JsonConvert.root(); final CacheMemorySource self = this; AnyValue prop = conf == null ? null : conf.getAnyValue("properties"); if (prop != null) { @@ -137,18 +141,21 @@ public class CacheMemorySource extends AbstractService impleme File store = new File(home, "cache/" + resourceName()); if (!store.isFile() || !store.canRead()) return; LineNumberReader reader = new LineNumberReader(new FileReader(store)); - if (this.objValueType == null) { - this.objValueType = Object.class; - this.setValueType = TypeToken.createParameterizedType(null, CopyOnWriteArraySet.class, this.objValueType); - this.listValueType = TypeToken.createParameterizedType(null, ConcurrentLinkedQueue.class, this.objValueType); - } + if (this.objValueType == null) this.objValueType = Object.class; final Type storeObjType = TypeToken.createParameterizedType(null, CacheEntry.class, objValueType); - final Type storeSetType = TypeToken.createParameterizedType(null, CacheEntry.class, setValueType); - final Type storeListType = TypeToken.createParameterizedType(null, CacheEntry.class, listValueType); + String line; while ((line = reader.readLine()) != null) { if (line.isEmpty()) continue; - CacheEntry entry = convert.convertFrom(line.startsWith(CacheEntry.JSON_SET_KEY) ? storeSetType : (line.startsWith(CacheEntry.JSON_LIST_KEY) ? storeListType : storeObjType), line); + Type convertType = storeObjType; + if (line.startsWith("{\"cacheType\":\"" + CacheEntryType.LONG)) { + convertType = LONG_ENTRY_TYPE; + } else if (line.startsWith("{\"cacheType\":\"" + CacheEntryType.STRING)) { + convertType = STRING_ENTRY_TYPE; + } else if (line.startsWith("{\"cacheType\":\"" + CacheEntryType.ATOMIC)) { + convertType = ATOMIC_ENTRY_TYPE; + } + CacheEntry entry = convert.convertFrom(convertType, line); if (entry.isExpired()) continue; if (datasync && container.containsKey(entry.key)) continue; //已经同步了 container.put(entry.key, entry); @@ -182,6 +189,54 @@ public class CacheMemorySource extends AbstractService impleme } } + public static void main(String[] args) throws Exception { + AnyValue.DefaultAnyValue conf = new AnyValue.DefaultAnyValue(); + conf.addValue("node", new AnyValue.DefaultAnyValue().addValue("addr", "127.0.0.1").addValue("port", "6379")); + + CacheMemorySource source = new CacheMemorySource(); + source.defaultConvert = JsonFactory.root().getConvert(); + source.initValueType(String.class); //value用String类型 + source.init(conf); + + System.out.println("------------------------------------"); + source.remove("key1"); + source.remove("key2"); + source.remove("300"); + source.set("key1", "value1"); + source.set("300", "4000"); + source.getAndRefresh("key1", 3500); + System.out.println("[有值] 300 GET : " + source.get("300")); + System.out.println("[有值] key1 GET : " + source.get("key1")); + System.out.println("[无值] key2 GET : " + source.get("key2")); + System.out.println("[有值] key1 EXISTS : " + source.exists("key1")); + System.out.println("[无值] key2 EXISTS : " + source.exists("key2")); + + source.remove("keys3"); + source.appendListItem("keys3", "vals1"); + source.appendListItem("keys3", "vals2"); + System.out.println("-------- keys3 追加了两个值 --------"); + System.out.println("[两值] keys3 VALUES : " + source.getCollection("keys3")); + System.out.println("[有值] keys3 EXISTS : " + source.exists("keys3")); + source.removeListItem("keys3", "vals1"); + System.out.println("[一值] keys3 VALUES : " + source.getCollection("keys3")); + source.getCollectionAndRefresh("keys3", 3000); + + source.remove("sets3"); + source.appendSetItem("sets3", "setvals1"); + source.appendSetItem("sets3", "setvals2"); + source.appendSetItem("sets3", "setvals1"); + System.out.println("[两值] sets3 VALUES : " + source.getCollection("sets3")); + System.out.println("[有值] sets3 EXISTS : " + source.exists("sets3")); + source.removeSetItem("sets3", "setvals1"); + System.out.println("[一值] sets3 VALUES : " + source.getCollection("sets3")); + System.out.println("sets3 大小 : " + source.getCollectionSize("sets3")); + System.out.println("all keys: " + source.queryKeys()); + System.out.println("newnum 值 : " + source.incr("newnum")); + System.out.println("newnum 值 : " + source.decr("newnum")); + System.out.println("------------------------------------"); + source.destroy(null); + } + @Override public void close() throws Exception { //给Application 关闭时调用 destroy(null); @@ -202,11 +257,38 @@ public class CacheMemorySource extends AbstractService impleme store.getParentFile().mkdirs(); PrintStream stream = new PrintStream(store, "UTF-8"); final Type storeObjType = TypeToken.createParameterizedType(null, CacheEntry.class, objValueType); - final Type storeSetType = TypeToken.createParameterizedType(null, CacheEntry.class, setValueType); - final Type storeListType = TypeToken.createParameterizedType(null, CacheEntry.class, listValueType); + final Type storeSetType = TypeToken.createParameterizedType(null, CacheEntry.class, objValueType); + final Type storeListType = TypeToken.createParameterizedType(null, CacheEntry.class, objValueType); Collection> entrys = container.values(); for (CacheEntry entry : entrys) { - stream.println(convert.convertTo(entry.isSetCacheType() ? storeSetType : (entry.isListCacheType() ? storeListType : storeObjType), entry)); + Type convertType = storeObjType; + if (entry.cacheType == CacheEntryType.LONG) { + convertType = LONG_ENTRY_TYPE; + } else if (entry.cacheType == CacheEntryType.STRING) { + convertType = STRING_ENTRY_TYPE; + } else if (entry.cacheType == CacheEntryType.ATOMIC) { + convertType = ATOMIC_ENTRY_TYPE; + } else if (entry.cacheType == CacheEntryType.OBJECT) { + convertType = storeObjType; + } else if (entry.cacheType == CacheEntryType.LONG_LIST) { + convertType = LONG_ENTRY_TYPE; + } else if (entry.cacheType == CacheEntryType.LONG_SET) { + convertType = LONG_ENTRY_TYPE; + } else if (entry.cacheType == CacheEntryType.STRING_LIST) { + convertType = STRING_ENTRY_TYPE; + } else if (entry.cacheType == CacheEntryType.STRING_SET) { + convertType = STRING_ENTRY_TYPE; + } else if (entry.cacheType == CacheEntryType.OBJECT_LIST) { + convertType = storeListType; + } else if (entry.cacheType == CacheEntryType.OBJECT_SET) { + convertType = storeSetType; + } + try { + stream.println(convert.convertTo(convertType, entry)); + } catch (Exception ee) { + System.err.println(storeSetType + "-----" + entry); + throw ee; + } } container.clear(); stream.close(); @@ -234,15 +316,41 @@ public class CacheMemorySource extends AbstractService impleme CacheEntry entry = container.get(key); if (entry == null || entry.isExpired()) return null; if (entry.isListCacheType()) return (V) (entry.listValue == null ? null : new ArrayList(entry.listValue)); - if (entry.isSetCacheType()) return (V) (entry.setValue == null ? null : new HashSet(entry.setValue)); + if (entry.isSetCacheType()) return (V) (entry.csetValue == null ? null : new HashSet(entry.csetValue)); return (V) entry.objectValue; } + @Override + public String getString(String key) { + if (key == null) return null; + CacheEntry entry = container.get(key); + if (entry == null || entry.isExpired()) return null; + return (String) entry.objectValue; + } + + @Override + public long getLong(String key, long defValue) { + if (key == null) return defValue; + CacheEntry entry = container.get(key); + if (entry == null || entry.isExpired()) return defValue; + return entry.objectValue == null ? defValue : (entry.objectValue instanceof AtomicLong ? ((AtomicLong) entry.objectValue).get() : (Long) entry.objectValue); + } + @Override public CompletableFuture getAsync(final String key) { return CompletableFuture.supplyAsync(() -> get(key), getExecutor()); } + @Override + public CompletableFuture getStringAsync(final String key) { + return CompletableFuture.supplyAsync(() -> getString(key), getExecutor()); + } + + @Override + public CompletableFuture getLongAsync(final String key, long defValue) { + return CompletableFuture.supplyAsync(() -> getLong(key, defValue), getExecutor()); + } + @Override @RpcMultiRun public V getAndRefresh(String key, final int expireSeconds) { @@ -252,15 +360,51 @@ public class CacheMemorySource extends AbstractService impleme entry.lastAccessed = (int) (System.currentTimeMillis() / 1000); entry.expireSeconds = expireSeconds; if (entry.isListCacheType()) return (V) (entry.listValue == null ? null : new ArrayList(entry.listValue)); - if (entry.isSetCacheType()) return (V) (entry.setValue == null ? null : new HashSet(entry.setValue)); + if (entry.isSetCacheType()) return (V) (entry.csetValue == null ? null : new HashSet(entry.csetValue)); return (V) entry.objectValue; } @Override + @RpcMultiRun + public String getStringAndRefresh(String key, final int expireSeconds) { + if (key == null) return null; + CacheEntry entry = container.get(key); + if (entry == null || entry.isExpired()) return null; + entry.lastAccessed = (int) (System.currentTimeMillis() / 1000); + entry.expireSeconds = expireSeconds; + return (String) entry.objectValue; + } + + @Override + @RpcMultiRun + public long getLongAndRefresh(String key, final int expireSeconds, long defValue) { + if (key == null) return defValue; + CacheEntry entry = container.get(key); + if (entry == null || entry.isExpired()) return defValue; + entry.lastAccessed = (int) (System.currentTimeMillis() / 1000); + entry.expireSeconds = expireSeconds; + return entry.objectValue == null ? defValue : (entry.objectValue instanceof AtomicLong ? ((AtomicLong) entry.objectValue).get() : (Long) entry.objectValue); + + } + + @Override + @RpcMultiRun public CompletableFuture getAndRefreshAsync(final String key, final int expireSeconds) { return CompletableFuture.supplyAsync(() -> getAndRefresh(key, expireSeconds), getExecutor()); } + @Override + @RpcMultiRun + public CompletableFuture getStringAndRefreshAsync(final String key, final int expireSeconds) { + return CompletableFuture.supplyAsync(() -> getStringAndRefresh(key, expireSeconds), getExecutor()); + } + + @Override + @RpcMultiRun + public CompletableFuture getLongAndRefreshAsync(final String key, final int expireSeconds, long defValue) { + return CompletableFuture.supplyAsync(() -> getLongAndRefresh(key, expireSeconds, defValue), getExecutor()); + } + @Override @RpcMultiRun public void refresh(String key, final int expireSeconds) { @@ -272,17 +416,16 @@ public class CacheMemorySource extends AbstractService impleme } @Override + @RpcMultiRun public CompletableFuture refreshAsync(final String key, final int expireSeconds) { return CompletableFuture.runAsync(() -> refresh(key, expireSeconds), getExecutor()); } - @Override - @RpcMultiRun - public void set(String key, V value) { + protected void set(CacheEntryType cacheType, String key, Object value) { if (key == null) return; CacheEntry entry = container.get(key); if (entry == null) { - entry = new CacheEntry(CacheEntryType.OBJECT, key, value, null, null); + entry = new CacheEntry(cacheType, key, value, null, null); container.putIfAbsent(key, entry); } else { entry.expireSeconds = 0; @@ -292,17 +435,46 @@ public class CacheMemorySource extends AbstractService impleme } @Override + @RpcMultiRun + public void set(String key, V value) { + set(CacheEntryType.OBJECT, key, value); + } + + @Override + @RpcMultiRun + public void setString(String key, String value) { + set(CacheEntryType.STRING, key, value); + } + + @Override + @RpcMultiRun + public void setLong(String key, long value) { + set(CacheEntryType.LONG, key, value); + } + + @Override + @RpcMultiRun public CompletableFuture setAsync(String key, V value) { return CompletableFuture.runAsync(() -> set(key, value), getExecutor()); } @Override @RpcMultiRun - public void set(int expireSeconds, String key, V value) { + public CompletableFuture setStringAsync(String key, String value) { + return CompletableFuture.runAsync(() -> setString(key, value), getExecutor()); + } + + @Override + @RpcMultiRun + public CompletableFuture setLongAsync(String key, long value) { + return CompletableFuture.runAsync(() -> setLong(key, value), getExecutor()); + } + + protected void set(CacheEntryType cacheType, int expireSeconds, String key, Object value) { if (key == null) return; CacheEntry entry = container.get(key); if (entry == null) { - entry = new CacheEntry(CacheEntryType.OBJECT, expireSeconds, key, value, null, null); + entry = new CacheEntry(cacheType, expireSeconds, key, value, null, null); container.putIfAbsent(key, entry); } else { if (expireSeconds > 0) entry.expireSeconds = expireSeconds; @@ -312,10 +484,41 @@ public class CacheMemorySource extends AbstractService impleme } @Override + @RpcMultiRun + public void set(int expireSeconds, String key, V value) { + set(CacheEntryType.OBJECT, expireSeconds, key, value); + } + + @Override + @RpcMultiRun + public void setString(int expireSeconds, String key, String value) { + set(CacheEntryType.STRING, expireSeconds, key, value); + } + + @Override + @RpcMultiRun + public void setLong(int expireSeconds, String key, long value) { + set(CacheEntryType.LONG, expireSeconds, key, value); + } + + @Override + @RpcMultiRun public CompletableFuture setAsync(int expireSeconds, String key, V value) { return CompletableFuture.runAsync(() -> set(expireSeconds, key, value), getExecutor()); } + @Override + @RpcMultiRun + public CompletableFuture setStringAsync(int expireSeconds, String key, String value) { + return CompletableFuture.runAsync(() -> setString(expireSeconds, key, value), getExecutor()); + } + + @Override + @RpcMultiRun + public CompletableFuture setLongAsync(int expireSeconds, String key, long value) { + return CompletableFuture.runAsync(() -> setLong(expireSeconds, key, value), getExecutor()); + } + @Override @RpcMultiRun public void setExpireSeconds(String key, int expireSeconds) { @@ -326,6 +529,7 @@ public class CacheMemorySource extends AbstractService impleme } @Override + @RpcMultiRun public CompletableFuture setExpireSecondsAsync(final String key, final int expireSeconds) { return CompletableFuture.runAsync(() -> setExpireSeconds(key, expireSeconds), getExecutor()); } @@ -338,23 +542,26 @@ public class CacheMemorySource extends AbstractService impleme } @Override + @RpcMultiRun public long incr(final String key) { return incr(key, 1); } @Override + @RpcMultiRun public CompletableFuture incrAsync(final String key) { return CompletableFuture.supplyAsync(() -> incr(key), getExecutor()); } @Override + @RpcMultiRun public long incr(final String key, long num) { CacheEntry entry = container.get(key); if (entry == null) { synchronized (container) { entry = container.get(key); if (entry == null) { - entry = new CacheEntry(CacheEntryType.OBJECT, key, new AtomicLong(), null, null); + entry = new CacheEntry(CacheEntryType.ATOMIC, key, new AtomicLong(), null, null); container.put(key, entry); } } @@ -363,31 +570,37 @@ public class CacheMemorySource extends AbstractService impleme } @Override + @RpcMultiRun public CompletableFuture incrAsync(final String key, long num) { return CompletableFuture.supplyAsync(() -> incr(key, num), getExecutor()); } @Override + @RpcMultiRun public long decr(final String key) { return incr(key, -1); } @Override + @RpcMultiRun public CompletableFuture decrAsync(final String key) { return CompletableFuture.supplyAsync(() -> decr(key), getExecutor()); } @Override + @RpcMultiRun public long decr(final String key, long num) { return incr(key, -num); } @Override + @RpcMultiRun public CompletableFuture decrAsync(final String key, long num) { return CompletableFuture.supplyAsync(() -> decr(key, num), getExecutor()); } @Override + @RpcMultiRun public CompletableFuture removeAsync(final String key) { return CompletableFuture.runAsync(() -> remove(key), getExecutor()); } @@ -397,11 +610,31 @@ public class CacheMemorySource extends AbstractService impleme return (Collection) get(key); } + @Override + public Collection getStringCollection(final String key) { + return (Collection) get(key); + } + + @Override + public Collection getLongCollection(final String key) { + return (Collection) get(key); + } + @Override public CompletableFuture> getCollectionAsync(final String key) { return CompletableFuture.supplyAsync(() -> getCollection(key), getExecutor()); } + @Override + public CompletableFuture> getStringCollectionAsync(final String key) { + return CompletableFuture.supplyAsync(() -> getStringCollection(key), getExecutor()); + } + + @Override + public CompletableFuture> getLongCollectionAsync(final String key) { + return CompletableFuture.supplyAsync(() -> getLongCollection(key), getExecutor()); + } + @Override public int getCollectionSize(final String key) { Collection collection = (Collection) get(key); @@ -414,23 +647,47 @@ public class CacheMemorySource extends AbstractService impleme } @Override + @RpcMultiRun public Collection getCollectionAndRefresh(final String key, final int expireSeconds) { return (Collection) getAndRefresh(key, expireSeconds); } @Override + @RpcMultiRun + public Collection getStringCollectionAndRefresh(final String key, final int expireSeconds) { + return (Collection) getAndRefresh(key, expireSeconds); + } + + @Override + @RpcMultiRun + public Collection getLongCollectionAndRefresh(final String key, final int expireSeconds) { + return (Collection) getAndRefresh(key, expireSeconds); + } + + @Override + @RpcMultiRun public CompletableFuture> getCollectionAndRefreshAsync(final String key, final int expireSeconds) { return CompletableFuture.supplyAsync(() -> getCollectionAndRefresh(key, expireSeconds), getExecutor()); } @Override @RpcMultiRun - public void appendListItem(String key, V value) { + public CompletableFuture> getStringCollectionAndRefreshAsync(final String key, final int expireSeconds) { + return CompletableFuture.supplyAsync(() -> getStringCollectionAndRefresh(key, expireSeconds), getExecutor()); + } + + @Override + @RpcMultiRun + public CompletableFuture> getLongCollectionAndRefreshAsync(final String key, final int expireSeconds) { + return CompletableFuture.supplyAsync(() -> getLongCollectionAndRefresh(key, expireSeconds), getExecutor()); + } + + protected void appendListItem(CacheEntryType cacheType, String key, Object value) { if (key == null) return; CacheEntry entry = container.get(key); if (entry == null || !entry.isListCacheType() || entry.listValue == null) { - ConcurrentLinkedQueue list = new ConcurrentLinkedQueue(); - entry = new CacheEntry(CacheEntryType.LIST, key, null, null, list); + ConcurrentLinkedQueue list = new ConcurrentLinkedQueue(); + entry = new CacheEntry(cacheType, key, null, null, list); CacheEntry old = container.putIfAbsent(key, entry); if (old != null) list = old.listValue; if (list != null) list.add(value); @@ -440,10 +697,41 @@ public class CacheMemorySource extends AbstractService impleme } @Override + @RpcMultiRun + public void appendListItem(String key, V value) { + appendListItem(CacheEntryType.OBJECT_LIST, key, value); + } + + @Override + @RpcMultiRun + public void appendStringListItem(String key, String value) { + appendListItem(CacheEntryType.STRING_LIST, key, value); + } + + @Override + @RpcMultiRun + public void appendLongListItem(String key, long value) { + appendListItem(CacheEntryType.LONG_LIST, key, value); + } + + @Override + @RpcMultiRun public CompletableFuture appendListItemAsync(final String key, final V value) { return CompletableFuture.runAsync(() -> appendListItem(key, value), getExecutor()); } + @Override + @RpcMultiRun + public CompletableFuture appendStringListItemAsync(final String key, final String value) { + return CompletableFuture.runAsync(() -> appendStringListItem(key, value), getExecutor()); + } + + @Override + @RpcMultiRun + public CompletableFuture appendLongListItemAsync(final String key, final long value) { + return CompletableFuture.runAsync(() -> appendLongListItem(key, value), getExecutor()); + } + @Override @RpcMultiRun public void removeListItem(String key, V value) { @@ -454,45 +742,136 @@ public class CacheMemorySource extends AbstractService impleme } @Override + @RpcMultiRun + public void removeStringListItem(String key, String value) { + if (key == null) return; + CacheEntry entry = container.get(key); + if (entry == null || entry.listValue == null) return; + entry.listValue.remove(value); + } + + @Override + @RpcMultiRun + public void removeLongListItem(String key, long value) { + if (key == null) return; + CacheEntry entry = container.get(key); + if (entry == null || entry.listValue == null) return; + entry.listValue.remove(value); + } + + @Override + @RpcMultiRun public CompletableFuture removeListItemAsync(final String key, final V value) { return CompletableFuture.runAsync(() -> removeListItem(key, value), getExecutor()); } @Override @RpcMultiRun - public void appendSetItem(String key, V value) { + public CompletableFuture removeStringListItemAsync(final String key, final String value) { + return CompletableFuture.runAsync(() -> removeStringListItem(key, value), getExecutor()); + } + + @Override + @RpcMultiRun + public CompletableFuture removeLongListItemAsync(final String key, final long value) { + return CompletableFuture.runAsync(() -> removeLongListItem(key, value), getExecutor()); + } + + protected void appendSetItem(CacheEntryType cacheType, String key, Object value) { if (key == null) return; CacheEntry entry = container.get(key); - if (entry == null || !entry.isSetCacheType() || entry.setValue == null) { - CopyOnWriteArraySet set = new CopyOnWriteArraySet(); - entry = new CacheEntry(CacheEntryType.SET, key, null, set, null); + if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) { + CopyOnWriteArraySet set = new CopyOnWriteArraySet(); + entry = new CacheEntry(cacheType, key, null, set, null); CacheEntry old = container.putIfAbsent(key, entry); - if (old != null) set = old.setValue; + if (old != null) set = old.csetValue; if (set != null) set.add(value); } else { - entry.setValue.add(value); + entry.csetValue.add(value); } } @Override + @RpcMultiRun + public void appendSetItem(String key, V value) { + appendSetItem(CacheEntryType.OBJECT_SET, key, value); + } + + @Override + @RpcMultiRun + public void appendStringSetItem(String key, String value) { + appendSetItem(CacheEntryType.OBJECT_SET, key, value); + } + + @Override + @RpcMultiRun + public void appendLongSetItem(String key, long value) { + appendSetItem(CacheEntryType.OBJECT_SET, key, value); + } + + @Override + @RpcMultiRun public CompletableFuture appendSetItemAsync(final String key, final V value) { return CompletableFuture.runAsync(() -> appendSetItem(key, value), getExecutor()); } + @Override + @RpcMultiRun + public CompletableFuture appendStringSetItemAsync(final String key, final String value) { + return CompletableFuture.runAsync(() -> appendStringSetItem(key, value), getExecutor()); + } + + @Override + @RpcMultiRun + public CompletableFuture appendLongSetItemAsync(final String key, final long value) { + return CompletableFuture.runAsync(() -> appendLongSetItem(key, value), getExecutor()); + } + @Override @RpcMultiRun public void removeSetItem(String key, V value) { if (key == null) return; CacheEntry entry = container.get(key); - if (entry == null || entry.setValue == null) return; - entry.setValue.remove(value); + if (entry == null || entry.csetValue == null) return; + entry.csetValue.remove(value); } @Override + @RpcMultiRun + public void removeStringSetItem(String key, String value) { + if (key == null) return; + CacheEntry entry = container.get(key); + if (entry == null || entry.csetValue == null) return; + entry.csetValue.remove(value); + } + + @Override + @RpcMultiRun + public void removeLongSetItem(String key, long value) { + if (key == null) return; + CacheEntry entry = container.get(key); + if (entry == null || entry.csetValue == null) return; + entry.csetValue.remove(value); + } + + @Override + @RpcMultiRun public CompletableFuture removeSetItemAsync(final String key, final V value) { return CompletableFuture.runAsync(() -> removeSetItem(key, value), getExecutor()); } + @Override + @RpcMultiRun + public CompletableFuture removeStringSetItemAsync(final String key, final String value) { + return CompletableFuture.runAsync(() -> removeStringSetItem(key, value), getExecutor()); + } + + @Override + @RpcMultiRun + public CompletableFuture removeLongSetItemAsync(final String key, final long value) { + return CompletableFuture.runAsync(() -> removeLongSetItem(key, value), getExecutor()); + } + @Override public List queryKeys() { return new ArrayList<>(container.keySet()); diff --git a/src/org/redkale/source/CacheSource.java b/src/org/redkale/source/CacheSource.java index f08297d75..7b9da9f5d 100644 --- a/src/org/redkale/source/CacheSource.java +++ b/src/org/redkale/source/CacheSource.java @@ -76,6 +76,46 @@ public interface CacheSource { public List> queryList(); + public String getString(final String key); + + public String getStringAndRefresh(final String key, final int expireSeconds); + + public void setString(final String key, final String value); + + public void setString(final int expireSeconds, final String key, final String value); + + public Collection getStringCollection(final String key); + + public Collection getStringCollectionAndRefresh(final String key, final int expireSeconds); + + public void appendStringListItem(final String key, final String value); + + public void removeStringListItem(final String key, final String value); + + public void appendStringSetItem(final String key, final String value); + + public void removeStringSetItem(final String key, final String value); + + public long getLong(final String key, long defValue); + + public long getLongAndRefresh(final String key, final int expireSeconds, long defValue); + + public void setLong(final String key, final long value); + + public void setLong(final int expireSeconds, final String key, final long value); + + public Collection getLongCollection(final String key); + + public Collection getLongCollectionAndRefresh(final String key, final int expireSeconds); + + public void appendLongListItem(final String key, final long value); + + public void removeLongListItem(final String key, final long value); + + public void appendLongSetItem(final String key, final long value); + + public void removeLongSetItem(final String key, final long value); + //---------------------- CompletableFuture 异步版 --------------------------------- public CompletableFuture existsAsync(final String key); @@ -121,20 +161,58 @@ public interface CacheSource { public CompletableFuture>> queryListAsync(); + public CompletableFuture getStringAsync(final String key); + + public CompletableFuture getStringAndRefreshAsync(final String key, final int expireSeconds); + + public CompletableFuture setStringAsync(final String key, final String value); + + public CompletableFuture setStringAsync(final int expireSeconds, final String key, final String value); + + public CompletableFuture> getStringCollectionAsync(final String key); + + public CompletableFuture> getStringCollectionAndRefreshAsync(final String key, final int expireSeconds); + + public CompletableFuture appendStringListItemAsync(final String key, final String value); + + public CompletableFuture removeStringListItemAsync(final String key, final String value); + + public CompletableFuture appendStringSetItemAsync(final String key, final String value); + + public CompletableFuture removeStringSetItemAsync(final String key, final String value); + + public CompletableFuture getLongAsync(final String key, long defValue); + + public CompletableFuture getLongAndRefreshAsync(final String key, final int expireSeconds, long defValue); + + public CompletableFuture setLongAsync(final String key, long value); + + public CompletableFuture setLongAsync(final int expireSeconds, final String key, final long value); + + public CompletableFuture> getLongCollectionAsync(final String key); + + public CompletableFuture> getLongCollectionAndRefreshAsync(final String key, final int expireSeconds); + + public CompletableFuture appendLongListItemAsync(final String key, final long value); + + public CompletableFuture removeLongListItemAsync(final String key, final long value); + + public CompletableFuture appendLongSetItemAsync(final String key, final long value); + + public CompletableFuture removeLongSetItemAsync(final String key, final long value); + default CompletableFuture isOpenAsync() { return CompletableFuture.completedFuture(isOpen()); } public static enum CacheEntryType { - OBJECT, SET, LIST; + LONG, STRING, OBJECT, ATOMIC, + LONG_SET, STRING_SET, OBJECT_SET, + LONG_LIST, STRING_LIST, OBJECT_LIST; } public static final class CacheEntry { - static final String JSON_SET_KEY = "{\"cacheType\":\"" + CacheEntryType.SET + "\""; - - static final String JSON_LIST_KEY = "{\"cacheType\":\"" + CacheEntryType.LIST + "\""; - final CacheEntryType cacheType; final String key; @@ -146,26 +224,26 @@ public interface CacheSource { T objectValue; - CopyOnWriteArraySet setValue; + CopyOnWriteArraySet csetValue; ConcurrentLinkedQueue listValue; - public CacheEntry(CacheEntryType cacheType, String key, T objectValue, CopyOnWriteArraySet setValue, ConcurrentLinkedQueue listValue) { - this(cacheType, 0, key, objectValue, setValue, listValue); + public CacheEntry(CacheEntryType cacheType, String key, T objectValue, CopyOnWriteArraySet csetValue, ConcurrentLinkedQueue listValue) { + this(cacheType, 0, key, objectValue, csetValue, listValue); } - public CacheEntry(CacheEntryType cacheType, int expireSeconds, String key, T objectValue, CopyOnWriteArraySet setValue, ConcurrentLinkedQueue listValue) { - this(cacheType, expireSeconds, (int) (System.currentTimeMillis() / 1000), key, objectValue, setValue, listValue); + public CacheEntry(CacheEntryType cacheType, int expireSeconds, String key, T objectValue, CopyOnWriteArraySet csetValue, ConcurrentLinkedQueue listValue) { + this(cacheType, expireSeconds, (int) (System.currentTimeMillis() / 1000), key, objectValue, csetValue, listValue); } - @ConstructorProperties({"cacheType", "expireSeconds", "lastAccessed", "key", "objectValue", "setValue", "listValue"}) - public CacheEntry(CacheEntryType cacheType, int expireSeconds, int lastAccessed, String key, T objectValue, CopyOnWriteArraySet setValue, ConcurrentLinkedQueue listValue) { + @ConstructorProperties({"cacheType", "expireSeconds", "lastAccessed", "key", "objectValue", "csetValue", "listValue"}) + public CacheEntry(CacheEntryType cacheType, int expireSeconds, int lastAccessed, String key, T objectValue, CopyOnWriteArraySet csetValue, ConcurrentLinkedQueue listValue) { this.cacheType = cacheType; this.expireSeconds = expireSeconds; this.lastAccessed = lastAccessed; this.key = key; this.objectValue = objectValue; - this.setValue = setValue; + this.csetValue = csetValue; this.listValue = listValue; } @@ -176,12 +254,12 @@ public interface CacheSource { @ConvertColumn(ignore = true) public boolean isListCacheType() { - return cacheType == CacheEntryType.LIST; + return cacheType == CacheEntryType.LONG_LIST || cacheType == CacheEntryType.STRING_LIST || cacheType == CacheEntryType.OBJECT_LIST; } @ConvertColumn(ignore = true) public boolean isSetCacheType() { - return cacheType == CacheEntryType.SET; + return cacheType == CacheEntryType.LONG_SET || cacheType == CacheEntryType.STRING_SET || cacheType == CacheEntryType.OBJECT_SET; } @ConvertColumn(ignore = true) @@ -209,8 +287,8 @@ public interface CacheSource { return objectValue; } - public CopyOnWriteArraySet getSetValue() { - return setValue; + public CopyOnWriteArraySet getCsetValue() { + return csetValue; } public ConcurrentLinkedQueue getListValue() {