From 3e0b9daeaf826305be824664e91edf41f1bb48c3 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Wed, 8 Nov 2017 11:06:55 +0800 Subject: [PATCH] =?UTF-8?q?CacheSource=E4=B8=AD=E7=9A=84key=E5=9B=BA?= =?UTF-8?q?=E5=AE=9A=E4=BD=BF=E7=94=A8String=E7=B1=BB=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/org/redkale/boot/NodeServer.java | 4 +- src/org/redkale/net/http/WebSocketNode.java | 19 +-- .../redkale/service/WebSocketNodeService.java | 4 +- src/org/redkale/source/CacheMemorySource.java | 116 +++++++++--------- src/org/redkale/source/CacheSource.java | 84 +++++++------ 5 files changed, 111 insertions(+), 116 deletions(-) diff --git a/src/org/redkale/boot/NodeServer.java b/src/org/redkale/boot/NodeServer.java index c710d17da..a856db5ef 100644 --- a/src/org/redkale/boot/NodeServer.java +++ b/src/org/redkale/boot/NodeServer.java @@ -273,10 +273,10 @@ public abstract class NodeServer { source = (CacheSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, appResFactory, appTranFactory, sncpAddr, groups, Sncp.getConf(srcService)); Type genericType = field.getGenericType(); ParameterizedType pt = (genericType instanceof ParameterizedType) ? (ParameterizedType) genericType : null; - Type valType = pt == null ? null : pt.getActualTypeArguments()[1]; + Type valType = pt == null ? null : pt.getActualTypeArguments()[0]; if (sourceType == CacheMemorySource.class) { CacheMemorySource memorySource = (CacheMemorySource) source; - memorySource.setStoreType(pt == null ? Serializable.class : (Class) pt.getActualTypeArguments()[0], valType instanceof Class ? (Class) valType : Object.class); + memorySource.setStoreType(valType instanceof Class ? (Class) valType : Object.class); if (field.getAnnotation(Transient.class) != null) memorySource.setNeedStore(false); //必须在setStoreType之后 } application.cacheSources.add((CacheSource) source); diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index de639d756..a1eccb481 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -28,11 +28,14 @@ import org.redkale.util.*; */ public abstract class WebSocketNode { + @Comment("存储用户ID的key前缀") + public static final String SOURCE_SNCP_USERID_PREFIX = "wsuid_"; + @Comment("存储当前SNCP节点列表的key") - public static final String SOURCE_SNCP_NODES_KEY = "redkale_sncpnodes"; + public static final String SOURCE_SNCP_NODES_KEY = "ws_sncpnodes"; @Comment("存储当前用户数量的key") - public static final String SOURCE_USER_COUNT_KEY = "redkale_usercount"; + public static final String SOURCE_USER_COUNT_KEY = "ws_usercount"; protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); @@ -52,7 +55,7 @@ public abstract class WebSocketNode { //集合包含 localSncpAddress //如果不是分布式(没有SNCP),sncpNodeAddresses 将不会被用到 @Resource(name = "$_nodes") - protected CacheSource sncpNodeAddresses; + protected CacheSource sncpNodeAddresses; //当前节点的本地WebSocketEngine protected WebSocketEngine localEngine; @@ -124,7 +127,7 @@ public abstract class WebSocketNode { * @return 地址列表 */ public CompletableFuture> getRpcNodeAddresses(final Serializable userid) { - if (this.sncpNodeAddresses != null) return this.sncpNodeAddresses.getCollectionAsync(userid); + if (this.sncpNodeAddresses != null) return this.sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid); List rs = new ArrayList<>(); rs.add(this.localSncpAddress); return CompletableFuture.completedFuture(rs); @@ -165,7 +168,7 @@ public abstract class WebSocketNode { if (this.localEngine != null && this.sncpNodeAddresses == null) { return CompletableFuture.completedFuture(this.localEngine.existsLocalWebSocket(userid)); } - return this.sncpNodeAddresses.existsAsync(userid); + return this.sncpNodeAddresses.existsAsync(SOURCE_SNCP_USERID_PREFIX + userid); } /** @@ -199,7 +202,7 @@ public abstract class WebSocketNode { return localFuture; } //远程节点关闭 - CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(userid); + CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid); CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { if (finest) logger.finest("websocket found userid:" + userid + " on " + addrs); if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0); @@ -402,7 +405,7 @@ public abstract class WebSocketNode { return this.localEngine.broadcastMessage(message, last); } CompletableFuture localFuture = this.localEngine == null ? null : this.localEngine.broadcastMessage(message, last); - CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync("redkale_sncpnodes"); + CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_NODES_KEY); CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { if (finest) logger.finest("websocket broadcast message on " + addrs); if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0); @@ -427,7 +430,7 @@ public abstract class WebSocketNode { return localFuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localFuture; } //远程节点发送消息 - CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(userid); + CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid); CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { if (addrs == null || addrs.isEmpty()) { if (finer) logger.finer("websocket not found userid:" + userid + " on any node "); diff --git a/src/org/redkale/service/WebSocketNodeService.java b/src/org/redkale/service/WebSocketNodeService.java index f55e72272..b70896de5 100644 --- a/src/org/redkale/service/WebSocketNodeService.java +++ b/src/org/redkale/service/WebSocketNodeService.java @@ -67,7 +67,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service { */ @Override public CompletableFuture connect(Serializable userid, InetSocketAddress sncpAddr) { - CompletableFuture future = sncpNodeAddresses.appendSetItemAsync(userid, sncpAddr); + CompletableFuture future = sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, sncpAddr); future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_NODES_KEY, sncpAddr)); if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " connect from " + sncpAddr); return future; @@ -83,7 +83,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service { */ @Override public CompletableFuture disconnect(Serializable userid, InetSocketAddress sncpAddr) { - CompletableFuture future = sncpNodeAddresses.removeSetItemAsync(userid, sncpAddr); + CompletableFuture future = sncpNodeAddresses.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, sncpAddr); if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " disconnect from " + sncpAddr); return future; } diff --git a/src/org/redkale/source/CacheMemorySource.java b/src/org/redkale/source/CacheMemorySource.java index c8af82fad..114ce46a4 100644 --- a/src/org/redkale/source/CacheMemorySource.java +++ b/src/org/redkale/source/CacheMemorySource.java @@ -20,7 +20,6 @@ import org.redkale.util.*; /** * CacheSource的默认实现--内存缓存 * - * @param key类型 * @param value类型 *

* 详情见: https://redkale.org @@ -30,7 +29,7 @@ import org.redkale.util.*; @Local @AutoLoad(false) @ResourceType(CacheSource.class) -public class CacheMemorySource extends AbstractService implements CacheSource, Service, AutoCloseable, Resourcable { +public class CacheMemorySource extends AbstractService implements CacheSource, Service, AutoCloseable, Resourcable { @Resource(name = "APP_HOME") private File home; @@ -40,8 +39,6 @@ public class CacheMemorySource extends private boolean needStore; - private Class keyType; - private Type objValueType; private Type setValueType; @@ -54,20 +51,19 @@ public class CacheMemorySource extends private final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); - protected final ConcurrentHashMap> container = new ConcurrentHashMap<>(); + protected final ConcurrentHashMap> container = new ConcurrentHashMap<>(); @RpcRemote - protected CacheSource remoteSource; + protected CacheSource remoteSource; public CacheMemorySource() { } - public final CacheMemorySource setStoreType(Class keyType, Class valueType) { - this.keyType = keyType; + public final CacheMemorySource setStoreType(Class valueType) { this.objValueType = valueType; this.setValueType = TypeToken.createParameterizedType(null, CopyOnWriteArraySet.class, valueType); this.listValueType = TypeToken.createParameterizedType(null, ConcurrentLinkedQueue.class, valueType); - this.setNeedStore(this.keyType != null && this.keyType != Serializable.class && this.objValueType != null); + this.setNeedStore(this.objValueType != null); return this; } @@ -79,14 +75,13 @@ public class CacheMemorySource extends public void init(AnyValue conf) { final CacheMemorySource self = this; AnyValue prop = conf == null ? null : conf.getAnyValue("property"); - if (keyType == null && prop != null) { - String storeKeyStr = prop.getValue("key-type"); + if (prop != null) { String storeValueStr = prop.getValue("value-type"); - if (storeKeyStr != null && storeValueStr != null) { + if (storeValueStr != null) { try { - this.setStoreType(Thread.currentThread().getContextClassLoader().loadClass(storeKeyStr), Thread.currentThread().getContextClassLoader().loadClass(storeValueStr)); + this.setStoreType(Thread.currentThread().getContextClassLoader().loadClass(storeValueStr)); } catch (Throwable e) { - logger.log(Level.SEVERE, self.getClass().getSimpleName() + " load key & value store class (" + storeKeyStr + ", " + storeValueStr + ") error", e); + logger.log(Level.SEVERE, self.getClass().getSimpleName() + " load key & value store class (" + storeValueStr + ") error", e); } } if (prop.getBoolValue("store-ignore", false)) setNeedStore(false); @@ -105,7 +100,7 @@ public class CacheMemorySource extends t.setDaemon(true); return t; }); - final List keys = new ArrayList<>(); + final List keys = new ArrayList<>(); scheduler.scheduleWithFixedDelay(() -> { keys.clear(); int now = (int) (System.currentTimeMillis() / 1000); @@ -114,7 +109,7 @@ public class CacheMemorySource extends keys.add(x.key); } }); - for (K key : keys) { + for (String key : keys) { CacheEntry entry = container.remove(key); if (expireHandler != null && entry != null) expireHandler.accept(entry); } @@ -131,19 +126,18 @@ public class CacheMemorySource extends File store = new File(home, "cache/" + resourceName()); if (!store.isFile() || !store.canRead()) return; LineNumberReader reader = new LineNumberReader(new FileReader(store)); - if (this.keyType == null) this.keyType = Serializable.class; if (this.objValueType == null) { this.objValueType = Object.class; this.setValueType = TypeToken.createParameterizedType(null, CopyOnWriteArraySet.class, this.objValueType); this.listValueType = TypeToken.createParameterizedType(null, ConcurrentLinkedQueue.class, this.objValueType); } - final Type storeObjType = TypeToken.createParameterizedType(null, CacheEntry.class, keyType, objValueType); - final Type storeSetType = TypeToken.createParameterizedType(null, CacheEntry.class, keyType, setValueType); - final Type storeListType = TypeToken.createParameterizedType(null, CacheEntry.class, keyType, listValueType); + final Type storeObjType = TypeToken.createParameterizedType(null, CacheEntry.class, objValueType); + final Type storeSetType = TypeToken.createParameterizedType(null, CacheEntry.class, setValueType); + final Type storeListType = TypeToken.createParameterizedType(null, CacheEntry.class, listValueType); String line; while ((line = reader.readLine()) != null) { if (line.isEmpty()) continue; - CacheEntry entry = convert.convertFrom(line.startsWith(CacheEntry.JSON_SET_KEY) ? storeSetType : (line.startsWith(CacheEntry.JSON_LIST_KEY) ? storeListType : storeObjType), line); + CacheEntry entry = convert.convertFrom(line.startsWith(CacheEntry.JSON_SET_KEY) ? storeSetType : (line.startsWith(CacheEntry.JSON_LIST_KEY) ? storeListType : storeObjType), line); if (entry.isExpired()) continue; if (datasync && container.containsKey(entry.key)) continue; //已经同步了 container.put(entry.key, entry); @@ -159,12 +153,12 @@ public class CacheMemorySource extends if (client != null && client.getRemoteGroupTransport() != null) { super.runAsync(() -> { try { - CompletableFuture>> listFuture = remoteSource.queryListAsync(); + CompletableFuture>> listFuture = remoteSource.queryListAsync(); listFuture.whenComplete((list, exp) -> { if (exp != null) { logger.log(Level.FINEST, CacheSource.class.getSimpleName() + "(" + resourceName() + ") queryListAsync error", exp); } else { - for (CacheEntry entry : list) { + for (CacheEntry entry : list) { container.put(entry.key, entry); } } @@ -196,10 +190,10 @@ public class CacheMemorySource extends File store = new File(home, "cache/" + resourceName()); store.getParentFile().mkdirs(); PrintStream stream = new PrintStream(store, "UTF-8"); - final Type storeObjType = TypeToken.createParameterizedType(null, CacheEntry.class, keyType, objValueType); - final Type storeSetType = TypeToken.createParameterizedType(null, CacheEntry.class, keyType, setValueType); - final Type storeListType = TypeToken.createParameterizedType(null, CacheEntry.class, keyType, listValueType); - Collection> entrys = container.values(); + final Type storeObjType = TypeToken.createParameterizedType(null, CacheEntry.class, objValueType); + final Type storeSetType = TypeToken.createParameterizedType(null, CacheEntry.class, setValueType); + final Type storeListType = TypeToken.createParameterizedType(null, CacheEntry.class, listValueType); + Collection> entrys = container.values(); for (CacheEntry entry : entrys) { stream.println(convert.convertTo(entry.isSetCacheType() ? storeSetType : (entry.isListCacheType() ? storeListType : storeObjType), entry)); } @@ -211,7 +205,7 @@ public class CacheMemorySource extends } @Override - public boolean exists(K key) { + public boolean exists(String key) { if (key == null) return false; CacheEntry entry = container.get(key); if (entry == null) return false; @@ -219,12 +213,12 @@ public class CacheMemorySource extends } @Override - public CompletableFuture existsAsync(final K key) { + public CompletableFuture existsAsync(final String key) { return CompletableFuture.supplyAsync(() -> exists(key), getExecutor()); } @Override - public V get(K key) { + public V get(String key) { if (key == null) return null; CacheEntry entry = container.get(key); if (entry == null || entry.isExpired()) return null; @@ -234,13 +228,13 @@ public class CacheMemorySource extends } @Override - public CompletableFuture getAsync(final K key) { + public CompletableFuture getAsync(final String key) { return CompletableFuture.supplyAsync(() -> get(key), getExecutor()); } @Override @RpcMultiRun - public V getAndRefresh(K key, final int expireSeconds) { + public V getAndRefresh(String key, final int expireSeconds) { if (key == null) return null; CacheEntry entry = container.get(key); if (entry == null || entry.isExpired()) return null; @@ -252,13 +246,13 @@ public class CacheMemorySource extends } @Override - public CompletableFuture getAndRefreshAsync(final K key, final int expireSeconds) { + public CompletableFuture getAndRefreshAsync(final String key, final int expireSeconds) { return CompletableFuture.supplyAsync(() -> getAndRefresh(key, expireSeconds), getExecutor()); } @Override @RpcMultiRun - public void refresh(K key, final int expireSeconds) { + public void refresh(String key, final int expireSeconds) { if (key == null) return; CacheEntry entry = container.get(key); if (entry == null) return; @@ -267,13 +261,13 @@ public class CacheMemorySource extends } @Override - public CompletableFuture refreshAsync(final K key, final int expireSeconds) { + public CompletableFuture refreshAsync(final String key, final int expireSeconds) { return CompletableFuture.runAsync(() -> refresh(key, expireSeconds), getExecutor()); } @Override @RpcMultiRun - public void set(K key, V value) { + public void set(String key, V value) { if (key == null) return; CacheEntry entry = container.get(key); if (entry == null) { @@ -287,13 +281,13 @@ public class CacheMemorySource extends } @Override - public CompletableFuture setAsync(K key, V value) { + public CompletableFuture setAsync(String key, V value) { return CompletableFuture.runAsync(() -> set(key, value), getExecutor()); } @Override @RpcMultiRun - public void set(int expireSeconds, K key, V value) { + public void set(int expireSeconds, String key, V value) { if (key == null) return; CacheEntry entry = container.get(key); if (entry == null) { @@ -307,13 +301,13 @@ public class CacheMemorySource extends } @Override - public CompletableFuture setAsync(int expireSeconds, K key, V value) { + public CompletableFuture setAsync(int expireSeconds, String key, V value) { return CompletableFuture.runAsync(() -> set(expireSeconds, key, value), getExecutor()); } @Override @RpcMultiRun - public void setExpireSeconds(K key, int expireSeconds) { + public void setExpireSeconds(String key, int expireSeconds) { if (key == null) return; CacheEntry entry = container.get(key); if (entry == null) return; @@ -321,56 +315,56 @@ public class CacheMemorySource extends } @Override - public CompletableFuture setExpireSecondsAsync(final K key, final int expireSeconds) { + public CompletableFuture setExpireSecondsAsync(final String key, final int expireSeconds) { return CompletableFuture.runAsync(() -> setExpireSeconds(key, expireSeconds), getExecutor()); } @Override @RpcMultiRun - public void remove(K key) { + public void remove(String key) { if (key == null) return; container.remove(key); } @Override - public CompletableFuture removeAsync(final K key) { + public CompletableFuture removeAsync(final String key) { return CompletableFuture.runAsync(() -> remove(key), getExecutor()); } @Override - public Collection getCollection(final K key) { + public Collection getCollection(final String key) { return (Collection) get(key); } @Override - public CompletableFuture> getCollectionAsync(final K key) { + public CompletableFuture> getCollectionAsync(final String key) { return CompletableFuture.supplyAsync(() -> getCollection(key), getExecutor()); } @Override - public int getCollectionSize(final K key) { + public int getCollectionSize(final String key) { Collection collection = (Collection) get(key); return collection == null ? 0 : collection.size(); } @Override - public CompletableFuture getCollectionSizeAsync(final K key) { + public CompletableFuture getCollectionSizeAsync(final String key) { return CompletableFuture.supplyAsync(() -> getCollectionSize(key), getExecutor()); } @Override - public Collection getCollectionAndRefresh(final K key, final int expireSeconds) { + public Collection getCollectionAndRefresh(final String key, final int expireSeconds) { return (Collection) getAndRefresh(key, expireSeconds); } @Override - public CompletableFuture> getCollectionAndRefreshAsync(final K key, final int expireSeconds) { + public CompletableFuture> getCollectionAndRefreshAsync(final String key, final int expireSeconds) { return CompletableFuture.supplyAsync(() -> getCollectionAndRefresh(key, expireSeconds), getExecutor()); } @Override @RpcMultiRun - public void appendListItem(K key, V value) { + public void appendListItem(String key, V value) { if (key == null) return; CacheEntry entry = container.get(key); if (entry == null || !entry.isListCacheType() || entry.listValue == null) { @@ -385,13 +379,13 @@ public class CacheMemorySource extends } @Override - public CompletableFuture appendListItemAsync(final K key, final V value) { + public CompletableFuture appendListItemAsync(final String key, final V value) { return CompletableFuture.runAsync(() -> appendListItem(key, value), getExecutor()); } @Override @RpcMultiRun - public void removeListItem(K key, V value) { + public void removeListItem(String key, V value) { if (key == null) return; CacheEntry entry = container.get(key); if (entry == null || entry.listValue == null) return; @@ -399,13 +393,13 @@ public class CacheMemorySource extends } @Override - public CompletableFuture removeListItemAsync(final K key, final V value) { + public CompletableFuture removeListItemAsync(final String key, final V value) { return CompletableFuture.runAsync(() -> removeListItem(key, value), getExecutor()); } @Override @RpcMultiRun - public void appendSetItem(K key, V value) { + public void appendSetItem(String key, V value) { if (key == null) return; CacheEntry entry = container.get(key); if (entry == null || !entry.isSetCacheType() || entry.setValue == null) { @@ -420,13 +414,13 @@ public class CacheMemorySource extends } @Override - public CompletableFuture appendSetItemAsync(final K key, final V value) { + public CompletableFuture appendSetItemAsync(final String key, final V value) { return CompletableFuture.runAsync(() -> appendSetItem(key, value), getExecutor()); } @Override @RpcMultiRun - public void removeSetItem(K key, V value) { + public void removeSetItem(String key, V value) { if (key == null) return; CacheEntry entry = container.get(key); if (entry == null || entry.setValue == null) return; @@ -434,12 +428,12 @@ public class CacheMemorySource extends } @Override - public CompletableFuture removeSetItemAsync(final K key, final V value) { + public CompletableFuture removeSetItemAsync(final String key, final V value) { return CompletableFuture.runAsync(() -> removeSetItem(key, value), getExecutor()); } @Override - public List queryKeys() { + public List queryKeys() { return new ArrayList<>(container.keySet()); } @@ -449,17 +443,17 @@ public class CacheMemorySource extends } @Override - public CompletableFuture>> queryListAsync() { + public CompletableFuture>> queryListAsync() { return CompletableFuture.completedFuture(new ArrayList<>(container.values())); } @Override - public List> queryList() { + public List> queryList() { return new ArrayList<>(container.values()); } @Override - public CompletableFuture> queryKeysAsync() { + public CompletableFuture> queryKeysAsync() { return CompletableFuture.completedFuture(new ArrayList<>(container.keySet())); } diff --git a/src/org/redkale/source/CacheSource.java b/src/org/redkale/source/CacheSource.java index 72529daf2..c103f04bc 100644 --- a/src/org/redkale/source/CacheSource.java +++ b/src/org/redkale/source/CacheSource.java @@ -6,7 +6,6 @@ package org.redkale.source; import java.beans.ConstructorProperties; -import java.io.*; import java.util.*; import java.util.concurrent.*; import org.redkale.convert.ConvertColumn; @@ -14,91 +13,90 @@ import org.redkale.convert.json.JsonFactory; /** * - * @param key的类型 * @param value的类型 *

* 详情见: https://redkale.org * * @author zhangjx */ -public interface CacheSource { +public interface CacheSource { default boolean isOpen() { return true; } - public boolean exists(final K key); + public boolean exists(final String key); - public V get(final K key); + public V get(final String key); - public V getAndRefresh(final K key, final int expireSeconds); + public V getAndRefresh(final String key, final int expireSeconds); - public void refresh(final K key, final int expireSeconds); + public void refresh(final String key, final int expireSeconds); - public void set(final K key, final V value); + public void set(final String key, final V value); - public void set(final int expireSeconds, final K key, final V value); + public void set(final int expireSeconds, final String key, final V value); - public void setExpireSeconds(final K key, final int expireSeconds); + public void setExpireSeconds(final String key, final int expireSeconds); - public void remove(final K key); + public void remove(final String key); - public Collection getCollection(final K key); + public Collection getCollection(final String key); - public int getCollectionSize(final K key); + public int getCollectionSize(final String key); - public Collection getCollectionAndRefresh(final K key, final int expireSeconds); + public Collection getCollectionAndRefresh(final String key, final int expireSeconds); - public void appendListItem(final K key, final V value); + public void appendListItem(final String key, final V value); - public void removeListItem(final K key, final V value); + public void removeListItem(final String key, final V value); - public void appendSetItem(final K key, final V value); + public void appendSetItem(final String key, final V value); - public void removeSetItem(final K key, final V value); + public void removeSetItem(final String key, final V value); - public List queryKeys(); + public List queryKeys(); public int getKeySize(); - public List> queryList(); + public List> queryList(); //---------------------- CompletableFuture 异步版 --------------------------------- - public CompletableFuture existsAsync(final K key); + public CompletableFuture existsAsync(final String key); - public CompletableFuture getAsync(final K key); + public CompletableFuture getAsync(final String key); - public CompletableFuture getAndRefreshAsync(final K key, final int expireSeconds); + public CompletableFuture getAndRefreshAsync(final String key, final int expireSeconds); - public CompletableFuture refreshAsync(final K key, final int expireSeconds); + public CompletableFuture refreshAsync(final String key, final int expireSeconds); - public CompletableFuture setAsync(final K key, final V value); + public CompletableFuture setAsync(final String key, final V value); - public CompletableFuture setAsync(final int expireSeconds, final K key, final V value); + public CompletableFuture setAsync(final int expireSeconds, final String key, final V value); - public CompletableFuture setExpireSecondsAsync(final K key, final int expireSeconds); + public CompletableFuture setExpireSecondsAsync(final String key, final int expireSeconds); - public CompletableFuture removeAsync(final K key); + public CompletableFuture removeAsync(final String key); - public CompletableFuture> getCollectionAsync(final K key); + public CompletableFuture> getCollectionAsync(final String key); - public CompletableFuture getCollectionSizeAsync(final K key); + public CompletableFuture getCollectionSizeAsync(final String key); - public CompletableFuture> getCollectionAndRefreshAsync(final K key, final int expireSeconds); + public CompletableFuture> getCollectionAndRefreshAsync(final String key, final int expireSeconds); - public CompletableFuture appendListItemAsync(final K key, final V value); + public CompletableFuture appendListItemAsync(final String key, final V value); - public CompletableFuture removeListItemAsync(final K key, final V value); + public CompletableFuture removeListItemAsync(final String key, final V value); - public CompletableFuture appendSetItemAsync(final K key, final V value); + public CompletableFuture appendSetItemAsync(final String key, final V value); - public CompletableFuture removeSetItemAsync(final K key, final V value); + public CompletableFuture removeSetItemAsync(final String key, final V value); - public CompletableFuture> queryKeysAsync(); + public CompletableFuture> queryKeysAsync(); public CompletableFuture getKeySizeAsync(); - public CompletableFuture>> queryListAsync(); + public CompletableFuture>> queryListAsync(); default CompletableFuture isOpenAsync() { return CompletableFuture.completedFuture(true); @@ -108,7 +106,7 @@ public interface CacheSource { OBJECT, SET, LIST; } - public static final class CacheEntry { + public static final class CacheEntry { static final String JSON_SET_KEY = "{\"cacheType\":\"" + CacheEntryType.SET + "\""; @@ -116,7 +114,7 @@ public interface CacheSource { final CacheEntryType cacheType; - final K key; + final String key; //<=0表示永久保存 int expireSeconds; @@ -129,16 +127,16 @@ public interface CacheSource { ConcurrentLinkedQueue listValue; - public CacheEntry(CacheEntryType cacheType, K key, T objectValue, CopyOnWriteArraySet setValue, ConcurrentLinkedQueue listValue) { + public CacheEntry(CacheEntryType cacheType, String key, T objectValue, CopyOnWriteArraySet setValue, ConcurrentLinkedQueue listValue) { this(cacheType, 0, key, objectValue, setValue, listValue); } - public CacheEntry(CacheEntryType cacheType, int expireSeconds, K key, T objectValue, CopyOnWriteArraySet setValue, ConcurrentLinkedQueue listValue) { + public CacheEntry(CacheEntryType cacheType, int expireSeconds, String key, T objectValue, CopyOnWriteArraySet setValue, ConcurrentLinkedQueue listValue) { this(cacheType, expireSeconds, (int) (System.currentTimeMillis() / 1000), key, objectValue, setValue, listValue); } @ConstructorProperties({"cacheType", "expireSeconds", "lastAccessed", "key", "objectValue", "setValue", "listValue"}) - public CacheEntry(CacheEntryType cacheType, int expireSeconds, int lastAccessed, K key, T objectValue, CopyOnWriteArraySet setValue, ConcurrentLinkedQueue listValue) { + public CacheEntry(CacheEntryType cacheType, int expireSeconds, int lastAccessed, String key, T objectValue, CopyOnWriteArraySet setValue, ConcurrentLinkedQueue listValue) { this.cacheType = cacheType; this.expireSeconds = expireSeconds; this.lastAccessed = lastAccessed; @@ -180,7 +178,7 @@ public interface CacheSource { return lastAccessed; } - public K getKey() { + public String getKey() { return key; }