This commit is contained in:
@@ -206,8 +206,10 @@ public abstract class NodeServer {
|
|||||||
//src 不含 MultiRun 方法
|
//src 不含 MultiRun 方法
|
||||||
}
|
}
|
||||||
CacheSourceService source = Sncp.createLocalService(resourceName, getExecutor(), CacheSourceService.class, this.sncpAddress, sncpDefaultGroups, sameGroupTransports, diffGroupTransports);
|
CacheSourceService source = Sncp.createLocalService(resourceName, getExecutor(), CacheSourceService.class, this.sncpAddress, sncpDefaultGroups, sameGroupTransports, diffGroupTransports);
|
||||||
CacheStore store = field.getAnnotation(CacheStore.class);
|
Type genericType = field.getGenericType();
|
||||||
if (store != null) source.setStoreType(store.keyType(), store.valueType(), store.entryType());
|
ParameterizedType pt = (genericType instanceof ParameterizedType) ? (ParameterizedType) genericType : null;
|
||||||
|
Type valType = pt == null ? null : pt.getActualTypeArguments()[1];
|
||||||
|
source.setStoreType(pt == null ? Serializable.class : (Class) pt.getActualTypeArguments()[0], valType instanceof Class ? (Class) valType : Object.class);
|
||||||
application.cacheSources.add(source);
|
application.cacheSources.add(source);
|
||||||
regFactory.register(resourceName, CacheSource.class, source);
|
regFactory.register(resourceName, CacheSource.class, source);
|
||||||
field.set(src, source);
|
field.set(src, source);
|
||||||
|
|||||||
@@ -35,7 +35,7 @@ public abstract class WebSocketNode {
|
|||||||
|
|
||||||
//存放所有用户分布在节点上的队列信息,Set<InetSocketAddress> 为 sncpnode 的集合
|
//存放所有用户分布在节点上的队列信息,Set<InetSocketAddress> 为 sncpnode 的集合
|
||||||
@Resource(name = "$_nodeaddress_source")
|
@Resource(name = "$_nodeaddress_source")
|
||||||
protected CacheSource source;
|
protected CacheSource<Serializable, InetSocketAddress> source;
|
||||||
|
|
||||||
//存放本地节点上所有在线用户的队列信息,Set<String> 为 engineid 的集合
|
//存放本地节点上所有在线用户的队列信息,Set<String> 为 engineid 的集合
|
||||||
protected final ConcurrentHashMap<Serializable, Set<String>> localNodes = new ConcurrentHashMap();
|
protected final ConcurrentHashMap<Serializable, Set<String>> localNodes = new ConcurrentHashMap();
|
||||||
@@ -106,7 +106,7 @@ public abstract class WebSocketNode {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if ((recent && rscode == 0) || remoteNode == null) return rscode;
|
if ((recent && rscode == 0) || remoteNode == null) return rscode;
|
||||||
Set<InetSocketAddress> addrs = source.get(groupid);
|
Collection<InetSocketAddress> addrs = source.getCollection(groupid);
|
||||||
if (addrs != null && !addrs.isEmpty()) { //对方连接在远程节点
|
if (addrs != null && !addrs.isEmpty()) { //对方连接在远程节点
|
||||||
if (recent) {
|
if (recent) {
|
||||||
InetSocketAddress one = null;
|
InetSocketAddress one = null;
|
||||||
|
|||||||
@@ -20,11 +20,13 @@ import org.redkale.util.*;
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
* @param <K>
|
||||||
|
* @param <V>
|
||||||
* @see http://www.redkale.org
|
* @see http://www.redkale.org
|
||||||
* @author zhangjx
|
* @author zhangjx
|
||||||
*/
|
*/
|
||||||
@AutoLoad(false)
|
@AutoLoad(false)
|
||||||
public class CacheSourceService implements CacheSource, Service, AutoCloseable {
|
public class CacheSourceService<K extends Serializable, V> implements CacheSource<K, V>, Service, AutoCloseable {
|
||||||
|
|
||||||
@Resource(name = "APP_HOME")
|
@Resource(name = "APP_HOME")
|
||||||
private File home;
|
private File home;
|
||||||
@@ -32,9 +34,13 @@ public class CacheSourceService implements CacheSource, Service, AutoCloseable {
|
|||||||
@Resource
|
@Resource
|
||||||
private JsonConvert convert;
|
private JsonConvert convert;
|
||||||
|
|
||||||
private Class storeKeyType;
|
private Class keyType;
|
||||||
|
|
||||||
private Type storeValueType;
|
private Type objValueType;
|
||||||
|
|
||||||
|
private Type setValueType;
|
||||||
|
|
||||||
|
private Type listValueType;
|
||||||
|
|
||||||
private ScheduledThreadPoolExecutor scheduler;
|
private ScheduledThreadPoolExecutor scheduler;
|
||||||
|
|
||||||
@@ -42,20 +48,16 @@ public class CacheSourceService implements CacheSource, Service, AutoCloseable {
|
|||||||
|
|
||||||
private final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
|
private final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
|
||||||
|
|
||||||
protected final ConcurrentHashMap<Serializable, CacheEntry> container = new ConcurrentHashMap<>();
|
protected final ConcurrentHashMap<K, CacheEntry<K, ?>> container = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
public CacheSourceService() {
|
public CacheSourceService() {
|
||||||
}
|
}
|
||||||
|
|
||||||
public CacheSourceService setStoreType(Class storeKeyType, Class storeValueType, CacheStore.CacheEntryType entryType) {
|
public CacheSourceService setStoreType(Class keyType, Class valueType) {
|
||||||
this.storeKeyType = storeKeyType;
|
this.keyType = keyType;
|
||||||
if (entryType == CacheStore.CacheEntryType.SET) {
|
this.objValueType = valueType;
|
||||||
this.storeValueType = TypeToken.createParameterizedType(null, CopyOnWriteArraySet.class, storeValueType);
|
this.setValueType = TypeToken.createParameterizedType(null, CopyOnWriteArraySet.class, valueType);
|
||||||
} else if (entryType == CacheStore.CacheEntryType.LIST) {
|
this.listValueType = TypeToken.createParameterizedType(null, ConcurrentLinkedQueue.class, valueType);
|
||||||
this.storeValueType = TypeToken.createParameterizedType(null, CopyOnWriteArrayList.class, storeValueType);
|
|
||||||
} else {
|
|
||||||
this.storeValueType = storeValueType;
|
|
||||||
}
|
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -63,13 +65,12 @@ public class CacheSourceService implements CacheSource, Service, AutoCloseable {
|
|||||||
public void init(AnyValue conf) {
|
public void init(AnyValue conf) {
|
||||||
final CacheSourceService self = this;
|
final CacheSourceService self = this;
|
||||||
AnyValue prop = conf == null ? null : conf.getAnyValue("property");
|
AnyValue prop = conf == null ? null : conf.getAnyValue("property");
|
||||||
if (storeKeyType == null && prop != null) {
|
if (keyType == null && prop != null) {
|
||||||
String storeKeyStr = prop.getValue("store-key-type");
|
String storeKeyStr = prop.getValue("key-type");
|
||||||
String storeValueStr = prop.getValue("store-value-type");
|
String storeValueStr = prop.getValue("value-type");
|
||||||
String storeEntryStr = prop.getValue("store-entry-type", CacheStore.CacheEntryType.OBJECT.name()).toUpperCase();
|
|
||||||
if (storeKeyStr != null && storeValueStr != null) {
|
if (storeKeyStr != null && storeValueStr != null) {
|
||||||
try {
|
try {
|
||||||
this.setStoreType(Class.forName(storeKeyStr), Class.forName(storeValueStr), CacheStore.CacheEntryType.valueOf(storeEntryStr));
|
this.setStoreType(Class.forName(storeKeyStr), Class.forName(storeValueStr));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.log(Level.SEVERE, self.getClass().getSimpleName() + " load key & value store class (" + storeKeyStr + ", " + storeValueStr + ") error", e);
|
logger.log(Level.SEVERE, self.getClass().getSimpleName() + " load key & value store class (" + storeKeyStr + ", " + storeValueStr + ") error", e);
|
||||||
}
|
}
|
||||||
@@ -89,7 +90,7 @@ public class CacheSourceService implements CacheSource, Service, AutoCloseable {
|
|||||||
t.setDaemon(true);
|
t.setDaemon(true);
|
||||||
return t;
|
return t;
|
||||||
});
|
});
|
||||||
final List<Serializable> keys = new ArrayList<>();
|
final List<K> keys = new ArrayList<>();
|
||||||
scheduler.scheduleWithFixedDelay(() -> {
|
scheduler.scheduleWithFixedDelay(() -> {
|
||||||
keys.clear();
|
keys.clear();
|
||||||
int now = (int) (System.currentTimeMillis() / 1000);
|
int now = (int) (System.currentTimeMillis() / 1000);
|
||||||
@@ -98,7 +99,7 @@ public class CacheSourceService implements CacheSource, Service, AutoCloseable {
|
|||||||
keys.add(x.key);
|
keys.add(x.key);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
for (Serializable key : keys) {
|
for (K key : keys) {
|
||||||
CacheEntry entry = container.remove(key);
|
CacheEntry entry = container.remove(key);
|
||||||
if (expireHandler != null && entry != null) expireHandler.accept(entry);
|
if (expireHandler != null && entry != null) expireHandler.accept(entry);
|
||||||
}
|
}
|
||||||
@@ -110,17 +111,25 @@ public class CacheSourceService implements CacheSource, Service, AutoCloseable {
|
|||||||
boolean datasync = false; //是否从远程同步过数据
|
boolean datasync = false; //是否从远程同步过数据
|
||||||
//----------同步数据……-----------
|
//----------同步数据……-----------
|
||||||
// TODO
|
// TODO
|
||||||
if (this.storeKeyType == null) return;
|
if (this.keyType == null) return;
|
||||||
try {
|
try {
|
||||||
CacheEntry.initCreator();
|
CacheEntry.initCreator();
|
||||||
File store = new File(home, "cache/" + name());
|
File store = new File(home, "cache/" + name());
|
||||||
if (!store.isFile() || !store.canRead()) return;
|
if (!store.isFile() || !store.canRead()) return;
|
||||||
LineNumberReader reader = new LineNumberReader(new FileReader(store));
|
LineNumberReader reader = new LineNumberReader(new FileReader(store));
|
||||||
final Type storeType = TypeToken.createParameterizedType(null, CacheEntry.class, storeKeyType, storeValueType);
|
if (this.keyType == null) this.keyType = Serializable.class;
|
||||||
|
if (this.objValueType == null) {
|
||||||
|
this.objValueType = Object.class;
|
||||||
|
this.setValueType = TypeToken.createParameterizedType(null, CopyOnWriteArraySet.class, this.objValueType);
|
||||||
|
this.listValueType = TypeToken.createParameterizedType(null, ConcurrentLinkedQueue.class, this.objValueType);
|
||||||
|
}
|
||||||
|
final Type storeObjType = TypeToken.createParameterizedType(null, CacheEntry.class, keyType, objValueType);
|
||||||
|
final Type storeSetType = TypeToken.createParameterizedType(null, CacheEntry.class, keyType, setValueType);
|
||||||
|
final Type storeListType = TypeToken.createParameterizedType(null, CacheEntry.class, keyType, listValueType);
|
||||||
String line;
|
String line;
|
||||||
while ((line = reader.readLine()) != null) {
|
while ((line = reader.readLine()) != null) {
|
||||||
if (line.isEmpty()) continue;
|
if (line.isEmpty()) continue;
|
||||||
CacheEntry entry = convert.convertFrom(storeType, line);
|
CacheEntry<K, ?> entry = convert.convertFrom(line.contains(CacheEntry.JSON_SET_KEY) ? storeSetType : (line.contains(CacheEntry.JSON_LIST_KEY) ? storeListType : storeObjType), line);
|
||||||
if (entry.isExpired()) continue;
|
if (entry.isExpired()) continue;
|
||||||
if (datasync && container.containsKey(entry.key)) continue; //已经同步了
|
if (datasync && container.containsKey(entry.key)) continue; //已经同步了
|
||||||
container.put(entry.key, entry);
|
container.put(entry.key, entry);
|
||||||
@@ -140,16 +149,18 @@ public class CacheSourceService implements CacheSource, Service, AutoCloseable {
|
|||||||
@Override
|
@Override
|
||||||
public void destroy(AnyValue conf) {
|
public void destroy(AnyValue conf) {
|
||||||
if (scheduler != null) scheduler.shutdownNow();
|
if (scheduler != null) scheduler.shutdownNow();
|
||||||
if (this.storeKeyType == null || Sncp.isRemote(this) || container.isEmpty()) return;
|
if (this.keyType == null || Sncp.isRemote(this) || container.isEmpty()) return;
|
||||||
try {
|
try {
|
||||||
CacheEntry.initCreator();
|
CacheEntry.initCreator();
|
||||||
File store = new File(home, "cache/" + name());
|
File store = new File(home, "cache/" + name());
|
||||||
store.getParentFile().mkdirs();
|
store.getParentFile().mkdirs();
|
||||||
PrintStream stream = new PrintStream(store, "UTF-8");
|
PrintStream stream = new PrintStream(store, "UTF-8");
|
||||||
Collection<CacheEntry> values = container.values();
|
Collection<CacheEntry<K, ?>> values = (Collection<CacheEntry<K, ?>>) container.values();
|
||||||
final Type storeType = TypeToken.createParameterizedType(null, CacheEntry.class, storeKeyType, storeValueType);;
|
final Type storeObjType = TypeToken.createParameterizedType(null, CacheEntry.class, keyType, objValueType);
|
||||||
|
final Type storeSetType = TypeToken.createParameterizedType(null, CacheEntry.class, keyType, setValueType);
|
||||||
|
final Type storeListType = TypeToken.createParameterizedType(null, CacheEntry.class, keyType, listValueType);
|
||||||
for (CacheEntry entry : values) {
|
for (CacheEntry entry : values) {
|
||||||
stream.println(convert.convertTo(storeType, entry));
|
stream.println(convert.convertTo(entry.isSetCacheType() ? storeSetType : (entry.isListCacheType() ? storeListType : storeObjType), entry));
|
||||||
}
|
}
|
||||||
container.clear();
|
container.clear();
|
||||||
stream.close();
|
stream.close();
|
||||||
@@ -159,7 +170,7 @@ public class CacheSourceService implements CacheSource, Service, AutoCloseable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean exists(Serializable key) {
|
public boolean exists(K key) {
|
||||||
if (key == null) return false;
|
if (key == null) return false;
|
||||||
CacheEntry entry = container.get(key);
|
CacheEntry entry = container.get(key);
|
||||||
if (entry == null) return false;
|
if (entry == null) return false;
|
||||||
@@ -167,42 +178,46 @@ public class CacheSourceService implements CacheSource, Service, AutoCloseable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void exists(final CompletionHandler<Boolean, Serializable> handler, @DynAttachment final Serializable key) {
|
public void exists(final CompletionHandler<Boolean, K> handler, @DynAttachment final K key) {
|
||||||
if (handler != null) handler.completed(exists(key), key);
|
if (handler != null) handler.completed(exists(key), key);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> T get(Serializable key) {
|
public V get(K key) {
|
||||||
if (key == null) return null;
|
if (key == null) return null;
|
||||||
CacheEntry entry = container.get(key);
|
CacheEntry entry = container.get(key);
|
||||||
if (entry == null || entry.isExpired()) return null;
|
if (entry == null || entry.isExpired() || entry.value == null) return null;
|
||||||
return (T) entry.getValue();
|
if (entry.isListCacheType()) return (V) new ArrayList((Collection) entry.value);
|
||||||
|
if (entry.isSetCacheType()) return (V) new HashSet((Collection) entry.value);
|
||||||
|
return (V) entry.getValue();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> void get(final CompletionHandler<T, Serializable> handler, @DynAttachment final Serializable key) {
|
public void get(final CompletionHandler<V, K> handler, @DynAttachment final K key) {
|
||||||
if (handler != null) handler.completed(get(key), key);
|
if (handler != null) handler.completed(get(key), key);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@MultiRun
|
@MultiRun
|
||||||
public <T> T getAndRefresh(Serializable key) {
|
public V getAndRefresh(K key) {
|
||||||
if (key == null) return null;
|
if (key == null) return null;
|
||||||
CacheEntry entry = container.get(key);
|
CacheEntry entry = container.get(key);
|
||||||
if (entry == null) return null;
|
if (entry == null || entry.isExpired() || entry.value == null) return null;
|
||||||
entry.lastAccessed = (int) (System.currentTimeMillis() / 1000);
|
entry.lastAccessed = (int) (System.currentTimeMillis() / 1000);
|
||||||
return (T) entry.getValue();
|
if (entry.isListCacheType()) return (V) new ArrayList((Collection) entry.value);
|
||||||
|
if (entry.isSetCacheType()) return (V) new HashSet((Collection) entry.value);
|
||||||
|
return (V) entry.getValue();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> void getAndRefresh(final CompletionHandler<T, Serializable> handler, @DynAttachment final Serializable key) {
|
public void getAndRefresh(final CompletionHandler<V, K> handler, @DynAttachment final K key) {
|
||||||
T rs = getAndRefresh(key);
|
V rs = getAndRefresh(key);
|
||||||
if (handler != null) handler.completed(rs, key);
|
if (handler != null) handler.completed(rs, key);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@MultiRun
|
@MultiRun
|
||||||
public void refresh(Serializable key) {
|
public void refresh(K key) {
|
||||||
if (key == null) return;
|
if (key == null) return;
|
||||||
CacheEntry entry = container.get(key);
|
CacheEntry entry = container.get(key);
|
||||||
if (entry == null) return;
|
if (entry == null) return;
|
||||||
@@ -210,18 +225,18 @@ public class CacheSourceService implements CacheSource, Service, AutoCloseable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> void refresh(final CompletionHandler<Void, Serializable> handler, final Serializable key) {
|
public void refresh(final CompletionHandler<Void, K> handler, final K key) {
|
||||||
refresh(key);
|
refresh(key);
|
||||||
if (handler != null) handler.completed(null, key);
|
if (handler != null) handler.completed(null, key);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@MultiRun
|
@MultiRun
|
||||||
public <T> void set(Serializable key, T value) {
|
public void set(K key, V value) {
|
||||||
if (key == null) return;
|
if (key == null) return;
|
||||||
CacheEntry entry = container.get(key);
|
CacheEntry entry = container.get(key);
|
||||||
if (entry == null) {
|
if (entry == null) {
|
||||||
entry = new CacheEntry(key, value);
|
entry = new CacheEntry(CacheEntryType.OBJECT, key, value);
|
||||||
container.putIfAbsent(key, entry);
|
container.putIfAbsent(key, entry);
|
||||||
} else {
|
} else {
|
||||||
entry.expireSeconds = 0;
|
entry.expireSeconds = 0;
|
||||||
@@ -231,18 +246,18 @@ public class CacheSourceService implements CacheSource, Service, AutoCloseable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> void set(final CompletionHandler<Void, Serializable> handler, @DynAttachment final Serializable key, final T value) {
|
public void set(final CompletionHandler<Void, K> handler, @DynAttachment final K key, final V value) {
|
||||||
set(key, value);
|
set(key, value);
|
||||||
if (handler != null) handler.completed(null, key);
|
if (handler != null) handler.completed(null, key);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@MultiRun
|
@MultiRun
|
||||||
public <T> void set(int expireSeconds, Serializable key, T value) {
|
public void set(int expireSeconds, K key, V value) {
|
||||||
if (key == null) return;
|
if (key == null) return;
|
||||||
CacheEntry entry = container.get(key);
|
CacheEntry entry = container.get(key);
|
||||||
if (entry == null) {
|
if (entry == null) {
|
||||||
entry = new CacheEntry(expireSeconds, key, value);
|
entry = new CacheEntry(CacheEntryType.OBJECT, expireSeconds, key, value);
|
||||||
container.putIfAbsent(key, entry);
|
container.putIfAbsent(key, entry);
|
||||||
} else {
|
} else {
|
||||||
if (expireSeconds > 0) entry.expireSeconds = expireSeconds;
|
if (expireSeconds > 0) entry.expireSeconds = expireSeconds;
|
||||||
@@ -252,14 +267,14 @@ public class CacheSourceService implements CacheSource, Service, AutoCloseable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> void set(final CompletionHandler<Void, Serializable> handler, final int expireSeconds, @DynAttachment final Serializable key, final T value) {
|
public void set(final CompletionHandler<Void, K> handler, final int expireSeconds, @DynAttachment final K key, final V value) {
|
||||||
set(expireSeconds, key, value);
|
set(expireSeconds, key, value);
|
||||||
if (handler != null) handler.completed(null, key);
|
if (handler != null) handler.completed(null, key);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@MultiRun
|
@MultiRun
|
||||||
public void setExpireSeconds(Serializable key, int expireSeconds) {
|
public void setExpireSeconds(K key, int expireSeconds) {
|
||||||
if (key == null) return;
|
if (key == null) return;
|
||||||
CacheEntry entry = container.get(key);
|
CacheEntry entry = container.get(key);
|
||||||
if (entry == null) return;
|
if (entry == null) return;
|
||||||
@@ -267,86 +282,106 @@ public class CacheSourceService implements CacheSource, Service, AutoCloseable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setExpireSeconds(final CompletionHandler<Void, Serializable> handler, @DynAttachment final Serializable key, final int expireSeconds) {
|
public void setExpireSeconds(final CompletionHandler<Void, K> handler, @DynAttachment final K key, final int expireSeconds) {
|
||||||
setExpireSeconds(key, expireSeconds);
|
setExpireSeconds(key, expireSeconds);
|
||||||
if (handler != null) handler.completed(null, key);
|
if (handler != null) handler.completed(null, key);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@MultiRun
|
@MultiRun
|
||||||
public void remove(Serializable key) {
|
public void remove(K key) {
|
||||||
if (key == null) return;
|
if (key == null) return;
|
||||||
container.remove(key);
|
container.remove(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void remove(final CompletionHandler<Void, Serializable> handler, @DynAttachment final Serializable key) {
|
public void remove(final CompletionHandler<Void, K> handler, @DynAttachment final K key) {
|
||||||
remove(key);
|
remove(key);
|
||||||
if (handler != null) handler.completed(null, key);
|
if (handler != null) handler.completed(null, key);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Collection<V> getCollection(final K key) {
|
||||||
|
return (Collection<V>) get(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void getCollection(final CompletionHandler<Collection<V>, K> handler, @DynAttachment final K key) {
|
||||||
|
if (handler != null) handler.completed(getCollection(key), key);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Collection<V> getCollectionAndRefresh(final K key) {
|
||||||
|
return (Collection<V>) getAndRefresh(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void getCollectionAndRefresh(final CompletionHandler<Collection<V>, K> handler, @DynAttachment final K key) {
|
||||||
|
if (handler != null) handler.completed(getCollectionAndRefresh(key), key);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@MultiRun
|
@MultiRun
|
||||||
public <V> void appendListItem(Serializable key, V value) {
|
public void appendListItem(K key, V value) {
|
||||||
if (key == null) return;
|
if (key == null) return;
|
||||||
CacheEntry entry = container.get(key);
|
CacheEntry entry = container.get(key);
|
||||||
if (entry == null || !(entry.value instanceof List)) {
|
if (entry == null || !entry.isListCacheType()) {
|
||||||
List<V> list = new CopyOnWriteArrayList<>();
|
Collection<V> list = new ConcurrentLinkedQueue<>();
|
||||||
entry = new CacheEntry(key, list);
|
entry = new CacheEntry(CacheEntryType.LIST, key, list);
|
||||||
CacheEntry old = container.putIfAbsent(key, entry);
|
CacheEntry old = container.putIfAbsent(key, entry);
|
||||||
if (old != null) list = (List) old.value;
|
if (old != null) list = (Collection) old.value;
|
||||||
list.add(value);
|
list.add(value);
|
||||||
} else {
|
} else {
|
||||||
((List) entry.getValue()).add(value);
|
((Collection) entry.getValue()).add(value);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> void appendListItem(final CompletionHandler<Void, Serializable> handler, @DynAttachment final Serializable key, final T value) {
|
public void appendListItem(final CompletionHandler<Void, K> handler, @DynAttachment final K key, final V value) {
|
||||||
appendListItem(key, value);
|
appendListItem(key, value);
|
||||||
if (handler != null) handler.completed(null, key);
|
if (handler != null) handler.completed(null, key);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@MultiRun
|
@MultiRun
|
||||||
public <V> void removeListItem(Serializable key, V value) {
|
public void removeListItem(K key, V value) {
|
||||||
if (key == null) return;
|
if (key == null) return;
|
||||||
CacheEntry entry = container.get(key);
|
CacheEntry entry = container.get(key);
|
||||||
if (entry == null || !(entry.value instanceof List)) return;
|
if (entry == null || !entry.isListCacheType()) return;
|
||||||
((List) entry.getValue()).remove(value);
|
((Collection) entry.getValue()).remove(value);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> void removeListItem(final CompletionHandler<Void, Serializable> handler, @DynAttachment final Serializable key, final T value) {
|
public void removeListItem(final CompletionHandler<Void, K> handler, @DynAttachment final K key, final V value) {
|
||||||
removeListItem(key, value);
|
removeListItem(key, value);
|
||||||
if (handler != null) handler.completed(null, key);
|
if (handler != null) handler.completed(null, key);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@MultiRun
|
@MultiRun
|
||||||
public <V> void appendSetItem(Serializable key, V value) {
|
public void appendSetItem(K key, V value) {
|
||||||
if (key == null) return;
|
if (key == null) return;
|
||||||
CacheEntry entry = container.get(key);
|
CacheEntry entry = container.get(key);
|
||||||
if (entry == null || !(entry.value instanceof Set)) {
|
if (entry == null || !entry.isSetCacheType()) {
|
||||||
Set<V> set = new CopyOnWriteArraySet();
|
Collection<V> set = new CopyOnWriteArraySet();
|
||||||
entry = new CacheEntry(key, set);
|
entry = new CacheEntry(CacheEntryType.SET, key, set);
|
||||||
CacheEntry old = container.putIfAbsent(key, entry);
|
CacheEntry old = container.putIfAbsent(key, entry);
|
||||||
if (old != null) set = (Set) old.value;
|
if (old != null) set = (Collection) old.value;
|
||||||
set.add(value);
|
set.add(value);
|
||||||
} else {
|
} else {
|
||||||
((Set) entry.getValue()).add(value);
|
((Collection) entry.getValue()).add(value);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> void appendSetItem(final CompletionHandler<Void, Serializable> handler, @DynAttachment final Serializable key, final T value) {
|
public void appendSetItem(final CompletionHandler<Void, K> handler, @DynAttachment final K key, final V value) {
|
||||||
appendSetItem(key, value);
|
appendSetItem(key, value);
|
||||||
if (handler != null) handler.completed(null, key);
|
if (handler != null) handler.completed(null, key);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@MultiRun
|
@MultiRun
|
||||||
public <V> void removeSetItem(Serializable key, V value) {
|
public void removeSetItem(K key, V value) {
|
||||||
if (key == null) return;
|
if (key == null) return;
|
||||||
CacheEntry entry = container.get(key);
|
CacheEntry entry = container.get(key);
|
||||||
if (entry == null || !(entry.value instanceof Set)) return;
|
if (entry == null || !(entry.value instanceof Set)) return;
|
||||||
@@ -354,24 +389,32 @@ public class CacheSourceService implements CacheSource, Service, AutoCloseable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> void removeSetItem(final CompletionHandler<Void, Serializable> handler, @DynAttachment final Serializable key, final T value) {
|
public void removeSetItem(final CompletionHandler<Void, K> handler, @DynAttachment final K key, final V value) {
|
||||||
removeSetItem(key, value);
|
removeSetItem(key, value);
|
||||||
if (handler != null) handler.completed(null, key);
|
if (handler != null) handler.completed(null, key);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static enum CacheEntryType {
|
||||||
|
OBJECT, SET, LIST;
|
||||||
|
}
|
||||||
|
|
||||||
public static final class CacheEntry<K extends Serializable, T> {
|
public static final class CacheEntry<K extends Serializable, T> {
|
||||||
|
|
||||||
|
public static final String JSON_SET_KEY = "\"cacheType\":\"" + CacheEntryType.SET + "\"";
|
||||||
|
|
||||||
|
public static final String JSON_LIST_KEY = "\"cacheType\":\"" + CacheEntryType.LIST + "\"";
|
||||||
|
|
||||||
public static class CacheEntryCreator implements Creator<CacheEntry> {
|
public static class CacheEntryCreator implements Creator<CacheEntry> {
|
||||||
|
|
||||||
public static final CacheEntryCreator CREATOR = new CacheEntryCreator();
|
public static final CacheEntryCreator CREATOR = new CacheEntryCreator();
|
||||||
|
|
||||||
@java.beans.ConstructorProperties({"expireSeconds", "lastAccessed", "key", "value"})
|
@java.beans.ConstructorProperties({"cacheType", "expireSeconds", "lastAccessed", "key", "value"})
|
||||||
public CacheEntryCreator() {
|
public CacheEntryCreator() {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CacheEntry create(Object... params) {
|
public CacheEntry create(Object... params) {
|
||||||
return new CacheEntry((Integer) params[0], (Integer) params[1], (Serializable) params[2], params[3]);
|
return new CacheEntry((CacheEntryType) params[0], (Integer) params[1], (Integer) params[2], (Serializable) params[3], params[4]);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -382,6 +425,8 @@ public class CacheSourceService implements CacheSource, Service, AutoCloseable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private final CacheEntryType cacheType;
|
||||||
|
|
||||||
private final K key;
|
private final K key;
|
||||||
|
|
||||||
//<=0表示永久保存
|
//<=0表示永久保存
|
||||||
@@ -391,16 +436,17 @@ public class CacheSourceService implements CacheSource, Service, AutoCloseable {
|
|||||||
|
|
||||||
private T value;
|
private T value;
|
||||||
|
|
||||||
public CacheEntry(K key, T value) {
|
public CacheEntry(CacheEntryType cacheType, K key, T value) {
|
||||||
this(0, key, value);
|
this(cacheType, 0, key, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
public CacheEntry(int expireSeconds, K key, T value) {
|
public CacheEntry(CacheEntryType cacheType, int expireSeconds, K key, T value) {
|
||||||
this(expireSeconds, (int) (System.currentTimeMillis() / 1000), key, value);
|
this(cacheType, expireSeconds, (int) (System.currentTimeMillis() / 1000), key, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
@java.beans.ConstructorProperties({"expireSeconds", "lastAccessed", "key", "value"})
|
@java.beans.ConstructorProperties({"cacheType", "expireSeconds", "lastAccessed", "key", "value"})
|
||||||
private CacheEntry(int expireSeconds, int lastAccessed, K key, T value) {
|
private CacheEntry(CacheEntryType cacheType, int expireSeconds, int lastAccessed, K key, T value) {
|
||||||
|
this.cacheType = cacheType;
|
||||||
this.expireSeconds = expireSeconds;
|
this.expireSeconds = expireSeconds;
|
||||||
this.lastAccessed = lastAccessed;
|
this.lastAccessed = lastAccessed;
|
||||||
this.key = key;
|
this.key = key;
|
||||||
@@ -412,6 +458,20 @@ public class CacheSourceService implements CacheSource, Service, AutoCloseable {
|
|||||||
return JsonFactory.root().getConvert().convertTo(this);
|
return JsonFactory.root().getConvert().convertTo(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public CacheEntryType getCacheType() {
|
||||||
|
return cacheType;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Ignore
|
||||||
|
public boolean isListCacheType() {
|
||||||
|
return cacheType == CacheEntryType.LIST;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Ignore
|
||||||
|
public boolean isSetCacheType() {
|
||||||
|
return cacheType == CacheEntryType.SET;
|
||||||
|
}
|
||||||
|
|
||||||
@Ignore
|
@Ignore
|
||||||
public boolean isExpired() {
|
public boolean isExpired() {
|
||||||
return (expireSeconds > 0 && lastAccessed + expireSeconds < (System.currentTimeMillis() / 1000));
|
return (expireSeconds > 0 && lastAccessed + expireSeconds < (System.currentTimeMillis() / 1000));
|
||||||
|
|||||||
@@ -7,66 +7,77 @@ package org.redkale.source;
|
|||||||
|
|
||||||
import java.io.*;
|
import java.io.*;
|
||||||
import java.nio.channels.*;
|
import java.nio.channels.*;
|
||||||
|
import java.util.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
* @param <K>
|
||||||
|
* @param <V>
|
||||||
* @see http://www.redkale.org
|
* @see http://www.redkale.org
|
||||||
* @author zhangjx
|
* @author zhangjx
|
||||||
*/
|
*/
|
||||||
public interface CacheSource {
|
public interface CacheSource<K extends Serializable, V> {
|
||||||
|
|
||||||
default boolean isOpen() {
|
default boolean isOpen() {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean exists(final Serializable key);
|
public boolean exists(final K key);
|
||||||
|
|
||||||
public <T> T get(final Serializable key);
|
public V get(final K key);
|
||||||
|
|
||||||
public <T> T getAndRefresh(final Serializable key);
|
public V getAndRefresh(final K key);
|
||||||
|
|
||||||
public void refresh(final Serializable key);
|
public void refresh(final K key);
|
||||||
|
|
||||||
public <T> void set(final Serializable key, final T value);
|
public void set(final K key, final V value);
|
||||||
|
|
||||||
public <T> void set(final int expireSeconds, final Serializable key, final T value);
|
public void set(final int expireSeconds, final K key, final V value);
|
||||||
|
|
||||||
public void setExpireSeconds(final Serializable key, final int expireSeconds);
|
public void setExpireSeconds(final K key, final int expireSeconds);
|
||||||
|
|
||||||
public void remove(final Serializable key);
|
public void remove(final K key);
|
||||||
|
|
||||||
public <T> void appendListItem(final Serializable key, final T value);
|
public Collection<V> getCollection(final K key);
|
||||||
|
|
||||||
public <T> void removeListItem(final Serializable key, final T value);
|
public Collection<V> getCollectionAndRefresh(final K key);
|
||||||
|
|
||||||
public <T> void appendSetItem(final Serializable key, final T value);
|
public void appendListItem(final K key, final V value);
|
||||||
|
|
||||||
public <T> void removeSetItem(final Serializable key, final T value);
|
public void removeListItem(final K key, final V value);
|
||||||
|
|
||||||
|
public void appendSetItem(final K key, final V value);
|
||||||
|
|
||||||
|
public void removeSetItem(final K key, final V value);
|
||||||
|
|
||||||
//----------------------异步版---------------------------------
|
//----------------------异步版---------------------------------
|
||||||
public void exists(final CompletionHandler<Boolean, Serializable> handler, final Serializable key);
|
public void exists(final CompletionHandler<Boolean, K> handler, final K key);
|
||||||
|
|
||||||
public <T> void get(final CompletionHandler<T, Serializable> handler, final Serializable key);
|
public void get(final CompletionHandler<V, K> handler, final K key);
|
||||||
|
|
||||||
public <T> void getAndRefresh(final CompletionHandler<T, Serializable> handler, final Serializable key);
|
public void getAndRefresh(final CompletionHandler<V, K> handler, final K key);
|
||||||
|
|
||||||
public <T> void refresh(final CompletionHandler<Void, Serializable> handler, final Serializable key);
|
public void refresh(final CompletionHandler<Void, K> handler, final K key);
|
||||||
|
|
||||||
public <T> void set(final CompletionHandler<Void, Serializable> handler, final Serializable key, final T value);
|
public void set(final CompletionHandler<Void, K> handler, final K key, final V value);
|
||||||
|
|
||||||
public <T> void set(final CompletionHandler<Void, Serializable> handler, final int expireSeconds, final Serializable key, final T value);
|
public void set(final CompletionHandler<Void, K> handler, final int expireSeconds, final K key, final V value);
|
||||||
|
|
||||||
public void setExpireSeconds(final CompletionHandler<Void, Serializable> handler, final Serializable key, final int expireSeconds);
|
public void setExpireSeconds(final CompletionHandler<Void, K> handler, final K key, final int expireSeconds);
|
||||||
|
|
||||||
public void remove(final CompletionHandler<Void, Serializable> handler, final Serializable key);
|
public void remove(final CompletionHandler<Void, K> handler, final K key);
|
||||||
|
|
||||||
public <T> void appendListItem(final CompletionHandler<Void, Serializable> handler, final Serializable key, final T value);
|
public void getCollection(final CompletionHandler<Collection<V>, K> handler, final K key);
|
||||||
|
|
||||||
public <T> void removeListItem(final CompletionHandler<Void, Serializable> handler, final Serializable key, final T value);
|
public void getCollectionAndRefresh(final CompletionHandler<Collection<V>, K> handler, final K key);
|
||||||
|
|
||||||
public <T> void appendSetItem(final CompletionHandler<Void, Serializable> handler, final Serializable key, final T value);
|
public void appendListItem(final CompletionHandler<Void, K> handler, final K key, final V value);
|
||||||
|
|
||||||
public <T> void removeSetItem(final CompletionHandler<Void, Serializable> handler, final Serializable key, final T value);
|
public void removeListItem(final CompletionHandler<Void, K> handler, final K key, final V value);
|
||||||
|
|
||||||
|
public void appendSetItem(final CompletionHandler<Void, K> handler, final K key, final V value);
|
||||||
|
|
||||||
|
public void removeSetItem(final CompletionHandler<Void, K> handler, final K key, final V value);
|
||||||
|
|
||||||
default void isOpen(final CompletionHandler<Boolean, Void> handler) {
|
default void isOpen(final CompletionHandler<Boolean, Void> handler) {
|
||||||
if (handler != null) handler.completed(Boolean.TRUE, null);
|
if (handler != null) handler.completed(Boolean.TRUE, null);
|
||||||
|
|||||||
@@ -1,32 +0,0 @@
|
|||||||
/*
|
|
||||||
* To change this license header, choose License Headers in Project Properties.
|
|
||||||
* To change this template file, choose Tools | Templates
|
|
||||||
* and open the template in the editor.
|
|
||||||
*/
|
|
||||||
package org.redkale.source;
|
|
||||||
|
|
||||||
import java.lang.annotation.*;
|
|
||||||
import static java.lang.annotation.ElementType.*;
|
|
||||||
import static java.lang.annotation.RetentionPolicy.RUNTIME;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 用于标记CacheSource 是否需要持久化到文件中
|
|
||||||
* 注意: 标记为@CacheStore 的 CacheSource对的name()不能包含特殊字符, 否则无法创建存储文件。
|
|
||||||
*
|
|
||||||
* @see http://www.redkale.org
|
|
||||||
* @author zhangjx
|
|
||||||
*/
|
|
||||||
@Target({FIELD})
|
|
||||||
@Retention(RUNTIME)
|
|
||||||
public @interface CacheStore {
|
|
||||||
|
|
||||||
public static enum CacheEntryType {
|
|
||||||
OBJECT, SET, LIST;
|
|
||||||
}
|
|
||||||
|
|
||||||
Class keyType(); //key对应的class
|
|
||||||
|
|
||||||
Class valueType(); //value 对应的class
|
|
||||||
|
|
||||||
CacheEntryType entryType() default CacheEntryType.OBJECT;
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user