From 627c1e953eaa464eb4012ba88ba7e209df21e21a Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Fri, 17 Jul 2020 10:41:41 +0800 Subject: [PATCH] =?UTF-8?q?CacheSource=E5=A2=9E=E5=8A=A0hxxx=E6=96=B9?= =?UTF-8?q?=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/org/redkale/source/CacheMemorySource.java | 145 +++++++++++++++--- src/org/redkale/source/CacheSource.java | 37 +++-- 2 files changed, 155 insertions(+), 27 deletions(-) diff --git a/src/org/redkale/source/CacheMemorySource.java b/src/org/redkale/source/CacheMemorySource.java index c22041434..9104515f3 100644 --- a/src/org/redkale/source/CacheMemorySource.java +++ b/src/org/redkale/source/CacheMemorySource.java @@ -32,7 +32,7 @@ import org.redkale.util.*; @AutoLoad(false) @SuppressWarnings("unchecked") @ResourceType(CacheSource.class) -public class CacheMemorySource extends AbstractService implements CacheSource, Service, AutoCloseable, Resourcable { +public final class CacheMemorySource extends AbstractService implements CacheSource, Service, AutoCloseable, Resourcable { private static final Type STRING_ENTRY_TYPE = new TypeToken>() { }.getType(); @@ -43,6 +43,9 @@ public class CacheMemorySource extends AbstractService impleme private static final Type ATOMIC_ENTRY_TYPE = new TypeToken>() { }.getType(); + private static final Type MAP_ENTRY_TYPE = new TypeToken>() { + }.getType(); + @Resource(name = "APP_HOME") private File home; @@ -152,7 +155,7 @@ public class CacheMemorySource extends AbstractService impleme boolean datasync = false; //是否从远程同步过数据 //----------同步数据……----------- // TODO - if (this.needStore) { + if (this.needStore && false) { //不存储 try { File store = home == null ? new File("cache/" + resourceName()) : new File(home, "cache/" + resourceName()); if (!store.isFile() || !store.canRead()) return; @@ -170,6 +173,8 @@ public class CacheMemorySource extends AbstractService impleme convertType = STRING_ENTRY_TYPE; } else if (line.startsWith("{\"cacheType\":\"" + CacheEntryType.ATOMIC)) { convertType = ATOMIC_ENTRY_TYPE; + } else { + continue; } CacheEntry entry = convert.convertFrom(convertType, line); if (entry.isExpired()) continue; @@ -254,6 +259,13 @@ public class CacheMemorySource extends AbstractService impleme System.out.println("all keys: " + source.queryKeys()); System.out.println("newnum 值 : " + source.incr("newnum")); System.out.println("newnum 值 : " + source.decr("newnum")); + + source.remove("hmap1"); + source.hincr("map", "key1"); + System.out.println("map.key1 值 : " + source.hgetLong("map", "key1", -1)); + source.hmset("map", "key2", "haha", "key3", 333); + System.out.println("map.[key1,key2,key3] 值 : " + source.hmget("map", "key1", "key2", "key3")); + System.out.println("------------------------------------"); source.destroy(null); source.init(null); @@ -276,6 +288,7 @@ public class CacheMemorySource extends AbstractService impleme public void destroy(AnyValue conf) { if (scheduler != null) scheduler.shutdownNow(); if (!this.needStore || Sncp.isRemote(this) || container.isEmpty()) return; + if (true) return; //不存储 try { File store = new File(home, "cache/" + resourceName()); store.getParentFile().mkdirs(); @@ -322,73 +335,145 @@ public class CacheMemorySource extends AbstractService impleme } //----------- hxxx -------------- + @Override + public int hremove(final String key, String... fields) { + int count = 0; + CacheEntry entry = container.get(key); + if (entry == null || entry.mapValue == null) return 0; + for (String field : fields) { + if (entry.mapValue.remove(field) != null) count++; + } + return count; + } + + @Override + public List hkeys(final String key) { + List list = new ArrayList<>(); + CacheEntry entry = container.get(key); + if (entry == null || entry.mapValue == null) return list; + list.addAll(entry.mapValue.keySet()); + return list; + } + @Override public long hincr(final String key, String field) { - return 0; + return hincr(key, field, 1); } @Override public long hincr(final String key, String field, long num) { - return 0; + CacheEntry entry = container.get(key); + if (entry == null) { + synchronized (container) { + entry = container.get(key); + if (entry == null) { + ConcurrentHashMap map = new ConcurrentHashMap(); + map.put(field, new AtomicLong()); + entry = new CacheEntry(CacheEntryType.MAP, key, new AtomicLong(), null, null, map); + container.put(key, entry); + } + } + } + Serializable val = (Serializable) entry.mapValue.computeIfAbsent(field, f -> new AtomicLong()); + if (!(val instanceof AtomicLong)) { + synchronized (entry.mapValue) { + if (!(val instanceof AtomicLong)) { + if (val == null) { + val = new AtomicLong(); + } else { + val = new AtomicLong(((Number) val).longValue()); + } + entry.mapValue.put(field, val); + } + } + } + return ((AtomicLong) entry.mapValue.get(field)).addAndGet(num); } @Override public long hdecr(final String key, String field) { - return 0; + return hincr(key, field, -1); } @Override public long hdecr(final String key, String field, long num) { - return 0; + return hincr(key, field, -num); } @Override public boolean hexists(final String key, String field) { - return false; + if (key == null) return false; + CacheEntry entry = container.get(key); + if (entry == null || entry.isExpired() || entry.mapValue == null) return false; + return entry.mapValue.contains(field); } @Override public void hset(final String key, final String field, final Convert convert, final T value) { + hset(CacheEntryType.MAP, key, field, value); } @Override public void hset(final String key, final String field, final Type type, final T value) { + hset(CacheEntryType.MAP, key, field, value); } @Override public void hset(final String key, final String field, final Convert convert, final Type type, final T value) { + hset(CacheEntryType.MAP, key, field, value); } @Override public void hsetString(final String key, final String field, final String value) { + hset(CacheEntryType.MAP, key, field, value); } @Override public void hsetLong(final String key, final String field, final long value) { + hset(CacheEntryType.MAP, key, field, value); } @Override public void hmset(final String key, final Serializable... values) { + for (int i = 0; i < values.length; i += 2) { + hset(CacheEntryType.MAP, key, (String) values[i], values[i + 1]); + } } @Override - public Serializable[] hmget(final String key, final String... fields) { - return null; + public List hmget(final String key, final String... fields) { + if (key == null) return null; + CacheEntry entry = container.get(key); + if (entry == null || entry.isExpired() || entry.mapValue == null) return null; + List rs = new ArrayList<>(fields.length); + for (int i = 0; i < fields.length; i++) { + rs.add((Serializable) entry.mapValue.get(fields[i])); + } + return rs; } @Override public T hget(final String key, final String field, final Type type) { - return null; + if (key == null) return null; + CacheEntry entry = container.get(key); + if (entry == null || entry.isExpired() || entry.mapValue == null) return null; + return (T) entry.mapValue.get(field); } @Override public String hgetString(final String key, final String field) { - return null; + if (key == null) return null; + CacheEntry entry = container.get(key); + if (entry == null || entry.isExpired() || entry.mapValue == null) return null; + return (String) entry.mapValue.get(field); } @Override public long hgetLong(final String key, final String field, long defValue) { - return 0L; + if (key == null) return defValue; + CacheEntry entry = container.get(key); + if (entry == null || entry.isExpired() || entry.mapValue == null) return defValue; + return ((Number) entry.mapValue.getOrDefault(field, defValue)).longValue(); } //----------- hxxx -------------- @@ -438,6 +523,16 @@ public class CacheMemorySource extends AbstractService impleme } //----------- hxxx -------------- + @Override + public CompletableFuture hremoveAsync(final String key, String... fields) { + return CompletableFuture.supplyAsync(() -> hremove(key, fields), getExecutor()); + } + + @Override + public CompletableFuture> hkeysAsync(final String key) { + return CompletableFuture.supplyAsync(() -> hkeys(key), getExecutor()); + } + @Override public CompletableFuture hincrAsync(final String key, String field) { return CompletableFuture.supplyAsync(() -> hincr(key, field), getExecutor()); @@ -494,7 +589,7 @@ public class CacheMemorySource extends AbstractService impleme } @Override - public CompletableFuture hmgetAsync(final String key, final String... fields) { + public CompletableFuture> hmgetAsync(final String key, final String... fields) { return CompletableFuture.supplyAsync(() -> hmget(key, fields), getExecutor()); } @@ -612,7 +707,7 @@ public class CacheMemorySource extends AbstractService impleme if (key == null) return; CacheEntry entry = container.get(key); if (entry == null) { - entry = new CacheEntry(cacheType, key, value, null, null); + entry = new CacheEntry(cacheType, key, value, null, null, null); container.putIfAbsent(key, entry); } else { entry.expireSeconds = 0; @@ -621,6 +716,20 @@ public class CacheMemorySource extends AbstractService impleme } } + protected void hset(CacheEntryType cacheType, String key, String field, Object value) { + if (key == null) return; + CacheEntry entry = container.get(key); + if (entry == null) { + entry = new CacheEntry(CacheEntryType.MAP, key, value, null, null, new ConcurrentHashMap<>()); + container.putIfAbsent(key, entry); + entry.mapValue.put(field, value); + } else { + entry.expireSeconds = 0; + entry.mapValue.put(field, value); + entry.lastAccessed = (int) (System.currentTimeMillis() / 1000); + } + } + @Override public void set(String key, V value) { set(CacheEntryType.OBJECT, key, value); @@ -685,7 +794,7 @@ public class CacheMemorySource extends AbstractService impleme if (key == null) return; CacheEntry entry = container.get(key); if (entry == null) { - entry = new CacheEntry(cacheType, expireSeconds, key, value, null, null); + entry = new CacheEntry(cacheType, expireSeconds, key, value, null, null, null); container.putIfAbsent(key, entry); } else { if (expireSeconds > 0) entry.expireSeconds = expireSeconds; @@ -790,7 +899,7 @@ public class CacheMemorySource extends AbstractService impleme synchronized (container) { entry = container.get(key); if (entry == null) { - entry = new CacheEntry(CacheEntryType.ATOMIC, key, new AtomicLong(), null, null); + entry = new CacheEntry(CacheEntryType.ATOMIC, key, new AtomicLong(), null, null, null); container.put(key, entry); } } @@ -1089,7 +1198,7 @@ public class CacheMemorySource extends AbstractService impleme CacheEntry entry = container.get(key); if (entry == null || !entry.isListCacheType() || entry.listValue == null) { ConcurrentLinkedQueue list = new ConcurrentLinkedQueue(); - entry = new CacheEntry(cacheType, key, null, null, list); + entry = new CacheEntry(cacheType, key, null, null, list, null); CacheEntry old = container.putIfAbsent(key, entry); if (old != null) list = old.listValue; if (list != null) list.add(value); @@ -1195,7 +1304,7 @@ public class CacheMemorySource extends AbstractService impleme CacheEntry entry = container.get(key); if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) { CopyOnWriteArraySet set = new CopyOnWriteArraySet(); - entry = new CacheEntry(cacheType, key, null, set, null); + entry = new CacheEntry(cacheType, key, null, set, null, null); CacheEntry old = container.putIfAbsent(key, entry); if (old != null) set = old.csetValue; if (set != null) set.add(value); diff --git a/src/org/redkale/source/CacheSource.java b/src/org/redkale/source/CacheSource.java index 5ccf97bbe..d14aa5d44 100644 --- a/src/org/redkale/source/CacheSource.java +++ b/src/org/redkale/source/CacheSource.java @@ -71,6 +71,10 @@ public interface CacheSource { } //----------- hxxx -------------- + public int hremove(final String key, String... fields); + + public List hkeys(final String key); + public long hincr(final String key, String field); public long hincr(final String key, String field, long num); @@ -93,7 +97,7 @@ public interface CacheSource { public void hmset(final String key, final Serializable... values); - public Serializable[] hmget(final String key, final String... fields); + public List hmget(final String key, final String... fields); public T hget(final String key, final String field, final Type type); @@ -300,6 +304,10 @@ public interface CacheSource { public CompletableFuture decrAsync(final String key, long num); //----------- hxxx -------------- + public CompletableFuture hremoveAsync(final String key, String... fields); + + public CompletableFuture> hkeysAsync(final String key); + public CompletableFuture hincrAsync(final String key, String field); public CompletableFuture hincrAsync(final String key, String field, long num); @@ -322,7 +330,7 @@ public interface CacheSource { public CompletableFuture hmsetAsync(final String key, final Serializable... values); - public CompletableFuture hmgetAsync(final String key, final String... fields); + public CompletableFuture> hmgetAsync(final String key, final String... fields); public CompletableFuture hgetAsync(final String key, final String field, final Type type); @@ -436,7 +444,7 @@ public interface CacheSource { } public static enum CacheEntryType { - LONG, STRING, OBJECT, ATOMIC, + LONG, STRING, OBJECT, ATOMIC, MAP, LONG_SET, STRING_SET, OBJECT_SET, LONG_LIST, STRING_LIST, OBJECT_LIST; } @@ -454,20 +462,22 @@ public interface CacheSource { T objectValue; + ConcurrentHashMap mapValue; + CopyOnWriteArraySet csetValue; ConcurrentLinkedQueue listValue; - public CacheEntry(CacheEntryType cacheType, String key, T objectValue, CopyOnWriteArraySet csetValue, ConcurrentLinkedQueue listValue) { - this(cacheType, 0, key, objectValue, csetValue, listValue); + public CacheEntry(CacheEntryType cacheType, String key, T objectValue, CopyOnWriteArraySet csetValue, ConcurrentLinkedQueue listValue, ConcurrentHashMap mapValue) { + this(cacheType, 0, key, objectValue, csetValue, listValue, mapValue); } - public CacheEntry(CacheEntryType cacheType, int expireSeconds, String key, T objectValue, CopyOnWriteArraySet csetValue, ConcurrentLinkedQueue listValue) { - this(cacheType, expireSeconds, (int) (System.currentTimeMillis() / 1000), key, objectValue, csetValue, listValue); + public CacheEntry(CacheEntryType cacheType, int expireSeconds, String key, T objectValue, CopyOnWriteArraySet csetValue, ConcurrentLinkedQueue listValue, ConcurrentHashMap mapValue) { + this(cacheType, expireSeconds, (int) (System.currentTimeMillis() / 1000), key, objectValue, csetValue, listValue, mapValue); } - @ConstructorParameters({"cacheType", "expireSeconds", "lastAccessed", "key", "objectValue", "csetValue", "listValue"}) - public CacheEntry(CacheEntryType cacheType, int expireSeconds, int lastAccessed, String key, T objectValue, CopyOnWriteArraySet csetValue, ConcurrentLinkedQueue listValue) { + @ConstructorParameters({"cacheType", "expireSeconds", "lastAccessed", "key", "objectValue", "csetValue", "listValue", "mapValue"}) + public CacheEntry(CacheEntryType cacheType, int expireSeconds, int lastAccessed, String key, T objectValue, CopyOnWriteArraySet csetValue, ConcurrentLinkedQueue listValue, ConcurrentHashMap mapValue) { this.cacheType = cacheType; this.expireSeconds = expireSeconds; this.lastAccessed = lastAccessed; @@ -475,6 +485,7 @@ public interface CacheSource { this.objectValue = objectValue; this.csetValue = csetValue; this.listValue = listValue; + this.mapValue = mapValue; } @Override @@ -492,6 +503,11 @@ public interface CacheSource { return cacheType == CacheEntryType.LONG_SET || cacheType == CacheEntryType.STRING_SET || cacheType == CacheEntryType.OBJECT_SET; } + @ConvertColumn(ignore = true) + public boolean isMapCacheType() { + return cacheType == CacheEntryType.MAP; + } + @ConvertColumn(ignore = true) public boolean isExpired() { return (expireSeconds > 0 && lastAccessed + expireSeconds < (System.currentTimeMillis() / 1000)); @@ -525,5 +541,8 @@ public interface CacheSource { return listValue; } + public ConcurrentHashMap getMapValue() { + return mapValue; + } } }