From 56924fb447ba321f2bd504fca98835747e16e8eb Mon Sep 17 00:00:00 2001 From: redkale Date: Thu, 27 Jul 2023 22:01:42 +0800 Subject: [PATCH] =?UTF-8?q?Reproduce=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../redkale/source/AbstractCacheSource.java | 17 + .../org/redkale/source/CacheMemorySource.java | 362 +++++++++--------- src/main/java/org/redkale/util/Creator.java | 4 + src/main/java/org/redkale/util/Reproduce.java | 85 +++- src/main/java/org/redkale/util/Utility.java | 33 ++ 5 files changed, 312 insertions(+), 189 deletions(-) diff --git a/src/main/java/org/redkale/source/AbstractCacheSource.java b/src/main/java/org/redkale/source/AbstractCacheSource.java index d6e83a36e..955651e3a 100644 --- a/src/main/java/org/redkale/source/AbstractCacheSource.java +++ b/src/main/java/org/redkale/source/AbstractCacheSource.java @@ -111,6 +111,23 @@ public abstract class AbstractCacheSource extends AbstractService implements Cac return source; } + protected CompletableFuture supplyFuture(Supplier supplier) { + try { + return CompletableFuture.completedFuture(supplier.get()); + } catch (Throwable t) { + return CompletableFuture.failedFuture(t); + } + } + + protected CompletableFuture runFuture(Runnable runner) { + try { + runner.run(); + return CompletableFuture.completedFuture(null); + } catch (Throwable t) { + return CompletableFuture.failedFuture(t); + } + } + protected CompletableFuture supplyAsync(Supplier supplier) { return CompletableFuture.supplyAsync(supplier); } diff --git a/src/main/java/org/redkale/source/CacheMemorySource.java b/src/main/java/org/redkale/source/CacheMemorySource.java index 085a5bf3e..fd89639f2 100644 --- a/src/main/java/org/redkale/source/CacheMemorySource.java +++ b/src/main/java/org/redkale/source/CacheMemorySource.java @@ -163,7 +163,7 @@ public final class CacheMemorySource extends AbstractCacheSource { //----------- hxxx -------------- @Override public CompletableFuture hdelAsync(final String key, String... fields) { - return supplyAsync(() -> { + return supplyFuture(() -> { long count = 0; CacheEntry entry = container.get(key); if (entry == null || entry.mapValue == null) { @@ -175,12 +175,12 @@ public final class CacheMemorySource extends AbstractCacheSource { } } return count; - }, getExecutor()); + }); } @Override public CompletableFuture> hkeysAsync(final String key) { - return supplyAsync(() -> { + return supplyFuture(() -> { List list = new ArrayList<>(); CacheEntry entry = container.get(key); if (entry == null || entry.mapValue == null) { @@ -188,18 +188,18 @@ public final class CacheMemorySource extends AbstractCacheSource { } list.addAll(entry.mapValue.keySet()); return list; - }, getExecutor()); + }); } @Override public CompletableFuture hlenAsync(final String key) { - return supplyAsync(() -> { + return supplyFuture(() -> { CacheEntry entry = container.get(key); if (entry == null || entry.mapValue == null) { return 0L; } return (long) entry.mapValue.keySet().size(); - }, getExecutor()); + }); } @Override @@ -209,7 +209,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture hincrbyAsync(final String key, String field, long num) { - return supplyAsync(() -> { + return supplyFuture(() -> { CacheEntry entry = container.get(key); if (entry == null) { containerLock.lock(); @@ -242,12 +242,12 @@ public final class CacheMemorySource extends AbstractCacheSource { } } return ((AtomicLong) entry.mapValue.get(field)).addAndGet(num); - }, getExecutor()); + }); } @Override public CompletableFuture hincrbyFloatAsync(final String key, String field, double num) { - return supplyAsync(() -> { + return supplyFuture(() -> { CacheEntry entry = container.get(key); if (entry == null) { containerLock.lock(); @@ -280,7 +280,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } } return Double.longBitsToDouble(((AtomicLong) entry.mapValue.get(field)).addAndGet(Double.doubleToLongBits(num))); - }, getExecutor()); + }); } @Override @@ -295,7 +295,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture hexistsAsync(final String key, String field) { - return supplyAsync(() -> { + return supplyFuture(() -> { if (key == null) { return false; } @@ -304,42 +304,42 @@ public final class CacheMemorySource extends AbstractCacheSource { return false; } return entry.mapValue.contains(field); - }, getExecutor()); + }); } @Override public CompletableFuture hsetAsync(final String key, final String field, final Convert convert, final Type type, final T value) { - return runAsync(() -> { + return runFuture(() -> { hset(CacheEntryType.MAP, key, field, value); - }, getExecutor()); + }); } @Override public CompletableFuture hsetnxAsync(final String key, final String field, final Convert convert, final Type type, final T value) { - return supplyAsync(() -> { + return supplyFuture(() -> { return hsetnx(CacheEntryType.MAP, key, field, value); - }, getExecutor()); + }); } @Override public CompletableFuture hmsetAsync(final String key, final Serializable... values) { - return runAsync(() -> { + return runFuture(() -> { for (int i = 0; i < values.length; i += 2) { hset(CacheEntryType.MAP, key, (String) values[i], values[i + 1]); } - }, getExecutor()); + }); } @Override public CompletableFuture hmsetAsync(final String key, final Map map) { - return runAsync(() -> { + return runFuture(() -> { map.forEach((k, v) -> hset(CacheEntryType.MAP, key, (String) k, v)); - }, getExecutor()); + }); } @Override public CompletableFuture> hmgetAsync(final String key, final Type type, final String... fields) { - return supplyAsync(() -> { + return supplyFuture(() -> { if (key == null) { return null; } @@ -357,26 +357,26 @@ public final class CacheMemorySource extends AbstractCacheSource { } } return rs; - }, getExecutor()); + }); } @Override public CompletableFuture> hgetallAsync(final String key, final Type type) { - return supplyAsync(() -> { + return supplyFuture(() -> { return hgetall(CacheEntryType.MAP, key, type); - }, getExecutor()); + }); } @Override public CompletableFuture> hvalsAsync(final String key, final Type type) { - return supplyAsync(() -> { + return supplyFuture(() -> { return hvals(CacheEntryType.MAP, key, type); - }, getExecutor()); + }); } @Override public CompletableFuture> hscanAsync(final String key, final Type type, AtomicLong cursor, int limit, String pattern) { - return supplyAsync(() -> { + return supplyFuture(() -> { if (key == null) { return new HashMap(); } @@ -391,12 +391,12 @@ public final class CacheMemorySource extends AbstractCacheSource { Set> set = entry.mapValue.entrySet(); return set.stream().filter(en -> regx.test(en.getKey())).collect(Collectors.toMap(en -> en.getKey(), en -> en.getValue())); } - }, getExecutor()); + }); } @Override public CompletableFuture hgetAsync(final String key, final String field, final Type type) { - return supplyAsync(() -> { + return supplyFuture(() -> { if (key == null) { return null; } @@ -412,12 +412,12 @@ public final class CacheMemorySource extends AbstractCacheSource { return (T) (obj instanceof Long ? obj : Long.parseLong(obj.toString())); } return (T) obj; - }, getExecutor()); + }); } @Override public CompletableFuture hstrlenAsync(final String key, final String field) { - return supplyAsync(() -> { + return supplyFuture(() -> { if (key == null || field == null) { return 0L; } @@ -430,13 +430,13 @@ public final class CacheMemorySource extends AbstractCacheSource { return 0L; } return (long) obj.toString().length(); - }, getExecutor()); + }); } //----------- hxxx -------------- @Override public CompletableFuture existsAsync(String key) { - return supplyAsync(() -> { + return supplyFuture(() -> { if (key == null) { return false; } @@ -445,12 +445,12 @@ public final class CacheMemorySource extends AbstractCacheSource { return false; } return !entry.isExpired(); - }, getExecutor()); + }); } @Override public CompletableFuture getAsync(final String key, final Type type) { - return supplyAsync(() -> { + return supplyFuture(() -> { if (key == null) { return null; } @@ -472,13 +472,13 @@ public final class CacheMemorySource extends AbstractCacheSource { return (T) JsonConvert.root().convertFrom(type, JsonConvert.root().convertToBytes(obj)); } return (T) obj; - }, getExecutor()); + }); } //----------- hxxx -------------- @Override public CompletableFuture getexAsync(final String key, final int expireSeconds, final Type type) { - return supplyAsync(() -> { + return supplyFuture(() -> { if (key == null) { return null; } @@ -495,7 +495,7 @@ public final class CacheMemorySource extends AbstractCacheSource { return (T) (entry.csetValue == null ? null : new HashSet(entry.csetValue)); } return (T) entry.objectValue; - }, getExecutor()); + }); } protected void set(CacheEntryType cacheType, String key, Object value) { @@ -598,7 +598,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture msetAsync(Serializable... keyVals) { - return runAsync(() -> { + return runFuture(() -> { if (keyVals.length % 2 != 0) { throw new SourceException("key value must be paired"); } @@ -613,12 +613,12 @@ public final class CacheMemorySource extends AbstractCacheSource { set(CacheEntryType.OBJECT, key, val); } } - }, getExecutor()); + }); } @Override public CompletableFuture msetAsync(Map map) { - return runAsync(() -> { + return runFuture(() -> { map.forEach((key, val) -> { if (val instanceof String) { set(CacheEntryType.STRING, (String) key, val); @@ -628,45 +628,45 @@ public final class CacheMemorySource extends AbstractCacheSource { set(CacheEntryType.OBJECT, (String) key, val); } }); - }, getExecutor()); + }); } @Override public CompletableFuture setAsync(String key, Convert convert, Type type, T value) { - return runAsync(() -> { + return runFuture(() -> { set(findEntryType(type), key, value); - }, getExecutor()); + }); } @Override public CompletableFuture setnxAsync(String key, Convert convert, Type type, T value) { - return supplyAsync(() -> { + return supplyFuture(() -> { return setnx(findEntryType(type), key, value); - }, getExecutor()); + }); } @Override public CompletableFuture setnxexAsync(String key, int expireSeconds, Convert convert, Type type, T value) { - return supplyAsync(() -> { + return supplyFuture(() -> { return setnxex(findEntryType(type), expireSeconds, key, value); - }, getExecutor()); + }); } @Override public CompletableFuture getSetAsync(String key, Convert convert, Type type, T value) { - return supplyAsync(() -> { + return supplyFuture(() -> { T old = get(key, type); set(findEntryType(type), key, value); return old; - }, getExecutor()); + }); } @Override public CompletableFuture getDelAsync(String key, Type type) { - return supplyAsync(() -> { + return supplyFuture(() -> { CacheEntry entry = container.remove(key); return entry == null ? null : (T) entry.objectValue; - }, getExecutor()); + }); } protected void set(CacheEntryType cacheType, int expireSeconds, String key, Object value) { @@ -704,14 +704,14 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture setexAsync(String key, int expireSeconds, Convert convert, Type type, T value) { - return runAsync(() -> { + return runFuture(() -> { set(findEntryType(type), expireSeconds, key, value); - }, getExecutor()); + }); } @Override public CompletableFuture expireAsync(String key, int expireSeconds) { - return runAsync(() -> { + return runFuture(() -> { if (key == null) { return; } @@ -720,12 +720,12 @@ public final class CacheMemorySource extends AbstractCacheSource { return; } entry.expireSeconds = expireSeconds; - }, getExecutor()); + }); } @Override public CompletableFuture persistAsync(final String key) { - return supplyAsync(() -> { + return supplyFuture(() -> { if (key == null) { return false; } @@ -739,12 +739,12 @@ public final class CacheMemorySource extends AbstractCacheSource { } else { return false; } - }, getExecutor()); + }); } @Override public CompletableFuture renameAsync(String oldKey, String newKey) { - return supplyAsync(() -> { + return supplyFuture(() -> { if (oldKey == null || newKey == null) { return false; } @@ -756,12 +756,12 @@ public final class CacheMemorySource extends AbstractCacheSource { container.put(newKey, entry); container.remove(oldKey); return true; - }, getExecutor()); + }); } @Override public CompletableFuture renamenxAsync(String oldKey, String newKey) { - return supplyAsync(() -> { + return supplyFuture(() -> { if (oldKey == null || newKey == null) { return false; } @@ -776,12 +776,12 @@ public final class CacheMemorySource extends AbstractCacheSource { container.put(newKey, entry); container.remove(oldKey); return true; - }, getExecutor()); + }); } @Override public CompletableFuture delAsync(final String... keys) { - return supplyAsync(() -> { + return supplyFuture(() -> { if (keys == null) { return 0L; } @@ -790,7 +790,7 @@ public final class CacheMemorySource extends AbstractCacheSource { count += container.remove(key) == null ? 0 : 1; } return count; - }, getExecutor()); + }); } @Override @@ -800,7 +800,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture incrbyAsync(final String key, long num) { - return supplyAsync(() -> { + return supplyFuture(() -> { CacheEntry entry = container.get(key); if (entry == null) { containerLock.lock(); @@ -825,12 +825,12 @@ public final class CacheMemorySource extends AbstractCacheSource { } } return ((AtomicLong) entry.objectValue).addAndGet(num); - }, getExecutor()); + }); } @Override public CompletableFuture incrbyFloatAsync(final String key, double num) { - return supplyAsync(() -> { + return supplyFuture(() -> { CacheEntry entry = container.get(key); if (entry == null) { containerLock.lock(); @@ -846,7 +846,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } Long v = ((AtomicLong) entry.objectValue).addAndGet(Double.doubleToLongBits(num)); return Double.longBitsToDouble(v.intValue()); - }, getExecutor()); + }); } @Override @@ -861,7 +861,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture> srandmemberAsync(String key, Type componentType, int count) { - return supplyAsync(() -> { + return supplyFuture(() -> { List list = new ArrayList<>(); CacheEntry entry = container.get(key); if (entry == null || entry.csetValue == null) { @@ -881,12 +881,12 @@ public final class CacheMemorySource extends AbstractCacheSource { return vals.subList(0, count); } return list; - }, getExecutor()); + }); } @Override public CompletableFuture smoveAsync(String key, String key2, Type componentType, T member) { - return supplyAsync(() -> { + return supplyFuture(() -> { CacheEntry entry = container.get(key); if (entry == null || entry.csetValue == null) { return false; @@ -901,12 +901,12 @@ public final class CacheMemorySource extends AbstractCacheSource { } } return rs; - }, getExecutor()); + }); } @Override public CompletableFuture> sdiffAsync(final String key, final Type componentType, final String... key2s) { - return supplyAsync(() -> { + return supplyFuture(() -> { Set rs = new HashSet<>(); CacheEntry entry = container.get(key); if (entry == null || entry.csetValue == null) { @@ -920,12 +920,12 @@ public final class CacheMemorySource extends AbstractCacheSource { } } return rs; - }, getExecutor()); + }); } @Override public CompletableFuture sdiffstoreAsync(final String key, final String srcKey, final String... srcKey2s) { - return supplyAsync(() -> { + return supplyFuture(() -> { Set rs = sdiff(srcKey, Object.class, srcKey2s); if (container.containsKey(key)) { Set set = container.get(srcKey).csetValue; @@ -935,12 +935,12 @@ public final class CacheMemorySource extends AbstractCacheSource { appendSetItem(CacheEntryType.SET_OBJECT, key, rs); } return (long) rs.size(); - }, getExecutor()); + }); } @Override public CompletableFuture> sinterAsync(final String key, final Type componentType, final String... key2s) { - return supplyAsync(() -> { + return supplyFuture(() -> { Set rs = new HashSet<>(); CacheEntry entry = container.get(key); if (entry == null || entry.csetValue == null) { @@ -963,12 +963,12 @@ public final class CacheMemorySource extends AbstractCacheSource { } } return rs; - }, getExecutor()); + }); } @Override public CompletableFuture sinterstoreAsync(final String key, final String srcKey, final String... srcKey2s) { - return supplyAsync(() -> { + return supplyFuture(() -> { Set rs = sinter(srcKey, Object.class, srcKey2s); if (container.containsKey(key)) { Set set = container.get(srcKey).csetValue; @@ -978,12 +978,12 @@ public final class CacheMemorySource extends AbstractCacheSource { appendSetItem(CacheEntryType.SET_OBJECT, key, rs); } return (long) rs.size(); - }, getExecutor()); + }); } @Override public CompletableFuture> sunionAsync(final String key, final Type componentType, final String... key2s) { - return supplyAsync(() -> { + return supplyFuture(() -> { Set rs = new HashSet<>(); CacheEntry entry = container.get(key); if (entry == null || entry.csetValue == null) { @@ -997,12 +997,12 @@ public final class CacheMemorySource extends AbstractCacheSource { } } return rs; - }, getExecutor()); + }); } @Override public CompletableFuture sunionstoreAsync(final String key, final String srcKey, final String... srcKey2s) { - return supplyAsync(() -> { + return supplyFuture(() -> { Set rs = sunion(srcKey, Object.class, srcKey2s); if (container.containsKey(key)) { Set set = container.get(srcKey).csetValue; @@ -1012,7 +1012,7 @@ public final class CacheMemorySource extends AbstractCacheSource { appendSetItem(CacheEntryType.SET_OBJECT, key, rs); } return (long) rs.size(); - }, getExecutor()); + }); } @Override @@ -1027,7 +1027,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture>> smembersAsync(final Type componentType, final String... keys) { - return supplyAsync(() -> { + return supplyFuture(() -> { Map> map = new HashMap<>(); for (String key : keys) { Set s = (Set) get(key, componentType); @@ -1036,24 +1036,24 @@ public final class CacheMemorySource extends AbstractCacheSource { } } return map; - }, getExecutor()); + }); } @Override public CompletableFuture> smismembersAsync(final String key, final String... members) { - return supplyAsync(() -> { + return supplyFuture(() -> { Set s = (Set) get(key, Object.class); List rs = new ArrayList<>(); for (String member : members) { rs.add(s != null && s.contains(member)); } return rs; - }, getExecutor()); + }); } @Override public CompletableFuture>> lrangesAsync(final Type componentType, final String... keys) { - return supplyAsync(() -> { + return supplyFuture(() -> { Map> map = new HashMap<>(); for (String key : keys) { List s = (List) get(key, componentType); @@ -1062,12 +1062,12 @@ public final class CacheMemorySource extends AbstractCacheSource { } } return map; - }, getExecutor()); + }); } @Override public CompletableFuture> mgetAsync(final Type componentType, final String... keys) { - return supplyAsync(() -> { + return supplyFuture(() -> { List list = new ArrayList<>(); for (String key : keys) { Object v = get(key, componentType); @@ -1081,27 +1081,27 @@ public final class CacheMemorySource extends AbstractCacheSource { list.add((T) v); } return list; - }, getExecutor()); + }); } @Override public CompletableFuture llenAsync(final String key) { - return supplyAsync(() -> { + return supplyFuture(() -> { Collection collection = (Collection) get(key, Object.class); return collection == null ? 0L : collection.size(); - }, getExecutor()); + }); } @Override public CompletableFuture lindexAsync(String key, Type componentType, int index) { - return supplyAsync(() -> { + return supplyFuture(() -> { List list = (List) get(key, Object.class); if (list == null || list.isEmpty()) { return null; } int pos = index >= 0 ? index : list.size() + index; return pos >= list.size() ? null : list.get(pos); - }, getExecutor()); + }); } @Override @@ -1115,7 +1115,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } protected CompletableFuture linsertAsync(String key, Type componentType, boolean before, T pivot, T value) { - return supplyAsync(() -> { + return supplyFuture(() -> { CacheEntry entry = container.get(key); if (entry == null || !entry.isListCacheType() || entry.listValue == null) { return 0L; @@ -1153,30 +1153,30 @@ public final class CacheMemorySource extends AbstractCacheSource { } finally { entry.unlock(); } - }, getExecutor()); + }); } @Override public CompletableFuture saddAsync(final String key, final Type componentType, T... values) { - return runAsync(() -> { + return runFuture(() -> { appendSetItem(componentType == String.class ? CacheEntryType.SET_STRING : CacheEntryType.SET_OBJECT, key, List.of(values)); - }, getExecutor()); + }); } @Override public CompletableFuture scardAsync(final String key) { - return supplyAsync(() -> { + return supplyFuture(() -> { Collection collection = (Collection) get(key, Object.class); return collection == null ? 0L : collection.size(); - }, getExecutor()); + }); } @Override public CompletableFuture sismemberAsync(final String key, final Type type, final T value) { - return supplyAsync(() -> { + return supplyFuture(() -> { Collection list = get(key, type); return list != null && list.contains(value); - }, getExecutor()); + }); } protected void appendListItem(CacheEntryType cacheType, String key, Object... values) { @@ -1217,27 +1217,27 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture lpushAsync(final String key, final Type componentType, T... values) { - return runAsync(() -> { + return runFuture(() -> { for (T value : values) { appendListItem(CacheEntryType.LIST_OBJECT, false, key, value); } - }, getExecutor()); + }); } @Override public CompletableFuture lpushxAsync(final String key, final Type componentType, T... values) { - return runAsync(() -> { + return runFuture(() -> { if (container.containsKey(key)) { for (T value : values) { appendListItem(CacheEntryType.LIST_OBJECT, false, key, value); } } - }, getExecutor()); + }); } @Override public CompletableFuture lpopAsync(final String key, final Type componentType) { - return supplyAsync(() -> { + return supplyFuture(() -> { if (key == null) { return null; } @@ -1253,12 +1253,12 @@ public final class CacheMemorySource extends AbstractCacheSource { obj = ((Number) obj).longValue(); } return (T) obj; - }, getExecutor()); + }); } @Override public CompletableFuture ltrimAsync(final String key, int start, int stop) { - return runAsync(() -> { + return runFuture(() -> { if (key == null) { return; } @@ -1280,21 +1280,21 @@ public final class CacheMemorySource extends AbstractCacheSource { it.remove(); } } - }, getExecutor()); + }); } @Override public CompletableFuture rpoplpushAsync(final String key, final String key2, final Type componentType) { - return supplyAsync(() -> { + return supplyFuture(() -> { T val = rpop(key, componentType); lpush(key2, componentType, val); return val; - }, getExecutor()); + }); } @Override public CompletableFuture rpopAsync(final String key, final Type componentType) { - return supplyAsync(() -> { + return supplyFuture(() -> { if (key == null) { return null; } @@ -1310,30 +1310,30 @@ public final class CacheMemorySource extends AbstractCacheSource { obj = ((Number) obj).longValue(); } return (T) obj; - }, getExecutor()); + }); } @Override public CompletableFuture rpushxAsync(final String key, final Type componentType, final T... values) { - return runAsync(() -> { + return runFuture(() -> { if (container.containsKey(key)) { for (T value : values) { appendListItem(CacheEntryType.LIST_OBJECT, key, value); } } - }, getExecutor()); + }); } @Override public CompletableFuture rpushAsync(final String key, final Type componentType, final T... values) { - return runAsync(() -> { + return runFuture(() -> { appendListItem(CacheEntryType.LIST_OBJECT, key, values); - }, getExecutor()); + }); } @Override public CompletableFuture lremAsync(final String key, final Type componentType, T value) { - return supplyAsync(() -> { + return supplyFuture(() -> { if (key == null) { return 0L; } @@ -1342,12 +1342,12 @@ public final class CacheMemorySource extends AbstractCacheSource { return 0L; } return entry.listValue.remove(value) ? 1L : 0L; - }, getExecutor()); + }); } @Override public CompletableFuture spopAsync(final String key, final Type componentType) { - return supplyAsync(() -> { + return supplyFuture(() -> { if (key == null) { return null; } @@ -1372,12 +1372,12 @@ public final class CacheMemorySource extends AbstractCacheSource { return (T) del; } return null; - }, getExecutor()); + }); } @Override public CompletableFuture> spopAsync(final String key, final int count, final Type componentType) { - return supplyAsync(() -> { + return supplyFuture(() -> { if (key == null) { return new LinkedHashSet<>(); } @@ -1403,12 +1403,12 @@ public final class CacheMemorySource extends AbstractCacheSource { } entry.csetValue.removeAll(list); return list; - }, getExecutor()); + }); } @Override public CompletableFuture> sscanAsync(final String key, final Type componentType, AtomicLong cursor, int limit, String pattern) { - return supplyAsync(() -> { + return supplyFuture(() -> { if (key == null) { return new LinkedHashSet(); } @@ -1429,7 +1429,7 @@ public final class CacheMemorySource extends AbstractCacheSource { list.add((T) obj); } return list; - }, getExecutor()); + }); } protected void appendSetItem(CacheEntryType cacheType, String key, Collection values) { @@ -1454,18 +1454,18 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture zaddAsync(String key, CacheScoredValue... values) { - return runAsync(() -> { + return runFuture(() -> { List list = new ArrayList<>(); for (CacheScoredValue v : values) { list.add(new CacheScoredValue.NumberScoredValue(v)); } appendSetItem(CacheEntryType.SET_SORTED, key, list); - }, getExecutor()); + }); } @Override public CompletableFuture zincrbyAsync(String key, CacheScoredValue value) { - return supplyAsync(() -> { + return supplyFuture(() -> { CacheEntry entry = container.get(key); if (entry == null || entry.isExpired() || entry.csetValue == null) { containerLock.lock(); @@ -1506,12 +1506,12 @@ public final class CacheMemorySource extends AbstractCacheSource { } finally { entry.lock.unlock(); } - }, getExecutor()); + }); } @Override public CompletableFuture zcardAsync(String key) { - return supplyAsync(() -> { + return supplyFuture(() -> { if (key == null) { return 0L; } @@ -1520,12 +1520,12 @@ public final class CacheMemorySource extends AbstractCacheSource { return 0L; } return (long) entry.csetValue.size(); - }, getExecutor()); + }); } @Override public CompletableFuture zrankAsync(String key, String member) { - return supplyAsync(() -> { + return supplyFuture(() -> { if (key == null) { return null; } @@ -1543,12 +1543,12 @@ public final class CacheMemorySource extends AbstractCacheSource { c++; } return null; - }, getExecutor()); + }); } @Override public CompletableFuture zrevrankAsync(String key, String member) { - return supplyAsync(() -> { + return supplyFuture(() -> { if (key == null) { return null; } @@ -1566,12 +1566,12 @@ public final class CacheMemorySource extends AbstractCacheSource { c++; } return null; - }, getExecutor()); + }); } @Override public CompletableFuture> zrangeAsync(String key, int start, int stop) { - return supplyAsync(() -> { + return supplyFuture(() -> { if (key == null) { return new ArrayList<>(); } @@ -1589,12 +1589,12 @@ public final class CacheMemorySource extends AbstractCacheSource { c++; } return list; - }, getExecutor()); + }); } @Override public CompletableFuture> zscanAsync(String key, Type scoreType, AtomicLong cursor, int limit, String pattern) { - return supplyAsync(() -> { + return supplyFuture(() -> { if (key == null) { return new ArrayList<>(); } @@ -1609,12 +1609,12 @@ public final class CacheMemorySource extends AbstractCacheSource { Predicate regx = Pattern.compile(pattern.replace("*", ".*")).asPredicate(); return sets.stream().filter(en -> regx.test(en.getValue())).collect(Collectors.toList()); } - }, getExecutor()); + }); } @Override public CompletableFuture zremAsync(String key, String... members) { - return supplyAsync(() -> { + return supplyFuture(() -> { if (key == null) { return 0L; } @@ -1636,12 +1636,12 @@ public final class CacheMemorySource extends AbstractCacheSource { } sets.removeAll(dels); return c; - }, getExecutor()); + }); } @Override public CompletableFuture> zmscoreAsync(String key, Class scoreType, String... members) { - return supplyAsync(() -> { + return supplyFuture(() -> { List list = new ArrayList<>(); if (key == null) { for (int i = 0; i < members.length; i++) { @@ -1666,7 +1666,7 @@ public final class CacheMemorySource extends AbstractCacheSource { list.add(map.get(m)); } return list; - }, getExecutor()); + }); } private T formatScore(Class scoreType, Number score) { @@ -1688,7 +1688,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture zscoreAsync(String key, Class scoreType, String member) { - return supplyAsync(() -> { + return supplyFuture(() -> { if (key == null) { return null; } @@ -1698,12 +1698,12 @@ public final class CacheMemorySource extends AbstractCacheSource { } Set sets = entry.csetValue; return formatScore(scoreType, sets.stream().filter(v -> Objects.equals(member, v.getValue())).findAny().map(v -> v.getScore()).orElse(null)); - }, getExecutor()); + }); } @Override public CompletableFuture sremAsync(String key, Type type, T... values) { - return supplyAsync(() -> { + return supplyFuture(() -> { if (key == null) { return 0L; } @@ -1712,33 +1712,33 @@ public final class CacheMemorySource extends AbstractCacheSource { return 0L; } return entry.csetValue.removeAll(List.of(values)) ? 1L : 0L; - }, getExecutor()); + }); } @Override public CompletableFuture dbsizeAsync() { - return supplyAsync(() -> { + return supplyFuture(() -> { return (long) container.size(); - }, getExecutor()); + }); } @Override public CompletableFuture flushdbAsync() { - return runAsync(() -> { + return runFuture(() -> { container.clear(); - }, getExecutor()); + }); } @Override public CompletableFuture flushallAsync() { - return runAsync(() -> { + return runFuture(() -> { container.clear(); - }, getExecutor()); + }); } @Override public CompletableFuture> keysAsync(String pattern) { - return supplyAsync(() -> { + return supplyFuture(() -> { if (pattern == null || pattern.isEmpty()) { return new ArrayList<>(container.keySet()); } else { @@ -1747,12 +1747,12 @@ public final class CacheMemorySource extends AbstractCacheSource { container.keySet().stream().filter(filter).forEach(x -> rs.add(x)); return rs; } - }, getExecutor()); + }); } @Override public CompletableFuture> scanAsync(AtomicLong cursor, int limit, String pattern) { - return supplyAsync(() -> { + return supplyFuture(() -> { if (pattern == null || pattern.isEmpty()) { return new ArrayList<>(container.keySet()); } else { @@ -1761,7 +1761,7 @@ public final class CacheMemorySource extends AbstractCacheSource { container.keySet().stream().filter(filter).forEach(x -> rs.add(x)); return rs; } - }, getExecutor()); + }); } @Override @@ -1769,11 +1769,19 @@ public final class CacheMemorySource extends AbstractCacheSource { if (startsWith == null) { return keysAsync(); } - return supplyAsync(() -> { + return supplyFuture(() -> { List rs = new ArrayList<>(); container.keySet().stream().filter(x -> x.startsWith(startsWith)).forEach(x -> rs.add(x)); return rs; - }, getExecutor()); + }); + } + + public static enum CacheNodeType { + LONG, STRING, OBJECT, + ATOMIC, DOUBLE, + SET_SORTED, + SET_OBJECT, + MAP; } protected CacheEntryType findEntryType(Type type) { @@ -1910,55 +1918,55 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override @Deprecated(since = "2.8.0") public CompletableFuture> getexCollectionAsync(final String key, final int expireSeconds, final Type componentType) { - return supplyAsync(() -> getexCollection(key, expireSeconds, componentType), getExecutor()); + return supplyFuture(() -> getexCollection(key, expireSeconds, componentType)); } @Override @Deprecated(since = "2.8.0") public CompletableFuture> getexStringCollectionAsync(final String key, final int expireSeconds) { - return supplyAsync(() -> getexStringCollection(key, expireSeconds), getExecutor()); + return supplyFuture(() -> getexStringCollection(key, expireSeconds)); } @Override @Deprecated(since = "2.8.0") public CompletableFuture> getexLongCollectionAsync(final String key, final int expireSeconds) { - return supplyAsync(() -> getexLongCollection(key, expireSeconds), getExecutor()); + return supplyFuture(() -> getexLongCollection(key, expireSeconds)); } @Override @Deprecated(since = "2.8.0") public CompletableFuture>> getCollectionMapAsync(boolean set, Type componentType, String... keys) { - return supplyAsync(() -> getCollectionMap(set, componentType, keys), getExecutor()); + return supplyFuture(() -> getCollectionMap(set, componentType, keys)); } @Override @Deprecated(since = "2.8.0") public CompletableFuture> getStringCollectionAsync(final String key) { - return supplyAsync(() -> getStringCollection(key), getExecutor()); + return supplyFuture(() -> getStringCollection(key)); } @Override @Deprecated(since = "2.8.0") public CompletableFuture>> getStringCollectionMapAsync(final boolean set, final String... keys) { - return supplyAsync(() -> getStringCollectionMap(set, keys), getExecutor()); + return supplyFuture(() -> getStringCollectionMap(set, keys)); } @Override @Deprecated(since = "2.8.0") public CompletableFuture> getLongCollectionAsync(final String key) { - return supplyAsync(() -> getLongCollection(key), getExecutor()); + return supplyFuture(() -> getLongCollection(key)); } @Override @Deprecated(since = "2.8.0") public CompletableFuture>> getLongCollectionMapAsync(final boolean set, final String... keys) { - return supplyAsync(() -> getLongCollectionMap(set, keys), getExecutor()); + return supplyFuture(() -> getLongCollectionMap(set, keys)); } @Override @Deprecated(since = "2.8.0") public CompletableFuture> getCollectionAsync(String key, Type componentType) { - return supplyAsync(() -> getCollection(key, componentType), getExecutor()); + return supplyFuture(() -> getCollection(key, componentType)); } @Override @@ -2025,13 +2033,13 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override @Deprecated(since = "2.8.0") public CompletableFuture> getLongMapAsync(final String... keys) { - return supplyAsync(() -> getLongMap(keys), getExecutor()); + return supplyFuture(() -> getLongMap(keys)); } @Override @Deprecated(since = "2.8.0") public CompletableFuture getLongArrayAsync(final String... keys) { - return supplyAsync(() -> getLongArray(keys), getExecutor()); + return supplyFuture(() -> getLongArray(keys)); } @Override @@ -2060,13 +2068,13 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override @Deprecated(since = "2.8.0") public CompletableFuture> getStringMapAsync(final String... keys) { - return supplyAsync(() -> getStringMap(keys), getExecutor()); + return supplyFuture(() -> getStringMap(keys)); } @Override @Deprecated(since = "2.8.0") public CompletableFuture getStringArrayAsync(final String... keys) { - return supplyAsync(() -> getStringArray(keys), getExecutor()); + return supplyFuture(() -> getStringArray(keys)); } @Override @@ -2082,7 +2090,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override @Deprecated(since = "2.8.0") public CompletableFuture> getMapAsync(final Type componentType, final String... keys) { - return supplyAsync(() -> getMap(componentType, keys), getExecutor()); + return supplyFuture(() -> getMap(componentType, keys)); } @Override @@ -2114,7 +2122,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override @Deprecated(since = "2.8.0") public CompletableFuture getCollectionSizeAsync(final String key) { - return supplyAsync(() -> getCollectionSize(key), getExecutor()); + return supplyFuture(() -> getCollectionSize(key)); } @Override diff --git a/src/main/java/org/redkale/util/Creator.java b/src/main/java/org/redkale/util/Creator.java index 422db2dfc..41f2b11b5 100644 --- a/src/main/java/org/redkale/util/Creator.java +++ b/src/main/java/org/redkale/util/Creator.java @@ -173,6 +173,10 @@ public interface Creator { return (Object... params) -> func.apply(params); } + public static Creator load(Class clazz) { + return CreatorInner.creatorCacheMap.computeIfAbsent(clazz, v -> create(clazz)); + } + /** * 根据指定的class采用ASM技术生产Creator。 * diff --git a/src/main/java/org/redkale/util/Reproduce.java b/src/main/java/org/redkale/util/Reproduce.java index 7bb38b7c8..ac40a8a0d 100644 --- a/src/main/java/org/redkale/util/Reproduce.java +++ b/src/main/java/org/redkale/util/Reproduce.java @@ -2,6 +2,7 @@ package org.redkale.util; import java.lang.reflect.Modifier; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.*; import static org.redkale.asm.ClassWriter.COMPUTE_FRAMES; import org.redkale.asm.*; @@ -22,6 +23,34 @@ public interface Reproduce extends BiFunction { @Override public D apply(D dest, S src); + public static D copy(final D dest, final S src) { + if (src == null || dest == null) { + return null; + } + Class destClass = (Class) dest.getClass(); + Creator creator = Creator.load(destClass); + return load(destClass, (Class) src.getClass()).apply(creator.create(), src); + } + + public static D copy(final Class destClass, final S src) { + if (src == null) { + return null; + } + Creator creator = Creator.load(destClass); + return load(destClass, (Class) src.getClass()).apply(creator.create(), src); + } + + public static Reproduce load(final Class destClass, final Class srcClass) { + if (destClass == srcClass) { + return ReproduceInner.reproduceOneCaches + .computeIfAbsent(destClass, v -> create(destClass, srcClass)); + } else { + return ReproduceInner.reproduceTwoCaches + .computeIfAbsent(destClass, t -> new ConcurrentHashMap<>()) + .computeIfAbsent(srcClass, v -> create(destClass, srcClass)); + } + } + public static Reproduce create(final Class destClass, final Class srcClass) { return create(destClass, srcClass, (BiPredicate) null, (Map) null); } @@ -84,16 +113,26 @@ public interface Reproduce extends BiFunction { //mv.setDebug(true); for (java.lang.reflect.Field field : srcClass.getFields()) { - if (Modifier.isStatic(field.getModifiers())) continue; - if (Modifier.isFinal(field.getModifiers())) continue; - if (!Modifier.isPublic(field.getModifiers())) continue; + if (Modifier.isStatic(field.getModifiers())) { + continue; + } + if (Modifier.isFinal(field.getModifiers())) { + continue; + } + if (!Modifier.isPublic(field.getModifiers())) { + continue; + } final String sfname = field.getName(); - if (srcColumnPredicate != null && !srcColumnPredicate.test(field, sfname)) continue; + if (srcColumnPredicate != null && !srcColumnPredicate.test(field, sfname)) { + continue; + } final String dfname = names == null ? sfname : names.getOrDefault(sfname, sfname); java.lang.reflect.Method setter = null; try { - if (!field.getType().equals(destClass.getField(dfname).getType())) continue; + if (!field.getType().equals(destClass.getField(dfname).getType())) { + continue; + } } catch (Exception e) { try { char[] cs = dfname.toCharArray(); @@ -116,19 +155,31 @@ public interface Reproduce extends BiFunction { } for (java.lang.reflect.Method getter : srcClass.getMethods()) { - if (Modifier.isStatic(getter.getModifiers())) continue; - if (getter.getParameterTypes().length > 0) continue; - if ("getClass".equals(getter.getName())) continue; - if (!getter.getName().startsWith("get") && !getter.getName().startsWith("is")) continue; + if (Modifier.isStatic(getter.getModifiers())) { + continue; + } + if (getter.getParameterTypes().length > 0) { + continue; + } + if ("getClass".equals(getter.getName())) { + continue; + } + if (!getter.getName().startsWith("get") && !getter.getName().startsWith("is")) { + continue; + } final boolean is = getter.getName().startsWith("is"); String sfname = getter.getName().substring(is ? 2 : 3); - if (sfname.isEmpty()) continue; + if (sfname.isEmpty()) { + continue; + } if (sfname.length() < 2 || Character.isLowerCase(sfname.charAt(1))) { char[] cs = sfname.toCharArray(); cs[0] = Character.toLowerCase(cs[0]); sfname = new String(cs); } - if (srcColumnPredicate != null && !srcColumnPredicate.test(getter, sfname)) continue; + if (srcColumnPredicate != null && !srcColumnPredicate.test(getter, sfname)) { + continue; + } final String dfname = names == null ? sfname : names.getOrDefault(sfname, sfname); java.lang.reflect.Method setter = null; @@ -141,7 +192,9 @@ public interface Reproduce extends BiFunction { } catch (Exception e) { try { srcField = destClass.getField(dfname); - if (!getter.getReturnType().equals(srcField.getType())) continue; + if (!getter.getReturnType().equals(srcField.getType())) { + continue; + } } catch (Exception e2) { continue; } @@ -190,4 +243,12 @@ public interface Reproduce extends BiFunction { } } + static class ReproduceInner { + + static final ConcurrentHashMap reproduceOneCaches = new ConcurrentHashMap(); + + static final ConcurrentHashMap> reproduceTwoCaches = new ConcurrentHashMap(); + + } + } diff --git a/src/main/java/org/redkale/util/Utility.java b/src/main/java/org/redkale/util/Utility.java index c4a99981d..307293ec2 100644 --- a/src/main/java/org/redkale/util/Utility.java +++ b/src/main/java/org/redkale/util/Utility.java @@ -884,6 +884,39 @@ public final class Utility { return UUID.randomUUID().toString().replace("-", ""); } + /** + * 比较两个版本号的大小,ver1小于ver2返回 -1 + * + * @param version1 版本号 + * @param version2 版本号 + * + * @return 版本大小 + */ + public static int compareVersion(String version1, String version2) { + if (isEmpty(version1)) { + return isEmpty(version2) ? 0 : -1; + } + if (isEmpty(version2)) { + return 1; + } + String[] ver1 = version1.split("\\."); + String[] ver2 = version2.split("\\."); + int len = Math.min(ver1.length, ver2.length); + for (int i = 0; i < len; i++) { + if (ver1[i].length() > ver2[i].length()) { + return 1; + } + if (ver1[i].length() < ver2[i].length()) { + return -1; + } + int v = Integer.parseInt(ver1[i]) - Integer.parseInt(ver2[i]); + if (v != 0) { + return v > 0 ? 1 : -1; + } + } + return 0; + } + /** * 将一个或多个新元素添加到数组开始,数组中的元素自动后移 *