CacheSource增加sorted set操作

This commit is contained in:
redkale
2023-06-13 12:10:06 +08:00
parent 98f89d87d4
commit e2c6990265
4 changed files with 772 additions and 466 deletions

View File

@@ -1,50 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
【================================================ 已废弃 ================================================】建议使用 source.properties
其配置算是标准的JPA配置文件的缩略版
-->
<persistence>
<!-- 系统基本库 -->
<persistence-unit name="demouser">
<properties>
<!--
DataSource的实现类没有设置默认为org.redkale.source.DataJdbcSource的实现使用常规基于JDBC的数据库驱动一般无需设置
-->
<property name="javax.persistence.datasource" value="org.redkale.source.DataJdbcSource"/>
<!--
是否开启缓存(标记为@Cacheable的Entity类),值目前只支持两种: ALL: 所有开启缓存。 NONE: 关闭所有缓存, 非NONE字样统一视为ALL
-->
<property name="javax.persistence.cachemode" value="ALL"/>
<!--
是否自动建表当表不存在的时候, 目前只支持mysql、postgres 默认为false
-->
<property name="javax.persistence.table.autoddl" value="false"/>
<!-- 多个URL用;隔开如分布式SearchSource需要配多个URL -->
<property name="javax.persistence.jdbc.url" value="jdbc:mysql://127.0.0.1:3306/dbuser?characterEncoding=utf8"/>
<property name="javax.persistence.jdbc.user" value="root"/>
<property name="javax.persistence.jdbc.password" value="123456"/>
<!-- 最大连接数默认值CPU数 -->
<property name="javax.persistence.connections.limit" value="12"/>
<!-- 包含的SQL模板相当于反向LIKE不同的JDBC驱动的SQL语句不一样Redkale内置了MySQL的语句 -->
<property name="javax.persistence.contain.sqltemplate" value="LOCATE(#{keystr}, #{column}) > 0"/>
<property name="javax.persistence.notcontain.sqltemplate" value="LOCATE(#{keystr}, #{column}) = 0"/>
<!-- 复制表结构的SQL模板Redkale内置了MySQL的语句 -->
<property name="javax.persistence.tablenotexist.sqlstates" value="42000;42S02"/>
<property name="javax.persistence.tablecopy.sqltemplate" value="CREATE TABLE IF NOT EXISTS #{newtable} LIKE #{oldtable}"/>
</properties>
</persistence-unit>
<!-- IM消息库 -->
<persistence-unit name="demoim">
<properties>
<!-- jdbc:mysql://127.0.0.1:3306/dbim?allowPublicKeyRetrieval=true&amp;rewriteBatchedStatements=true&amp;serverTimezone=UTC&amp;characterEncoding=utf8 -->
<property name="javax.persistence.jdbc.url" value="jdbc:mysql://127.0.0.1:3306/dbim?characterEncoding=utf8"/>
<property name="javax.persistence.jdbc.user" value="root"/>
<property name="javax.persistence.jdbc.password" value="123456"/>
</properties>
</persistence-unit>
</persistence>

View File

