CacheSource增加getString/getLong一系列方法
This commit is contained in:
@@ -13,6 +13,7 @@ import java.nio.ByteBuffer;
|
||||
import java.nio.channels.CompletionHandler;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Stream;
|
||||
import org.redkale.convert.ext.InetAddressSimpledCoder.InetSocketAddressSimpledCoder;
|
||||
@@ -88,6 +89,7 @@ public abstract class ConvertFactory<R extends Reader, W extends Writer> {
|
||||
this.register(String.class, StringSimpledCoder.instance);
|
||||
this.register(CharSequence.class, CharSequenceSimpledCoder.instance);
|
||||
this.register(java.util.Date.class, DateSimpledCoder.instance);
|
||||
this.register(AtomicLong.class, AtomicLongSimpledCoder.instance);
|
||||
this.register(BigInteger.class, BigIntegerSimpledCoder.instance);
|
||||
this.register(BigDecimal.class, BigDecimalSimpledCoder.instance);
|
||||
this.register(InetAddress.class, InetAddressSimpledCoder.instance);
|
||||
|
||||
35
src/org/redkale/convert/ext/AtomicLongSimpledCoder.java
Normal file
35
src/org/redkale/convert/ext/AtomicLongSimpledCoder.java
Normal file
@@ -0,0 +1,35 @@
|
||||
/*
|
||||
* To change this license header, choose License Headers in Project Properties.
|
||||
* To change this template file, choose Tools | Templates
|
||||
* and open the template in the editor.
|
||||
*/
|
||||
package org.redkale.convert.ext;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import org.redkale.convert.*;
|
||||
|
||||
/**
|
||||
* AtomicLong 的SimpledCoder实现
|
||||
*
|
||||
* <p>
|
||||
* 详情见: https://redkale.org
|
||||
*
|
||||
* @author zhangjx
|
||||
* @param <R> Reader输入的子类型
|
||||
* @param <W> Writer输出的子类型
|
||||
*/
|
||||
public final class AtomicLongSimpledCoder<R extends Reader, W extends Writer> extends SimpledCoder<R, W, AtomicLong> {
|
||||
|
||||
public static final AtomicLongSimpledCoder instance = new AtomicLongSimpledCoder();
|
||||
|
||||
@Override
|
||||
public void convertTo(W out, AtomicLong value) {
|
||||
out.writeLong(value == null ? 0 : value.get());
|
||||
}
|
||||
|
||||
@Override
|
||||
public AtomicLong convertFrom(R in) {
|
||||
return new AtomicLong(in.readLong());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -32,6 +32,15 @@ import org.redkale.util.*;
|
||||
@ResourceType(CacheSource.class)
|
||||
public class CacheMemorySource<V extends Object> extends AbstractService implements CacheSource<V>, Service, AutoCloseable, Resourcable {
|
||||
|
||||
private static final Type STRING_ENTRY_TYPE = new TypeToken<CacheEntry<String>>() {
|
||||
}.getType();
|
||||
|
||||
private static final Type LONG_ENTRY_TYPE = new TypeToken<CacheEntry<Long>>() {
|
||||
}.getType();
|
||||
|
||||
private static final Type ATOMIC_ENTRY_TYPE = new TypeToken<CacheEntry<AtomicLong>>() {
|
||||
}.getType();
|
||||
|
||||
@Resource(name = "APP_HOME")
|
||||
private File home;
|
||||
|
||||
@@ -45,10 +54,6 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
|
||||
private Type objValueType;
|
||||
|
||||
private Type setValueType;
|
||||
|
||||
private Type listValueType;
|
||||
|
||||
private ScheduledThreadPoolExecutor scheduler;
|
||||
|
||||
private Consumer<CacheEntry> expireHandler;
|
||||
@@ -66,8 +71,6 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
@Override
|
||||
public final void initValueType(Type valueType) {
|
||||
this.objValueType = valueType;
|
||||
this.setValueType = TypeToken.createParameterizedType(null, CopyOnWriteArraySet.class, valueType);
|
||||
this.listValueType = TypeToken.createParameterizedType(null, ConcurrentLinkedQueue.class, valueType);
|
||||
this.initTransient(this.objValueType == null);
|
||||
}
|
||||
|
||||
@@ -84,6 +87,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
@Override
|
||||
public void init(AnyValue conf) {
|
||||
if (this.convert == null) this.convert = this.defaultConvert;
|
||||
if (this.convert == null) this.convert = JsonConvert.root();
|
||||
final CacheMemorySource self = this;
|
||||
AnyValue prop = conf == null ? null : conf.getAnyValue("properties");
|
||||
if (prop != null) {
|
||||
@@ -137,18 +141,21 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
File store = new File(home, "cache/" + resourceName());
|
||||
if (!store.isFile() || !store.canRead()) return;
|
||||
LineNumberReader reader = new LineNumberReader(new FileReader(store));
|
||||
if (this.objValueType == null) {
|
||||
this.objValueType = Object.class;
|
||||
this.setValueType = TypeToken.createParameterizedType(null, CopyOnWriteArraySet.class, this.objValueType);
|
||||
this.listValueType = TypeToken.createParameterizedType(null, ConcurrentLinkedQueue.class, this.objValueType);
|
||||
}
|
||||
if (this.objValueType == null) this.objValueType = Object.class;
|
||||
final Type storeObjType = TypeToken.createParameterizedType(null, CacheEntry.class, objValueType);
|
||||
final Type storeSetType = TypeToken.createParameterizedType(null, CacheEntry.class, setValueType);
|
||||
final Type storeListType = TypeToken.createParameterizedType(null, CacheEntry.class, listValueType);
|
||||
|
||||
String line;
|
||||
while ((line = reader.readLine()) != null) {
|
||||
if (line.isEmpty()) continue;
|
||||
CacheEntry<Object> entry = convert.convertFrom(line.startsWith(CacheEntry.JSON_SET_KEY) ? storeSetType : (line.startsWith(CacheEntry.JSON_LIST_KEY) ? storeListType : storeObjType), line);
|
||||
Type convertType = storeObjType;
|
||||
if (line.startsWith("{\"cacheType\":\"" + CacheEntryType.LONG)) {
|
||||
convertType = LONG_ENTRY_TYPE;
|
||||
} else if (line.startsWith("{\"cacheType\":\"" + CacheEntryType.STRING)) {
|
||||
convertType = STRING_ENTRY_TYPE;
|
||||
} else if (line.startsWith("{\"cacheType\":\"" + CacheEntryType.ATOMIC)) {
|
||||
convertType = ATOMIC_ENTRY_TYPE;
|
||||
}
|
||||
CacheEntry<Object> entry = convert.convertFrom(convertType, line);
|
||||
if (entry.isExpired()) continue;
|
||||
if (datasync && container.containsKey(entry.key)) continue; //已经同步了
|
||||
container.put(entry.key, entry);
|
||||
@@ -182,6 +189,54 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
AnyValue.DefaultAnyValue conf = new AnyValue.DefaultAnyValue();
|
||||
conf.addValue("node", new AnyValue.DefaultAnyValue().addValue("addr", "127.0.0.1").addValue("port", "6379"));
|
||||
|
||||
CacheMemorySource source = new CacheMemorySource();
|
||||
source.defaultConvert = JsonFactory.root().getConvert();
|
||||
source.initValueType(String.class); //value用String类型
|
||||
source.init(conf);
|
||||
|
||||
System.out.println("------------------------------------");
|
||||
source.remove("key1");
|
||||
source.remove("key2");
|
||||
source.remove("300");
|
||||
source.set("key1", "value1");
|
||||
source.set("300", "4000");
|
||||
source.getAndRefresh("key1", 3500);
|
||||
System.out.println("[有值] 300 GET : " + source.get("300"));
|
||||
System.out.println("[有值] key1 GET : " + source.get("key1"));
|
||||
System.out.println("[无值] key2 GET : " + source.get("key2"));
|
||||
System.out.println("[有值] key1 EXISTS : " + source.exists("key1"));
|
||||
System.out.println("[无值] key2 EXISTS : " + source.exists("key2"));
|
||||
|
||||
source.remove("keys3");
|
||||
source.appendListItem("keys3", "vals1");
|
||||
source.appendListItem("keys3", "vals2");
|
||||
System.out.println("-------- keys3 追加了两个值 --------");
|
||||
System.out.println("[两值] keys3 VALUES : " + source.getCollection("keys3"));
|
||||
System.out.println("[有值] keys3 EXISTS : " + source.exists("keys3"));
|
||||
source.removeListItem("keys3", "vals1");
|
||||
System.out.println("[一值] keys3 VALUES : " + source.getCollection("keys3"));
|
||||
source.getCollectionAndRefresh("keys3", 3000);
|
||||
|
||||
source.remove("sets3");
|
||||
source.appendSetItem("sets3", "setvals1");
|
||||
source.appendSetItem("sets3", "setvals2");
|
||||
source.appendSetItem("sets3", "setvals1");
|
||||
System.out.println("[两值] sets3 VALUES : " + source.getCollection("sets3"));
|
||||
System.out.println("[有值] sets3 EXISTS : " + source.exists("sets3"));
|
||||
source.removeSetItem("sets3", "setvals1");
|
||||
System.out.println("[一值] sets3 VALUES : " + source.getCollection("sets3"));
|
||||
System.out.println("sets3 大小 : " + source.getCollectionSize("sets3"));
|
||||
System.out.println("all keys: " + source.queryKeys());
|
||||
System.out.println("newnum 值 : " + source.incr("newnum"));
|
||||
System.out.println("newnum 值 : " + source.decr("newnum"));
|
||||
System.out.println("------------------------------------");
|
||||
source.destroy(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception { //给Application 关闭时调用
|
||||
destroy(null);
|
||||
@@ -202,11 +257,38 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
store.getParentFile().mkdirs();
|
||||
PrintStream stream = new PrintStream(store, "UTF-8");
|
||||
final Type storeObjType = TypeToken.createParameterizedType(null, CacheEntry.class, objValueType);
|
||||
final Type storeSetType = TypeToken.createParameterizedType(null, CacheEntry.class, setValueType);
|
||||
final Type storeListType = TypeToken.createParameterizedType(null, CacheEntry.class, listValueType);
|
||||
final Type storeSetType = TypeToken.createParameterizedType(null, CacheEntry.class, objValueType);
|
||||
final Type storeListType = TypeToken.createParameterizedType(null, CacheEntry.class, objValueType);
|
||||
Collection<CacheEntry<Object>> entrys = container.values();
|
||||
for (CacheEntry entry : entrys) {
|
||||
stream.println(convert.convertTo(entry.isSetCacheType() ? storeSetType : (entry.isListCacheType() ? storeListType : storeObjType), entry));
|
||||
Type convertType = storeObjType;
|
||||
if (entry.cacheType == CacheEntryType.LONG) {
|
||||
convertType = LONG_ENTRY_TYPE;
|
||||
} else if (entry.cacheType == CacheEntryType.STRING) {
|
||||
convertType = STRING_ENTRY_TYPE;
|
||||
} else if (entry.cacheType == CacheEntryType.ATOMIC) {
|
||||
convertType = ATOMIC_ENTRY_TYPE;
|
||||
} else if (entry.cacheType == CacheEntryType.OBJECT) {
|
||||
convertType = storeObjType;
|
||||
} else if (entry.cacheType == CacheEntryType.LONG_LIST) {
|
||||
convertType = LONG_ENTRY_TYPE;
|
||||
} else if (entry.cacheType == CacheEntryType.LONG_SET) {
|
||||
convertType = LONG_ENTRY_TYPE;
|
||||
} else if (entry.cacheType == CacheEntryType.STRING_LIST) {
|
||||
convertType = STRING_ENTRY_TYPE;
|
||||
} else if (entry.cacheType == CacheEntryType.STRING_SET) {
|
||||
convertType = STRING_ENTRY_TYPE;
|
||||
} else if (entry.cacheType == CacheEntryType.OBJECT_LIST) {
|
||||
convertType = storeListType;
|
||||
} else if (entry.cacheType == CacheEntryType.OBJECT_SET) {
|
||||
convertType = storeSetType;
|
||||
}
|
||||
try {
|
||||
stream.println(convert.convertTo(convertType, entry));
|
||||
} catch (Exception ee) {
|
||||
System.err.println(storeSetType + "-----" + entry);
|
||||
throw ee;
|
||||
}
|
||||
}
|
||||
container.clear();
|
||||
stream.close();
|
||||
@@ -234,15 +316,41 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
CacheEntry entry = container.get(key);
|
||||
if (entry == null || entry.isExpired()) return null;
|
||||
if (entry.isListCacheType()) return (V) (entry.listValue == null ? null : new ArrayList(entry.listValue));
|
||||
if (entry.isSetCacheType()) return (V) (entry.setValue == null ? null : new HashSet(entry.setValue));
|
||||
if (entry.isSetCacheType()) return (V) (entry.csetValue == null ? null : new HashSet(entry.csetValue));
|
||||
return (V) entry.objectValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getString(String key) {
|
||||
if (key == null) return null;
|
||||
CacheEntry entry = container.get(key);
|
||||
if (entry == null || entry.isExpired()) return null;
|
||||
return (String) entry.objectValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLong(String key, long defValue) {
|
||||
if (key == null) return defValue;
|
||||
CacheEntry entry = container.get(key);
|
||||
if (entry == null || entry.isExpired()) return defValue;
|
||||
return entry.objectValue == null ? defValue : (entry.objectValue instanceof AtomicLong ? ((AtomicLong) entry.objectValue).get() : (Long) entry.objectValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<V> getAsync(final String key) {
|
||||
return CompletableFuture.supplyAsync(() -> get(key), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<String> getStringAsync(final String key) {
|
||||
return CompletableFuture.supplyAsync(() -> getString(key), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Long> getLongAsync(final String key, long defValue) {
|
||||
return CompletableFuture.supplyAsync(() -> getLong(key, defValue), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public V getAndRefresh(String key, final int expireSeconds) {
|
||||
@@ -252,15 +360,51 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
entry.lastAccessed = (int) (System.currentTimeMillis() / 1000);
|
||||
entry.expireSeconds = expireSeconds;
|
||||
if (entry.isListCacheType()) return (V) (entry.listValue == null ? null : new ArrayList(entry.listValue));
|
||||
if (entry.isSetCacheType()) return (V) (entry.setValue == null ? null : new HashSet(entry.setValue));
|
||||
if (entry.isSetCacheType()) return (V) (entry.csetValue == null ? null : new HashSet(entry.csetValue));
|
||||
return (V) entry.objectValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public String getStringAndRefresh(String key, final int expireSeconds) {
|
||||
if (key == null) return null;
|
||||
CacheEntry entry = container.get(key);
|
||||
if (entry == null || entry.isExpired()) return null;
|
||||
entry.lastAccessed = (int) (System.currentTimeMillis() / 1000);
|
||||
entry.expireSeconds = expireSeconds;
|
||||
return (String) entry.objectValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public long getLongAndRefresh(String key, final int expireSeconds, long defValue) {
|
||||
if (key == null) return defValue;
|
||||
CacheEntry entry = container.get(key);
|
||||
if (entry == null || entry.isExpired()) return defValue;
|
||||
entry.lastAccessed = (int) (System.currentTimeMillis() / 1000);
|
||||
entry.expireSeconds = expireSeconds;
|
||||
return entry.objectValue == null ? defValue : (entry.objectValue instanceof AtomicLong ? ((AtomicLong) entry.objectValue).get() : (Long) entry.objectValue);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<V> getAndRefreshAsync(final String key, final int expireSeconds) {
|
||||
return CompletableFuture.supplyAsync(() -> getAndRefresh(key, expireSeconds), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<String> getStringAndRefreshAsync(final String key, final int expireSeconds) {
|
||||
return CompletableFuture.supplyAsync(() -> getStringAndRefresh(key, expireSeconds), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<Long> getLongAndRefreshAsync(final String key, final int expireSeconds, long defValue) {
|
||||
return CompletableFuture.supplyAsync(() -> getLongAndRefresh(key, expireSeconds, defValue), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void refresh(String key, final int expireSeconds) {
|
||||
@@ -272,17 +416,16 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<Void> refreshAsync(final String key, final int expireSeconds) {
|
||||
return CompletableFuture.runAsync(() -> refresh(key, expireSeconds), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void set(String key, V value) {
|
||||
protected void set(CacheEntryType cacheType, String key, Object value) {
|
||||
if (key == null) return;
|
||||
CacheEntry entry = container.get(key);
|
||||
if (entry == null) {
|
||||
entry = new CacheEntry(CacheEntryType.OBJECT, key, value, null, null);
|
||||
entry = new CacheEntry(cacheType, key, value, null, null);
|
||||
container.putIfAbsent(key, entry);
|
||||
} else {
|
||||
entry.expireSeconds = 0;
|
||||
@@ -292,17 +435,46 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void set(String key, V value) {
|
||||
set(CacheEntryType.OBJECT, key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void setString(String key, String value) {
|
||||
set(CacheEntryType.STRING, key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void setLong(String key, long value) {
|
||||
set(CacheEntryType.LONG, key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<Void> setAsync(String key, V value) {
|
||||
return CompletableFuture.runAsync(() -> set(key, value), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void set(int expireSeconds, String key, V value) {
|
||||
public CompletableFuture<Void> setStringAsync(String key, String value) {
|
||||
return CompletableFuture.runAsync(() -> setString(key, value), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<Void> setLongAsync(String key, long value) {
|
||||
return CompletableFuture.runAsync(() -> setLong(key, value), getExecutor());
|
||||
}
|
||||
|
||||
protected void set(CacheEntryType cacheType, int expireSeconds, String key, Object value) {
|
||||
if (key == null) return;
|
||||
CacheEntry entry = container.get(key);
|
||||
if (entry == null) {
|
||||
entry = new CacheEntry(CacheEntryType.OBJECT, expireSeconds, key, value, null, null);
|
||||
entry = new CacheEntry(cacheType, expireSeconds, key, value, null, null);
|
||||
container.putIfAbsent(key, entry);
|
||||
} else {
|
||||
if (expireSeconds > 0) entry.expireSeconds = expireSeconds;
|
||||
@@ -312,10 +484,41 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void set(int expireSeconds, String key, V value) {
|
||||
set(CacheEntryType.OBJECT, expireSeconds, key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void setString(int expireSeconds, String key, String value) {
|
||||
set(CacheEntryType.STRING, expireSeconds, key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void setLong(int expireSeconds, String key, long value) {
|
||||
set(CacheEntryType.LONG, expireSeconds, key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<Void> setAsync(int expireSeconds, String key, V value) {
|
||||
return CompletableFuture.runAsync(() -> set(expireSeconds, key, value), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<Void> setStringAsync(int expireSeconds, String key, String value) {
|
||||
return CompletableFuture.runAsync(() -> setString(expireSeconds, key, value), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<Void> setLongAsync(int expireSeconds, String key, long value) {
|
||||
return CompletableFuture.runAsync(() -> setLong(expireSeconds, key, value), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void setExpireSeconds(String key, int expireSeconds) {
|
||||
@@ -326,6 +529,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<Void> setExpireSecondsAsync(final String key, final int expireSeconds) {
|
||||
return CompletableFuture.runAsync(() -> setExpireSeconds(key, expireSeconds), getExecutor());
|
||||
}
|
||||
@@ -338,23 +542,26 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public long incr(final String key) {
|
||||
return incr(key, 1);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<Long> incrAsync(final String key) {
|
||||
return CompletableFuture.supplyAsync(() -> incr(key), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public long incr(final String key, long num) {
|
||||
CacheEntry entry = container.get(key);
|
||||
if (entry == null) {
|
||||
synchronized (container) {
|
||||
entry = container.get(key);
|
||||
if (entry == null) {
|
||||
entry = new CacheEntry(CacheEntryType.OBJECT, key, new AtomicLong(), null, null);
|
||||
entry = new CacheEntry(CacheEntryType.ATOMIC, key, new AtomicLong(), null, null);
|
||||
container.put(key, entry);
|
||||
}
|
||||
}
|
||||
@@ -363,31 +570,37 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<Long> incrAsync(final String key, long num) {
|
||||
return CompletableFuture.supplyAsync(() -> incr(key, num), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public long decr(final String key) {
|
||||
return incr(key, -1);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<Long> decrAsync(final String key) {
|
||||
return CompletableFuture.supplyAsync(() -> decr(key), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public long decr(final String key, long num) {
|
||||
return incr(key, -num);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<Long> decrAsync(final String key, long num) {
|
||||
return CompletableFuture.supplyAsync(() -> decr(key, num), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<Void> removeAsync(final String key) {
|
||||
return CompletableFuture.runAsync(() -> remove(key), getExecutor());
|
||||
}
|
||||
@@ -397,11 +610,31 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
return (Collection<V>) get(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<String> getStringCollection(final String key) {
|
||||
return (Collection<String>) get(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<Long> getLongCollection(final String key) {
|
||||
return (Collection<Long>) get(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Collection<V>> getCollectionAsync(final String key) {
|
||||
return CompletableFuture.supplyAsync(() -> getCollection(key), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Collection<String>> getStringCollectionAsync(final String key) {
|
||||
return CompletableFuture.supplyAsync(() -> getStringCollection(key), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Collection<Long>> getLongCollectionAsync(final String key) {
|
||||
return CompletableFuture.supplyAsync(() -> getLongCollection(key), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getCollectionSize(final String key) {
|
||||
Collection<V> collection = (Collection<V>) get(key);
|
||||
@@ -414,23 +647,47 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public Collection<V> getCollectionAndRefresh(final String key, final int expireSeconds) {
|
||||
return (Collection<V>) getAndRefresh(key, expireSeconds);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public Collection<String> getStringCollectionAndRefresh(final String key, final int expireSeconds) {
|
||||
return (Collection<String>) getAndRefresh(key, expireSeconds);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public Collection<Long> getLongCollectionAndRefresh(final String key, final int expireSeconds) {
|
||||
return (Collection<Long>) getAndRefresh(key, expireSeconds);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<Collection<V>> getCollectionAndRefreshAsync(final String key, final int expireSeconds) {
|
||||
return CompletableFuture.supplyAsync(() -> getCollectionAndRefresh(key, expireSeconds), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void appendListItem(String key, V value) {
|
||||
public CompletableFuture<Collection<String>> getStringCollectionAndRefreshAsync(final String key, final int expireSeconds) {
|
||||
return CompletableFuture.supplyAsync(() -> getStringCollectionAndRefresh(key, expireSeconds), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<Collection<Long>> getLongCollectionAndRefreshAsync(final String key, final int expireSeconds) {
|
||||
return CompletableFuture.supplyAsync(() -> getLongCollectionAndRefresh(key, expireSeconds), getExecutor());
|
||||
}
|
||||
|
||||
protected void appendListItem(CacheEntryType cacheType, String key, Object value) {
|
||||
if (key == null) return;
|
||||
CacheEntry entry = container.get(key);
|
||||
if (entry == null || !entry.isListCacheType() || entry.listValue == null) {
|
||||
ConcurrentLinkedQueue<V> list = new ConcurrentLinkedQueue();
|
||||
entry = new CacheEntry(CacheEntryType.LIST, key, null, null, list);
|
||||
ConcurrentLinkedQueue list = new ConcurrentLinkedQueue();
|
||||
entry = new CacheEntry(cacheType, key, null, null, list);
|
||||
CacheEntry old = container.putIfAbsent(key, entry);
|
||||
if (old != null) list = old.listValue;
|
||||
if (list != null) list.add(value);
|
||||
@@ -440,10 +697,41 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void appendListItem(String key, V value) {
|
||||
appendListItem(CacheEntryType.OBJECT_LIST, key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void appendStringListItem(String key, String value) {
|
||||
appendListItem(CacheEntryType.STRING_LIST, key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void appendLongListItem(String key, long value) {
|
||||
appendListItem(CacheEntryType.LONG_LIST, key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<Void> appendListItemAsync(final String key, final V value) {
|
||||
return CompletableFuture.runAsync(() -> appendListItem(key, value), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<Void> appendStringListItemAsync(final String key, final String value) {
|
||||
return CompletableFuture.runAsync(() -> appendStringListItem(key, value), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<Void> appendLongListItemAsync(final String key, final long value) {
|
||||
return CompletableFuture.runAsync(() -> appendLongListItem(key, value), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void removeListItem(String key, V value) {
|
||||
@@ -454,45 +742,136 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void removeStringListItem(String key, String value) {
|
||||
if (key == null) return;
|
||||
CacheEntry entry = container.get(key);
|
||||
if (entry == null || entry.listValue == null) return;
|
||||
entry.listValue.remove(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void removeLongListItem(String key, long value) {
|
||||
if (key == null) return;
|
||||
CacheEntry entry = container.get(key);
|
||||
if (entry == null || entry.listValue == null) return;
|
||||
entry.listValue.remove(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<Void> removeListItemAsync(final String key, final V value) {
|
||||
return CompletableFuture.runAsync(() -> removeListItem(key, value), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void appendSetItem(String key, V value) {
|
||||
public CompletableFuture<Void> removeStringListItemAsync(final String key, final String value) {
|
||||
return CompletableFuture.runAsync(() -> removeStringListItem(key, value), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<Void> removeLongListItemAsync(final String key, final long value) {
|
||||
return CompletableFuture.runAsync(() -> removeLongListItem(key, value), getExecutor());
|
||||
}
|
||||
|
||||
protected void appendSetItem(CacheEntryType cacheType, String key, Object value) {
|
||||
if (key == null) return;
|
||||
CacheEntry entry = container.get(key);
|
||||
if (entry == null || !entry.isSetCacheType() || entry.setValue == null) {
|
||||
CopyOnWriteArraySet<V> set = new CopyOnWriteArraySet();
|
||||
entry = new CacheEntry(CacheEntryType.SET, key, null, set, null);
|
||||
if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) {
|
||||
CopyOnWriteArraySet set = new CopyOnWriteArraySet();
|
||||
entry = new CacheEntry(cacheType, key, null, set, null);
|
||||
CacheEntry old = container.putIfAbsent(key, entry);
|
||||
if (old != null) set = old.setValue;
|
||||
if (old != null) set = old.csetValue;
|
||||
if (set != null) set.add(value);
|
||||
} else {
|
||||
entry.setValue.add(value);
|
||||
entry.csetValue.add(value);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void appendSetItem(String key, V value) {
|
||||
appendSetItem(CacheEntryType.OBJECT_SET, key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void appendStringSetItem(String key, String value) {
|
||||
appendSetItem(CacheEntryType.OBJECT_SET, key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void appendLongSetItem(String key, long value) {
|
||||
appendSetItem(CacheEntryType.OBJECT_SET, key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<Void> appendSetItemAsync(final String key, final V value) {
|
||||
return CompletableFuture.runAsync(() -> appendSetItem(key, value), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<Void> appendStringSetItemAsync(final String key, final String value) {
|
||||
return CompletableFuture.runAsync(() -> appendStringSetItem(key, value), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<Void> appendLongSetItemAsync(final String key, final long value) {
|
||||
return CompletableFuture.runAsync(() -> appendLongSetItem(key, value), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void removeSetItem(String key, V value) {
|
||||
if (key == null) return;
|
||||
CacheEntry entry = container.get(key);
|
||||
if (entry == null || entry.setValue == null) return;
|
||||
entry.setValue.remove(value);
|
||||
if (entry == null || entry.csetValue == null) return;
|
||||
entry.csetValue.remove(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void removeStringSetItem(String key, String value) {
|
||||
if (key == null) return;
|
||||
CacheEntry entry = container.get(key);
|
||||
if (entry == null || entry.csetValue == null) return;
|
||||
entry.csetValue.remove(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void removeLongSetItem(String key, long value) {
|
||||
if (key == null) return;
|
||||
CacheEntry entry = container.get(key);
|
||||
if (entry == null || entry.csetValue == null) return;
|
||||
entry.csetValue.remove(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<Void> removeSetItemAsync(final String key, final V value) {
|
||||
return CompletableFuture.runAsync(() -> removeSetItem(key, value), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<Void> removeStringSetItemAsync(final String key, final String value) {
|
||||
return CompletableFuture.runAsync(() -> removeStringSetItem(key, value), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<Void> removeLongSetItemAsync(final String key, final long value) {
|
||||
return CompletableFuture.runAsync(() -> removeLongSetItem(key, value), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> queryKeys() {
|
||||
return new ArrayList<>(container.keySet());
|
||||
|
||||
@@ -76,6 +76,46 @@ public interface CacheSource<V extends Object> {
|
||||
|
||||
public List<CacheEntry<Object>> queryList();
|
||||
|
||||
public String getString(final String key);
|
||||
|
||||
public String getStringAndRefresh(final String key, final int expireSeconds);
|
||||
|
||||
public void setString(final String key, final String value);
|
||||
|
||||
public void setString(final int expireSeconds, final String key, final String value);
|
||||
|
||||
public Collection<String> getStringCollection(final String key);
|
||||
|
||||
public Collection<String> getStringCollectionAndRefresh(final String key, final int expireSeconds);
|
||||
|
||||
public void appendStringListItem(final String key, final String value);
|
||||
|
||||
public void removeStringListItem(final String key, final String value);
|
||||
|
||||
public void appendStringSetItem(final String key, final String value);
|
||||
|
||||
public void removeStringSetItem(final String key, final String value);
|
||||
|
||||
public long getLong(final String key, long defValue);
|
||||
|
||||
public long getLongAndRefresh(final String key, final int expireSeconds, long defValue);
|
||||
|
||||
public void setLong(final String key, final long value);
|
||||
|
||||
public void setLong(final int expireSeconds, final String key, final long value);
|
||||
|
||||
public Collection<Long> getLongCollection(final String key);
|
||||
|
||||
public Collection<Long> getLongCollectionAndRefresh(final String key, final int expireSeconds);
|
||||
|
||||
public void appendLongListItem(final String key, final long value);
|
||||
|
||||
public void removeLongListItem(final String key, final long value);
|
||||
|
||||
public void appendLongSetItem(final String key, final long value);
|
||||
|
||||
public void removeLongSetItem(final String key, final long value);
|
||||
|
||||
//---------------------- CompletableFuture 异步版 ---------------------------------
|
||||
public CompletableFuture<Boolean> existsAsync(final String key);
|
||||
|
||||
@@ -121,20 +161,58 @@ public interface CacheSource<V extends Object> {
|
||||
|
||||
public CompletableFuture<List<CacheEntry< Object>>> queryListAsync();
|
||||
|
||||
public CompletableFuture<String> getStringAsync(final String key);
|
||||
|
||||
public CompletableFuture<String> getStringAndRefreshAsync(final String key, final int expireSeconds);
|
||||
|
||||
public CompletableFuture<Void> setStringAsync(final String key, final String value);
|
||||
|
||||
public CompletableFuture<Void> setStringAsync(final int expireSeconds, final String key, final String value);
|
||||
|
||||
public CompletableFuture<Collection<String>> getStringCollectionAsync(final String key);
|
||||
|
||||
public CompletableFuture<Collection<String>> getStringCollectionAndRefreshAsync(final String key, final int expireSeconds);
|
||||
|
||||
public CompletableFuture<Void> appendStringListItemAsync(final String key, final String value);
|
||||
|
||||
public CompletableFuture<Void> removeStringListItemAsync(final String key, final String value);
|
||||
|
||||
public CompletableFuture<Void> appendStringSetItemAsync(final String key, final String value);
|
||||
|
||||
public CompletableFuture<Void> removeStringSetItemAsync(final String key, final String value);
|
||||
|
||||
public CompletableFuture<Long> getLongAsync(final String key, long defValue);
|
||||
|
||||
public CompletableFuture<Long> getLongAndRefreshAsync(final String key, final int expireSeconds, long defValue);
|
||||
|
||||
public CompletableFuture<Void> setLongAsync(final String key, long value);
|
||||
|
||||
public CompletableFuture<Void> setLongAsync(final int expireSeconds, final String key, final long value);
|
||||
|
||||
public CompletableFuture<Collection<Long>> getLongCollectionAsync(final String key);
|
||||
|
||||
public CompletableFuture<Collection<Long>> getLongCollectionAndRefreshAsync(final String key, final int expireSeconds);
|
||||
|
||||
public CompletableFuture<Void> appendLongListItemAsync(final String key, final long value);
|
||||
|
||||
public CompletableFuture<Void> removeLongListItemAsync(final String key, final long value);
|
||||
|
||||
public CompletableFuture<Void> appendLongSetItemAsync(final String key, final long value);
|
||||
|
||||
public CompletableFuture<Void> removeLongSetItemAsync(final String key, final long value);
|
||||
|
||||
default CompletableFuture<Boolean> isOpenAsync() {
|
||||
return CompletableFuture.completedFuture(isOpen());
|
||||
}
|
||||
|
||||
public static enum CacheEntryType {
|
||||
OBJECT, SET, LIST;
|
||||
LONG, STRING, OBJECT, ATOMIC,
|
||||
LONG_SET, STRING_SET, OBJECT_SET,
|
||||
LONG_LIST, STRING_LIST, OBJECT_LIST;
|
||||
}
|
||||
|
||||
public static final class CacheEntry<T> {
|
||||
|
||||
static final String JSON_SET_KEY = "{\"cacheType\":\"" + CacheEntryType.SET + "\"";
|
||||
|
||||
static final String JSON_LIST_KEY = "{\"cacheType\":\"" + CacheEntryType.LIST + "\"";
|
||||
|
||||
final CacheEntryType cacheType;
|
||||
|
||||
final String key;
|
||||
@@ -146,26 +224,26 @@ public interface CacheSource<V extends Object> {
|
||||
|
||||
T objectValue;
|
||||
|
||||
CopyOnWriteArraySet<T> setValue;
|
||||
CopyOnWriteArraySet<T> csetValue;
|
||||
|
||||
ConcurrentLinkedQueue<T> listValue;
|
||||
|
||||
public CacheEntry(CacheEntryType cacheType, String key, T objectValue, CopyOnWriteArraySet<T> setValue, ConcurrentLinkedQueue<T> listValue) {
|
||||
this(cacheType, 0, key, objectValue, setValue, listValue);
|
||||
public CacheEntry(CacheEntryType cacheType, String key, T objectValue, CopyOnWriteArraySet<T> csetValue, ConcurrentLinkedQueue<T> listValue) {
|
||||
this(cacheType, 0, key, objectValue, csetValue, listValue);
|
||||
}
|
||||
|
||||
public CacheEntry(CacheEntryType cacheType, int expireSeconds, String key, T objectValue, CopyOnWriteArraySet<T> setValue, ConcurrentLinkedQueue<T> listValue) {
|
||||
this(cacheType, expireSeconds, (int) (System.currentTimeMillis() / 1000), key, objectValue, setValue, listValue);
|
||||
public CacheEntry(CacheEntryType cacheType, int expireSeconds, String key, T objectValue, CopyOnWriteArraySet<T> csetValue, ConcurrentLinkedQueue<T> listValue) {
|
||||
this(cacheType, expireSeconds, (int) (System.currentTimeMillis() / 1000), key, objectValue, csetValue, listValue);
|
||||
}
|
||||
|
||||
@ConstructorProperties({"cacheType", "expireSeconds", "lastAccessed", "key", "objectValue", "setValue", "listValue"})
|
||||
public CacheEntry(CacheEntryType cacheType, int expireSeconds, int lastAccessed, String key, T objectValue, CopyOnWriteArraySet<T> setValue, ConcurrentLinkedQueue<T> listValue) {
|
||||
@ConstructorProperties({"cacheType", "expireSeconds", "lastAccessed", "key", "objectValue", "csetValue", "listValue"})
|
||||
public CacheEntry(CacheEntryType cacheType, int expireSeconds, int lastAccessed, String key, T objectValue, CopyOnWriteArraySet<T> csetValue, ConcurrentLinkedQueue<T> listValue) {
|
||||
this.cacheType = cacheType;
|
||||
this.expireSeconds = expireSeconds;
|
||||
this.lastAccessed = lastAccessed;
|
||||
this.key = key;
|
||||
this.objectValue = objectValue;
|
||||
this.setValue = setValue;
|
||||
this.csetValue = csetValue;
|
||||
this.listValue = listValue;
|
||||
}
|
||||
|
||||
@@ -176,12 +254,12 @@ public interface CacheSource<V extends Object> {
|
||||
|
||||
@ConvertColumn(ignore = true)
|
||||
public boolean isListCacheType() {
|
||||
return cacheType == CacheEntryType.LIST;
|
||||
return cacheType == CacheEntryType.LONG_LIST || cacheType == CacheEntryType.STRING_LIST || cacheType == CacheEntryType.OBJECT_LIST;
|
||||
}
|
||||
|
||||
@ConvertColumn(ignore = true)
|
||||
public boolean isSetCacheType() {
|
||||
return cacheType == CacheEntryType.SET;
|
||||
return cacheType == CacheEntryType.LONG_SET || cacheType == CacheEntryType.STRING_SET || cacheType == CacheEntryType.OBJECT_SET;
|
||||
}
|
||||
|
||||
@ConvertColumn(ignore = true)
|
||||
@@ -209,8 +287,8 @@ public interface CacheSource<V extends Object> {
|
||||
return objectValue;
|
||||
}
|
||||
|
||||
public CopyOnWriteArraySet<T> getSetValue() {
|
||||
return setValue;
|
||||
public CopyOnWriteArraySet<T> getCsetValue() {
|
||||
return csetValue;
|
||||
}
|
||||
|
||||
public ConcurrentLinkedQueue<T> getListValue() {
|
||||
|
||||
Reference in New Issue
Block a user