CacheSource增加hxxx方法

This commit is contained in:
Redkale
2020-07-17 10:41:41 +08:00
parent da141cc6bd
commit 627c1e953e
2 changed files with 155 additions and 27 deletions

View File

@@ -32,7 +32,7 @@ import org.redkale.util.*;
@AutoLoad(false)
@SuppressWarnings("unchecked")
@ResourceType(CacheSource.class)
public class CacheMemorySource<V extends Object> extends AbstractService implements CacheSource<V>, Service, AutoCloseable, Resourcable {
public final class CacheMemorySource<V extends Object> extends AbstractService implements CacheSource<V>, Service, AutoCloseable, Resourcable {
private static final Type STRING_ENTRY_TYPE = new TypeToken<CacheEntry<String>>() {
}.getType();
@@ -43,6 +43,9 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
private static final Type ATOMIC_ENTRY_TYPE = new TypeToken<CacheEntry<AtomicLong>>() {
}.getType();
private static final Type MAP_ENTRY_TYPE = new TypeToken<CacheEntry<ConcurrentHashMap>>() {
}.getType();
@Resource(name = "APP_HOME")
private File home;
@@ -152,7 +155,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
boolean datasync = false; //是否从远程同步过数据
//----------同步数据……-----------
// TODO
if (this.needStore) {
if (this.needStore && false) { //不存储
try {
File store = home == null ? new File("cache/" + resourceName()) : new File(home, "cache/" + resourceName());
if (!store.isFile() || !store.canRead()) return;
@@ -170,6 +173,8 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
convertType = STRING_ENTRY_TYPE;
} else if (line.startsWith("{\"cacheType\":\"" + CacheEntryType.ATOMIC)) {
convertType = ATOMIC_ENTRY_TYPE;
} else {
continue;
}
CacheEntry<Object> entry = convert.convertFrom(convertType, line);
if (entry.isExpired()) continue;
@@ -254,6 +259,13 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
System.out.println("all keys: " + source.queryKeys());
System.out.println("newnum 值 : " + source.incr("newnum"));
System.out.println("newnum 值 : " + source.decr("newnum"));
source.remove("hmap1");
source.hincr("map", "key1");
System.out.println("map.key1 值 : " + source.hgetLong("map", "key1", -1));
source.hmset("map", "key2", "haha", "key3", 333);
System.out.println("map.[key1,key2,key3] 值 : " + source.hmget("map", "key1", "key2", "key3"));
System.out.println("------------------------------------");
source.destroy(null);
source.init(null);
@@ -276,6 +288,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
public void destroy(AnyValue conf) {
if (scheduler != null) scheduler.shutdownNow();
if (!this.needStore || Sncp.isRemote(this) || container.isEmpty()) return;
if (true) return; //不存储
try {
File store = new File(home, "cache/" + resourceName());
store.getParentFile().mkdirs();
@@ -322,73 +335,145 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
//----------- hxxx --------------
@Override
public int hremove(final String key, String... fields) {
int count = 0;
CacheEntry entry = container.get(key);
if (entry == null || entry.mapValue == null) return 0;
for (String field : fields) {
if (entry.mapValue.remove(field) != null) count++;
}
return count;
}
@Override
public List<String> hkeys(final String key) {
List<String> list = new ArrayList<>();
CacheEntry entry = container.get(key);
if (entry == null || entry.mapValue == null) return list;
list.addAll(entry.mapValue.keySet());
return list;
}
@Override
public long hincr(final String key, String field) {
return 0;
return hincr(key, field, 1);
}
@Override
public long hincr(final String key, String field, long num) {
return 0;
CacheEntry entry = container.get(key);
if (entry == null) {
synchronized (container) {
entry = container.get(key);
if (entry == null) {
ConcurrentHashMap<String, Serializable> map = new ConcurrentHashMap();
map.put(field, new AtomicLong());
entry = new CacheEntry(CacheEntryType.MAP, key, new AtomicLong(), null, null, map);
container.put(key, entry);
}
}
}
Serializable val = (Serializable) entry.mapValue.computeIfAbsent(field, f -> new AtomicLong());
if (!(val instanceof AtomicLong)) {
synchronized (entry.mapValue) {
if (!(val instanceof AtomicLong)) {
if (val == null) {
val = new AtomicLong();
} else {
val = new AtomicLong(((Number) val).longValue());
}
entry.mapValue.put(field, val);
}
}
}
return ((AtomicLong) entry.mapValue.get(field)).addAndGet(num);
}
@Override
public long hdecr(final String key, String field) {
return 0;
return hincr(key, field, -1);
}
@Override
public long hdecr(final String key, String field, long num) {
return 0;
return hincr(key, field, -num);
}
@Override
public boolean hexists(final String key, String field) {
return false;
if (key == null) return false;
CacheEntry entry = container.get(key);
if (entry == null || entry.isExpired() || entry.mapValue == null) return false;
return entry.mapValue.contains(field);
}
@Override
public <T> void hset(final String key, final String field, final Convert convert, final T value) {
hset(CacheEntryType.MAP, key, field, value);
}
@Override
public <T> void hset(final String key, final String field, final Type type, final T value) {
hset(CacheEntryType.MAP, key, field, value);
}
@Override
public <T> void hset(final String key, final String field, final Convert convert, final Type type, final T value) {
hset(CacheEntryType.MAP, key, field, value);
}
@Override
public void hsetString(final String key, final String field, final String value) {
hset(CacheEntryType.MAP, key, field, value);
}
@Override
public void hsetLong(final String key, final String field, final long value) {
hset(CacheEntryType.MAP, key, field, value);
}
@Override
public void hmset(final String key, final Serializable... values) {
for (int i = 0; i < values.length; i += 2) {
hset(CacheEntryType.MAP, key, (String) values[i], values[i + 1]);
}
}
@Override
public Serializable[] hmget(final String key, final String... fields) {
return null;
public List<Serializable> hmget(final String key, final String... fields) {
if (key == null) return null;
CacheEntry entry = container.get(key);
if (entry == null || entry.isExpired() || entry.mapValue == null) return null;
List<Serializable> rs = new ArrayList<>(fields.length);
for (int i = 0; i < fields.length; i++) {
rs.add((Serializable) entry.mapValue.get(fields[i]));
}
return rs;
}
@Override
public <T> T hget(final String key, final String field, final Type type) {
return null;
if (key == null) return null;
CacheEntry entry = container.get(key);
if (entry == null || entry.isExpired() || entry.mapValue == null) return null;
return (T) entry.mapValue.get(field);
}
@Override
public String hgetString(final String key, final String field) {
return null;
if (key == null) return null;
CacheEntry entry = container.get(key);
if (entry == null || entry.isExpired() || entry.mapValue == null) return null;
return (String) entry.mapValue.get(field);
}
@Override
public long hgetLong(final String key, final String field, long defValue) {
return 0L;
if (key == null) return defValue;
CacheEntry entry = container.get(key);
if (entry == null || entry.isExpired() || entry.mapValue == null) return defValue;
return ((Number) entry.mapValue.getOrDefault(field, defValue)).longValue();
}
//----------- hxxx --------------
@@ -438,6 +523,16 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
//----------- hxxx --------------
@Override
public CompletableFuture<Integer> hremoveAsync(final String key, String... fields) {
return CompletableFuture.supplyAsync(() -> hremove(key, fields), getExecutor());
}
@Override
public CompletableFuture<List<String>> hkeysAsync(final String key) {
return CompletableFuture.supplyAsync(() -> hkeys(key), getExecutor());
}
@Override
public CompletableFuture<Long> hincrAsync(final String key, String field) {
return CompletableFuture.supplyAsync(() -> hincr(key, field), getExecutor());
@@ -494,7 +589,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
public CompletableFuture<Serializable[]> hmgetAsync(final String key, final String... fields) {
public CompletableFuture<List<Serializable>> hmgetAsync(final String key, final String... fields) {
return CompletableFuture.supplyAsync(() -> hmget(key, fields), getExecutor());
}
@@ -612,7 +707,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
if (key == null) return;
CacheEntry entry = container.get(key);
if (entry == null) {
entry = new CacheEntry(cacheType, key, value, null, null);
entry = new CacheEntry(cacheType, key, value, null, null, null);
container.putIfAbsent(key, entry);
} else {
entry.expireSeconds = 0;
@@ -621,6 +716,20 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
}
protected void hset(CacheEntryType cacheType, String key, String field, Object value) {
if (key == null) return;
CacheEntry entry = container.get(key);
if (entry == null) {
entry = new CacheEntry(CacheEntryType.MAP, key, value, null, null, new ConcurrentHashMap<>());
container.putIfAbsent(key, entry);
entry.mapValue.put(field, value);
} else {
entry.expireSeconds = 0;
entry.mapValue.put(field, value);
entry.lastAccessed = (int) (System.currentTimeMillis() / 1000);
}
}
@Override
public void set(String key, V value) {
set(CacheEntryType.OBJECT, key, value);
@@ -685,7 +794,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
if (key == null) return;
CacheEntry entry = container.get(key);
if (entry == null) {
entry = new CacheEntry(cacheType, expireSeconds, key, value, null, null);
entry = new CacheEntry(cacheType, expireSeconds, key, value, null, null, null);
container.putIfAbsent(key, entry);
} else {
if (expireSeconds > 0) entry.expireSeconds = expireSeconds;
@@ -790,7 +899,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
synchronized (container) {
entry = container.get(key);
if (entry == null) {
entry = new CacheEntry(CacheEntryType.ATOMIC, key, new AtomicLong(), null, null);
entry = new CacheEntry(CacheEntryType.ATOMIC, key, new AtomicLong(), null, null, null);
container.put(key, entry);
}
}
@@ -1089,7 +1198,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
CacheEntry entry = container.get(key);
if (entry == null || !entry.isListCacheType() || entry.listValue == null) {
ConcurrentLinkedQueue list = new ConcurrentLinkedQueue();
entry = new CacheEntry(cacheType, key, null, null, list);
entry = new CacheEntry(cacheType, key, null, null, list, null);
CacheEntry old = container.putIfAbsent(key, entry);
if (old != null) list = old.listValue;
if (list != null) list.add(value);
@@ -1195,7 +1304,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
CacheEntry entry = container.get(key);
if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) {
CopyOnWriteArraySet set = new CopyOnWriteArraySet();
entry = new CacheEntry(cacheType, key, null, set, null);
entry = new CacheEntry(cacheType, key, null, set, null, null);
CacheEntry old = container.putIfAbsent(key, entry);
if (old != null) set = old.csetValue;
if (set != null) set.add(value);

View File

@@ -71,6 +71,10 @@ public interface CacheSource<V extends Object> {
}
//----------- hxxx --------------
public int hremove(final String key, String... fields);
public List<String> hkeys(final String key);
public long hincr(final String key, String field);
public long hincr(final String key, String field, long num);
@@ -93,7 +97,7 @@ public interface CacheSource<V extends Object> {
public void hmset(final String key, final Serializable... values);
public Serializable[] hmget(final String key, final String... fields);
public List<Serializable> hmget(final String key, final String... fields);
public <T> T hget(final String key, final String field, final Type type);
@@ -300,6 +304,10 @@ public interface CacheSource<V extends Object> {
public CompletableFuture<Long> decrAsync(final String key, long num);
//----------- hxxx --------------
public CompletableFuture<Integer> hremoveAsync(final String key, String... fields);
public CompletableFuture<List<String>> hkeysAsync(final String key);
public CompletableFuture<Long> hincrAsync(final String key, String field);
public CompletableFuture<Long> hincrAsync(final String key, String field, long num);
@@ -322,7 +330,7 @@ public interface CacheSource<V extends Object> {
public CompletableFuture<Void> hmsetAsync(final String key, final Serializable... values);
public CompletableFuture<Serializable[]> hmgetAsync(final String key, final String... fields);
public CompletableFuture<List<Serializable>> hmgetAsync(final String key, final String... fields);
public <T> CompletableFuture<T> hgetAsync(final String key, final String field, final Type type);
@@ -436,7 +444,7 @@ public interface CacheSource<V extends Object> {
}
public static enum CacheEntryType {
LONG, STRING, OBJECT, ATOMIC,
LONG, STRING, OBJECT, ATOMIC, MAP,
LONG_SET, STRING_SET, OBJECT_SET,
LONG_LIST, STRING_LIST, OBJECT_LIST;
}
@@ -454,20 +462,22 @@ public interface CacheSource<V extends Object> {
T objectValue;
ConcurrentHashMap<String, Serializable> mapValue;
CopyOnWriteArraySet<T> csetValue;
ConcurrentLinkedQueue<T> listValue;
public CacheEntry(CacheEntryType cacheType, String key, T objectValue, CopyOnWriteArraySet<T> csetValue, ConcurrentLinkedQueue<T> listValue) {
this(cacheType, 0, key, objectValue, csetValue, listValue);
public CacheEntry(CacheEntryType cacheType, String key, T objectValue, CopyOnWriteArraySet<T> csetValue, ConcurrentLinkedQueue<T> listValue, ConcurrentHashMap<String, Serializable> mapValue) {
this(cacheType, 0, key, objectValue, csetValue, listValue, mapValue);
}
public CacheEntry(CacheEntryType cacheType, int expireSeconds, String key, T objectValue, CopyOnWriteArraySet<T> csetValue, ConcurrentLinkedQueue<T> listValue) {
this(cacheType, expireSeconds, (int) (System.currentTimeMillis() / 1000), key, objectValue, csetValue, listValue);
public CacheEntry(CacheEntryType cacheType, int expireSeconds, String key, T objectValue, CopyOnWriteArraySet<T> csetValue, ConcurrentLinkedQueue<T> listValue, ConcurrentHashMap<String, Serializable> mapValue) {
this(cacheType, expireSeconds, (int) (System.currentTimeMillis() / 1000), key, objectValue, csetValue, listValue, mapValue);
}
@ConstructorParameters({"cacheType", "expireSeconds", "lastAccessed", "key", "objectValue", "csetValue", "listValue"})
public CacheEntry(CacheEntryType cacheType, int expireSeconds, int lastAccessed, String key, T objectValue, CopyOnWriteArraySet<T> csetValue, ConcurrentLinkedQueue<T> listValue) {
@ConstructorParameters({"cacheType", "expireSeconds", "lastAccessed", "key", "objectValue", "csetValue", "listValue", "mapValue"})
public CacheEntry(CacheEntryType cacheType, int expireSeconds, int lastAccessed, String key, T objectValue, CopyOnWriteArraySet<T> csetValue, ConcurrentLinkedQueue<T> listValue, ConcurrentHashMap<String, Serializable> mapValue) {
this.cacheType = cacheType;
this.expireSeconds = expireSeconds;
this.lastAccessed = lastAccessed;
@@ -475,6 +485,7 @@ public interface CacheSource<V extends Object> {
this.objectValue = objectValue;
this.csetValue = csetValue;
this.listValue = listValue;
this.mapValue = mapValue;
}
@Override
@@ -492,6 +503,11 @@ public interface CacheSource<V extends Object> {
return cacheType == CacheEntryType.LONG_SET || cacheType == CacheEntryType.STRING_SET || cacheType == CacheEntryType.OBJECT_SET;
}
@ConvertColumn(ignore = true)
public boolean isMapCacheType() {
return cacheType == CacheEntryType.MAP;
}
@ConvertColumn(ignore = true)
public boolean isExpired() {
return (expireSeconds > 0 && lastAccessed + expireSeconds < (System.currentTimeMillis() / 1000));
@@ -525,5 +541,8 @@ public interface CacheSource<V extends Object> {
return listValue;
}
public ConcurrentHashMap<String, Serializable> getMapValue() {
return mapValue;
}
}
}