@@ -963,11 +963,11 @@ public final class CacheMemorySource extends AbstractCacheSource {
public long sdiffstore(final String key, final String srcKey, final String... srcKey2s) {
Set rs = sdiff(srcKey, Object.class, srcKey2s);
if (container.containsKey(key)) {
CopyOnWriteArraySet set = container.get(srcKey).csetValue;
Set set = container.get(srcKey).csetValue;
set.clear();
set.addAll(rs);
} else {
appendSetItem(CacheEntryType.OBJECT_SET, key, rs);
appendSetItem(CacheEntryType.SET_OBJECT, key, rs);
}
return rs.size();
}
@@ -1012,11 +1012,11 @@ public final class CacheMemorySource extends AbstractCacheSource {
public long sinterstore(final String key, final String srcKey, final String... srcKey2s) {
Set rs = sinter(srcKey, Object.class, srcKey2s);
if (container.containsKey(key)) {
CopyOnWriteArraySet set = container.get(srcKey).csetValue;
Set set = container.get(srcKey).csetValue;
set.clear();
set.addAll(rs);
} else {
appendSetItem(CacheEntryType.OBJECT_SET, key, rs);
appendSetItem(CacheEntryType.SET_OBJECT, key, rs);
}
return rs.size();
}
@@ -1171,7 +1171,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public <T> void lpush(final String key, final Type componentType, T... values) {
for (T value : values) {
appendListItem(CacheEntryType.OBJECT_LIST, false, key, value);
appendListItem(CacheEntryType.LIST_OBJECT, false, key, value);
}
}
@@ -1184,7 +1184,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
public <T> void lpushx(final String key, final Type componentType, T... values) {
if (container.containsKey(key)) {
for (T value : values) {
appendListItem(CacheEntryType.OBJECT_LIST, false, key, value);
appendListItem(CacheEntryType.LIST_OBJECT, false, key, value);
}
}
}
@@ -1288,7 +1288,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
public <T> void rpushx(String key, Type componentType, T... values) {
if (container.containsKey(key)) {
for (T value : values) {
appendListItem(CacheEntryType.OBJECT_LIST, key, value);
appendListItem(CacheEntryType.LIST_OBJECT, key, value);
}
}
}
@@ -1300,7 +1300,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public <T> void rpush(String key, Type componentType, T... values) {
appendListItem(CacheEntryType.OBJECT_LIST, key, values);
appendListItem(CacheEntryType.LIST_OBJECT, key, values);
}
@Override
@@ -1437,7 +1437,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
CacheEntry entry = container.get(key);
if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) {
CopyOnWriteArraySet set = new CopyOnWriteArraySet();
Set set = cacheType == CacheEntryType.SET_SORTED ? Collections.synchronizedSet(new TreeSet<>()) : new CopyOnWriteArraySet();
entry = new CacheEntry(cacheType, key, null, set, null, null);
CacheEntry old = container.putIfAbsent(key, entry);
if (old != null) {
@@ -1452,8 +1452,102 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
@Override
public <T> void sadd(String key, final Type componentType, T... values) {
appendSetItem(CacheEntryType.OBJECT_SET, key, List.of(values));
public void zadd(String key, CacheScoredValue... values) {
List<Object> list = new ArrayList<>();
for (CacheScoredValue v : values) {
list.add(new CacheScoredValue.NumberScoredValue(v));
}
appendSetItem(CacheEntryType.SET_SORTED, key, list);
}
@Override
public long zcard(String key) {
if (key == null) {
return 0L;
}
CacheEntry entry = container.get(key);
if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) {
return 0L;
}
return entry.csetValue.size();
}
@Override
public long zrem(String key, String... members) {
if (key == null) {
return 0L;
}
CacheEntry entry = container.get(key);
if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) {
return 0L;
}
Set<CacheScoredValue> sets = entry.csetValue;
long c = 0;
Set<String> keys = Set.of(members);
Iterator<CacheScoredValue> it = sets.iterator();
while (it.hasNext()) {
CacheScoredValue v = it.next();
if (keys.contains(v.getValue())) {
c++;
it.remove();
}
}
return c;
}
@Override
public <T extends Number> List<T> zmscore(String key, Class<T> scoreType, String... members) {
List<T> list = new ArrayList<>();
if (key == null) {
for (int i = 0; i < members.length; i++) {
list.add(null);
}
return list;
}
CacheEntry entry = container.get(key);
if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) {
for (int i = 0; i < members.length; i++) {
list.add(null);
}
return list;
}
Set<String> keys = Set.of(members);
Set<CacheScoredValue> sets = entry.csetValue;
Map<String, T> map = new HashMap<>();
sets.stream().filter(v -> keys.contains(v.getValue())).forEach(v -> {
map.put(v.getValue(), formatScore(scoreType, v.getScore()));
});
for (String m : members) {
list.add(map.get(m));
}
return list;
}
private <T extends Number> T formatScore(Class<T> scoreType, Number score) {
if (scoreType == int.class || scoreType == Integer.class) {
return (T) (Number) score.intValue();
} else if (scoreType == long.class || scoreType == Long.class) {
return (T) (Number) score.longValue();
} else if (scoreType == float.class || scoreType == Float.class) {
return (T) (Number) score.floatValue();
} else if (scoreType == double.class || scoreType == Double.class) {
return (T) (Number) score.doubleValue();
} else {
return (T) score;
}
}
@Override
public <T extends Number> T zscore(String key, Class<T> scoreType, String member) {
if (key == null) {
return null;
}
CacheEntry entry = container.get(key);
if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) {
return null;
}
Set<CacheScoredValue> sets = entry.csetValue;
return (T) sets.stream().filter(v -> Objects.equals(member, v.getValue())).findAny().map(v -> v.getScore()).orElse(null);
}
@Override
@@ -1461,6 +1555,11 @@ public final class CacheMemorySource extends AbstractCacheSource {
return runAsync(() -> sadd(key, componentType, values), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public CompletableFuture<Long> zcardAsync(String key) {
return supplyAsync(() -> zcard(key), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public <T> long srem(String key, Type type, T... values) {
if (key == null) {
@@ -1572,6 +1671,26 @@ public final class CacheMemorySource extends AbstractCacheSource {
return supplyAsync(() -> sscan(key, componentType, cursor, limit, pattern), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public CompletableFuture<Void> zaddAsync(String key, CacheScoredValue... values) {
return runAsync(() -> zadd(key, values), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public <T extends Number> CompletableFuture<List<T>> zmscoreAsync(String key, Class<T> type, String... members) {
return supplyAsync(() -> zmscore(key, type, members), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public CompletableFuture<Long> zremAsync(String key, String... members) {
return supplyAsync(() -> zrem(key, members), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public <T extends Number> CompletableFuture<T> zscoreAsync(String key, Class<T> type, String member) {
return supplyAsync(() -> zscore(key, type, member), getExecutor()).whenComplete(futureCompleteConsumer);
}
protected CacheEntryType findEntryType(Type type) {
if (type == String.class) {
return CacheEntryType.STRING;
@@ -1585,8 +1704,8 @@ public final class CacheMemorySource extends AbstractCacheSource {
public static enum CacheEntryType {
LONG, STRING, OBJECT, BYTES, ATOMIC, MAP, DOUBLE,
LONG_SET, STRING_SET, OBJECT_SET,
LONG_LIST, STRING_LIST, OBJECT_LIST;
SET_LONG, SET_STRING, SET_OBJECT, SET_SORTED,
LIST_LONG, LIST_STRING, LIST_OBJECT;
}
public static final class CacheEntry<T> {
@@ -1606,20 +1725,20 @@ public final class CacheMemorySource extends AbstractCacheSource {
final ReentrantLock mapLock = new ReentrantLock();
CopyOnWriteArraySet<T> csetValue;
Set<T> csetValue;
ConcurrentLinkedDeque<T> listValue;
public CacheEntry(CacheEntryType cacheType, String key, T objectValue, CopyOnWriteArraySet<T> csetValue, ConcurrentLinkedDeque<T> listValue, ConcurrentHashMap<String, Serializable> mapValue) {
public CacheEntry(CacheEntryType cacheType, String key, T objectValue, Set<T> csetValue, ConcurrentLinkedDeque<T> listValue, ConcurrentHashMap<String, Serializable> mapValue) {
this(cacheType, 0, key, objectValue, csetValue, listValue, mapValue);
}
public CacheEntry(CacheEntryType cacheType, int expireSeconds, String key, T objectValue, CopyOnWriteArraySet<T> csetValue, ConcurrentLinkedDeque<T> listValue, ConcurrentHashMap<String, Serializable> mapValue) {
public CacheEntry(CacheEntryType cacheType, int expireSeconds, String key, T objectValue, Set<T> csetValue, ConcurrentLinkedDeque<T> listValue, ConcurrentHashMap<String, Serializable> mapValue) {
this(cacheType, expireSeconds, (int) (System.currentTimeMillis() / 1000), key, objectValue, csetValue, listValue, mapValue);
}
@ConstructorParameters({"cacheType", "expireSeconds", "lastAccessed", "key", "objectValue", "csetValue", "listValue", "mapValue"})
public CacheEntry(CacheEntryType cacheType, int expireSeconds, int lastAccessed, String key, T objectValue, CopyOnWriteArraySet<T> csetValue, ConcurrentLinkedDeque<T> listValue, ConcurrentHashMap<String, Serializable> mapValue) {
public CacheEntry(CacheEntryType cacheType, int expireSeconds, int lastAccessed, String key, T objectValue, Set<T> csetValue, ConcurrentLinkedDeque<T> listValue, ConcurrentHashMap<String, Serializable> mapValue) {
this.cacheType = cacheType;
this.expireSeconds = expireSeconds;
this.lastAccessed = lastAccessed;
@@ -1637,12 +1756,12 @@ public final class CacheMemorySource extends AbstractCacheSource {
@ConvertColumn(ignore = true)
public boolean isListCacheType() {
return cacheType == CacheEntryType.LONG_LIST || cacheType == CacheEntryType.STRING_LIST || cacheType == CacheEntryType.OBJECT_LIST;
return cacheType == CacheEntryType.LIST_LONG || cacheType == CacheEntryType.LIST_STRING || cacheType == CacheEntryType.LIST_OBJECT;
}
@ConvertColumn(ignore = true)
public boolean isSetCacheType() {
return cacheType == CacheEntryType.LONG_SET || cacheType == CacheEntryType.STRING_SET || cacheType == CacheEntryType.OBJECT_SET;
return cacheType == CacheEntryType.SET_LONG || cacheType == CacheEntryType.SET_STRING || cacheType == CacheEntryType.SET_OBJECT || cacheType == CacheEntryType.SET_SORTED;
}
@ConvertColumn(ignore = true)
@@ -1675,7 +1794,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
return objectValue;
}
public CopyOnWriteArraySet<T> getCsetValue() {
public Set<T> getCsetValue() {
return csetValue;
}

View File

@@ -0,0 +1,102 @@
/*
*
*/
package org.redkale.source;
import java.io.Serializable;
import java.util.Objects;
import org.redkale.convert.ConvertColumn;
/**
*
* 有序集合的对象类
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
* @param <S> 分数
*
* @since 2.8.0
*/
public interface CacheScoredValue<S extends Number> extends Serializable, Comparable<CacheScoredValue> {
public S getScore();
public String getValue();
public static NumberScoredValue create(Number score, String value) {
return new NumberScoredValue(score, value);
}
public static final class NumberScoredValue implements CacheScoredValue<Number> {
@ConvertColumn(index = 1)
private Number score;
@ConvertColumn(index = 2)
private String value;
public NumberScoredValue(Number score, String value) {
Objects.requireNonNull(score);
Objects.requireNonNull(value);
this.score = score;
this.value = value;
}
public NumberScoredValue(CacheScoredValue scoredValue) {
this.score = scoredValue.getScore();
this.value = scoredValue.getValue();
}
@Override
public Number getScore() {
return score;
}
@Override
public String getValue() {
return value;
}
public void setScore(Long score) {
this.score = score;
}
public void setValue(String value) {
this.value = value;
}
@Override
public int compareTo(CacheScoredValue o) {
return ((Comparable) this.score).compareTo((Number) o.getScore());
}
@Override
public int hashCode() {
return Objects.hashCode(this.value);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final NumberScoredValue other = (NumberScoredValue) obj;
return Objects.equals(this.value, other.value);
}
@Override
public String toString() {
return "{score:" + score + ", value:" + value + "}";
}
}
}

File diff suppressed because it is too large Load Diff