CacheMemorySource优化

This commit is contained in:
redkale
2023-12-08 10:01:05 +08:00
parent 6615326edb
commit e7a7461d9d

View File

@@ -24,6 +24,7 @@ import org.redkale.convert.*;
import org.redkale.convert.json.*;
import org.redkale.service.Local;
import org.redkale.util.*;
import static org.redkale.util.Utility.convertValue;
import static org.redkale.util.Utility.isEmpty;
/**
@@ -303,18 +304,12 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public CompletableFuture<Void> msetAsync(Map map) {
return runFuture(() -> {
map.forEach((key, val) -> {
set0(key.toString(), 0, null, val);
});
});
return runFuture(() -> map.forEach((key, val) -> set0(key.toString(), 0, null, val)));
}
@Override
public <T> CompletableFuture<Void> setAsync(String key, Convert convert, Type type, T value) {
return runFuture(() -> {
set0(key, 0, type, value);
});
return runFuture(() -> set0(key, 0, type, value));
}
@Override
@@ -353,7 +348,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
CacheEntry entry = find(key, CacheEntryType.OBJECT);
T old = entry == null ? null : (T) entry.objectValue;
set0(key, 0, type, value);
return old;
return convertValue(type, old);
});
}
@@ -370,7 +365,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
} finally {
containerLock.unlock();
}
return (T) entry.objectValue;
return convertValue(type, entry.objectValue);
});
}
@@ -390,7 +385,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
entry.lock();
try {
entry.objectValue = Utility.convertValue(type, value);
entry.objectValue = convertValue(type, value);
entry.expireSeconds(expireSeconds);
entry.lastAccessed = System.currentTimeMillis();
} finally {
@@ -400,9 +395,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public <T> CompletableFuture<Void> setexAsync(String key, int expireSeconds, Convert convert, Type type, T value) {
return runFuture(() -> {
set0(key, expireSeconds, type, value);
});
return runFuture(() -> set0(key, expireSeconds, type, value));
}
@Override
@@ -623,9 +616,9 @@ public final class CacheMemorySource extends AbstractCacheSource {
// OBJECT, ATOMIC, DOUBLE, SSET, ZSET, LIST, MAP;
switch (entry.cacheType) {
case ATOMIC:
return Utility.convertValue(type, (AtomicLong) entry.objectValue);
return convertValue(type, (AtomicLong) entry.objectValue);
case DOUBLE:
return Utility.convertValue(type, Double.longBitsToDouble(((AtomicLong) entry.objectValue).longValue()));
return convertValue(type, Double.longBitsToDouble(((AtomicLong) entry.objectValue).longValue()));
case SSET:
return (T) new LinkedHashSet(entry.setValue);
case ZSET:
@@ -775,7 +768,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
entry.lock();
try {
boolean rs = entry.mapValue.putIfAbsent(field, Utility.convertValue(type, value)) == null;
boolean rs = entry.mapValue.putIfAbsent(field, convertValue(type, value)) == null;
entry.lastAccessed = System.currentTimeMillis();
return rs;
} finally {
@@ -795,9 +788,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public CompletableFuture<Void> hmsetAsync(final String key, final Map map) {
return runFuture(() -> {
map.forEach((k, v) -> hset0(key, (String) k, null, v));
});
return runFuture(() -> map.forEach((k, v) -> hset0(key, (String) k, null, v)));
}
@Override
@@ -810,7 +801,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
Map map = entry.mapValue;
List<T> rs = new ArrayList<>(fields.length);
for (String field : fields) {
rs.add((T) Utility.convertValue(type, map.get(field)));
rs.add(convertValue(type, map.get(field)));
}
return rs;
});
@@ -824,9 +815,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
return new LinkedHashMap();
} else {
Map map = new LinkedHashMap();
entry.mapValue.forEach((k, v) -> {
map.put(k, Utility.convertValue(type, v));
});
entry.mapValue.forEach((k, v) -> map.put(k, convertValue(type, v)));
return map;
}
});
@@ -839,7 +828,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
if (entry == null) {
return new ArrayList();
} else {
Stream<T> stream = entry.mapValue.values().stream().map(v -> Utility.convertValue(type, v));
Stream<T> stream = entry.mapValue.values().stream().map(v -> convertValue(type, v));
return new ArrayList(stream.collect(Collectors.toList()));
}
});
@@ -860,7 +849,8 @@ public final class CacheMemorySource extends AbstractCacheSource {
} else {
Predicate<String> regx = Pattern.compile(pattern.replace("*", ".*")).asPredicate();
Set<Map.Entry<String, Serializable>> set = entry.mapValue.entrySet();
return (Map) set.stream().filter(en -> regx.test(en.getKey())).collect(Collectors.toMap(en -> en.getKey(), en -> en.getValue()));
return set.stream().filter(en -> regx.test(en.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, en -> convertValue(type, en.getValue())));
}
});
}
@@ -876,7 +866,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
return null;
}
Object obj = entry.mapValue.get(field);
return obj == null ? null : Utility.convertValue(type, obj);
return obj == null ? null : convertValue(type, obj);
});
}
@@ -914,7 +904,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
entry.lock();
try {
entry.mapValue.put(field, Utility.convertValue(type, value));
entry.mapValue.put(field, convertValue(type, value));
entry.lastAccessed = System.currentTimeMillis();
} finally {
entry.unlock();
@@ -958,7 +948,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
List list = new ArrayList(entry.listValue);
int pos = index >= 0 ? index : list.size() + index;
return pos >= list.size() ? null : (T) list.get(pos);
return pos >= list.size() ? null : convertValue(componentType, list.get(pos));
});
}
@@ -1069,7 +1059,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
entry.lock();
try {
return Utility.convertValue(componentType, entry.listValue.pollFirst());
return convertValue(componentType, entry.listValue.pollFirst());
} finally {
entry.unlock();
}
@@ -1121,7 +1111,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
entry.lock();
try {
return Utility.convertValue(componentType, entry.listValue.pollLast());
return convertValue(componentType, entry.listValue.pollLast());
} finally {
entry.unlock();
}
@@ -1138,9 +1128,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
entry.lock();
try {
ConcurrentLinkedDeque list = entry.listValue;
for (T val : values) {
list.add(val);
}
list.addAll(List.of(values));
} finally {
entry.unlock();
}
@@ -1166,9 +1154,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
entry.lock();
try {
ConcurrentLinkedDeque list = entry.listValue;
for (T val : values) {
list.add(val);
}
list.addAll(List.of(values));
} finally {
entry.unlock();
}
@@ -1205,7 +1191,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
for (int i = 0; i < Math.abs(count); i++) {
int index = ThreadLocalRandom.current().nextInt(vals.size());
T val = vals.get(index);
list.add(Utility.convertValue(componentType, val));
list.add(convertValue(componentType, val));
}
} else { //不可以重复
if (count >= vals.size()) {
@@ -1301,7 +1287,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
for (String k : key2s) {
CacheEntry en2 = find(k, CacheEntryType.SSET);
if (en2 != null) {
en2.setValue.forEach(v -> rs.remove(v));
en2.setValue.forEach(rs::remove);
}
}
return rs;
@@ -1309,9 +1295,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public <T> CompletableFuture<Set<T>> sinterAsync(final String key, final Type componentType, final String... key2s) {
return supplyFuture(() -> {
return sinter0(key, key2s);
});
return supplyFuture(() -> sinter0(key, key2s));
}
@Override
@@ -1369,9 +1353,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public <T> CompletableFuture<Set<T>> sunionAsync(final String key, final Type componentType, final String... key2s) {
return supplyFuture(() -> {
return sunion0(key, key2s);
});
return supplyFuture(() -> sunion0(key, key2s));
}
@Override
@@ -1423,7 +1405,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
return supplyFuture(() -> {
CacheEntry entry = find(key, CacheEntryType.SSET);
if (entry == null) {
return new LinkedHashSet<T>();
return new LinkedHashSet<>();
}
return new LinkedHashSet<>(entry.setValue);
});
@@ -1481,9 +1463,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
entry.lock();
try {
Set set = entry.setValue;
for (T val : values) {
set.add(val);
}
set.addAll(List.of(values));
} finally {
entry.unlock();
}
@@ -1552,7 +1532,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
Set<T> list = new LinkedHashSet<>();
int index = 0;
while (it.hasNext()) {
list.add(Utility.convertValue(componentType, it.next()));
list.add(convertValue(componentType, it.next()));
if (++index >= count) {
break;
}
@@ -1581,7 +1561,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
Iterator it = cset.iterator();
Set<T> list = new LinkedHashSet<>();
while (it.hasNext()) {
list.add((T) Utility.convertValue(componentType, it.next()));
list.add((T) convertValue(componentType, it.next()));
}
return list;
} finally {
@@ -1844,23 +1824,17 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public CompletableFuture<Long> dbsizeAsync() {
return supplyFuture(() -> {
return (long) container.size();
});
return supplyFuture(() -> (long) container.size());
}
@Override
public CompletableFuture<Void> flushdbAsync() {
return runFuture(() -> {
container.clear();
});
return runFuture(container::clear);
}
@Override
public CompletableFuture<Void> flushallAsync() {
return runFuture(() -> {
container.clear();
});
return runFuture(container::clear);
}
@Override
@@ -1946,7 +1920,8 @@ public final class CacheMemorySource extends AbstractCacheSource {
private String key;
private int expireMills; //<=0表示永久保存
//<=0表示永久保存
private int expireMills;
private final ReentrantLock lock = new ReentrantLock();