优化CacheSource

This commit is contained in:
redkale
2023-09-07 15:20:44 +08:00
parent 8af945d8c2
commit da41c7f7fa
3 changed files with 37 additions and 26 deletions

View File

@@ -64,7 +64,8 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
};
private final Map<String, List<CacheEventListener<byte[]>>> pubsubListeners = new ConcurrentHashMap<>();
//key: topic
private final Map<String, Set<CacheEventListener<byte[]>>> pubsubListeners = new ConcurrentHashMap<>();
private ExecutorService pubsubExecutor;
@@ -210,7 +211,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
throw new RedkaleException("topics is empty");
}
for (String topic : topics) {
pubsubListeners.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>()).add(listener);
pubsubListeners.computeIfAbsent(topic, t -> new CopyOnWriteArraySet<>()).add(listener);
}
return CompletableFuture.completedFuture(null);
}
@@ -220,24 +221,24 @@ public final class CacheMemorySource extends AbstractCacheSource {
int c = 0;
if (listener == null) {
if (topics == null || topics.length < 1) { //清空所有订阅者
for (List<CacheEventListener<byte[]>> listeners : pubsubListeners.values()) {
for (Set<CacheEventListener<byte[]>> listeners : pubsubListeners.values()) {
c += listeners != null ? listeners.size() : 0;
}
pubsubListeners.clear();
} else {
for (String topic : topics) { //清空指定topic的订阅者
List<CacheEventListener<byte[]>> listeners = pubsubListeners.remove(topic);
Set<CacheEventListener<byte[]>> listeners = pubsubListeners.remove(topic);
c += listeners != null ? listeners.size() : 0;
}
}
} else {
if (topics == null || topics.length < 1) {
for (List<CacheEventListener<byte[]>> listeners : pubsubListeners.values()) {
for (Set<CacheEventListener<byte[]>> listeners : pubsubListeners.values()) {
c += listeners != null && listeners.remove(listener) ? 1 : 0;
}
} else {
for (String topic : topics) {
List<CacheEventListener<byte[]>> listeners = pubsubListeners.get(topic);
Set<CacheEventListener<byte[]>> listeners = pubsubListeners.get(topic);
c += listeners != null && listeners.remove(listener) ? 1 : 0;
}
}
@@ -249,7 +250,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
public CompletableFuture<Integer> publishAsync(final String topic, final byte[] message) {
Objects.requireNonNull(topic);
Objects.requireNonNull(message);
List<CacheEventListener<byte[]>> listeners = pubsubListeners.get(topic);
Set<CacheEventListener<byte[]>> listeners = pubsubListeners.get(topic);
if (listeners == null || listeners.isEmpty()) {
return CompletableFuture.completedFuture(0);
}
@@ -1856,7 +1857,10 @@ public final class CacheMemorySource extends AbstractCacheSource {
@Override
public CompletableFuture<List<String>> scanAsync(AtomicLong cursor, int limit, String pattern) {
return keysAsync(pattern);
return keysAsync(pattern).thenApply(v -> {
cursor.set(0);
return v;
});
}
@Override

View File

@@ -7,11 +7,13 @@ package org.redkale.source;
import java.io.Serializable;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.redkale.annotation.*;
import org.redkale.convert.Convert;
import org.redkale.convert.TextConvert;
import org.redkale.convert.json.JsonConvert;
import org.redkale.util.*;
@@ -77,7 +79,7 @@ public interface CacheSource extends Resourcable {
//------------------------ 发布 PUB ------------------------
default <T> int publish(String topic, T message) {
return publish(topic, JsonConvert.root(), message.getClass(), message);
return publish(topic, null, message.getClass(), message);
}
default <T> int publish(String topic, Convert convert, T message) {
@@ -85,12 +87,11 @@ public interface CacheSource extends Resourcable {
}
default <T> int publish(String topic, Type messageType, T message) {
return publish(topic, JsonConvert.root(), messageType, message);
return publish(topic, null, messageType, message);
}
default <T> int publish(String topic, Convert convert, Type messageType, T message) {
final Convert c = convert == null ? JsonConvert.root() : convert;
return publish(topic, c.convertToBytes(messageType, message));
return publishAsync(topic, convert, messageType, message).join();
}
default int publish(String topic, byte[] message) {
@@ -98,7 +99,7 @@ public interface CacheSource extends Resourcable {
}
default <T> CompletableFuture<Integer> publishAsync(String topic, T message) {
return publishAsync(topic, JsonConvert.root(), message.getClass(), message);
return publishAsync(topic, null, message.getClass(), message);
}
default <T> CompletableFuture<Integer> publishAsync(String topic, Convert convert, T message) {
@@ -106,10 +107,16 @@ public interface CacheSource extends Resourcable {
}
default <T> CompletableFuture<Integer> publishAsync(String topic, Type messageType, T message) {
return publishAsync(topic, JsonConvert.root(), messageType, message);
return publishAsync(topic, null, messageType, message);
}
default <T> CompletableFuture<Integer> publishAsync(String topic, Convert convert, Type messageType, T message) {
if (message instanceof byte[]) {
return publishAsync(topic, (byte[]) message);
}
if (messageType == String.class && (convert == null || convert instanceof TextConvert)) {
return publishAsync(topic, message.toString().getBytes(StandardCharsets.UTF_8));
}
final Convert c = convert == null ? JsonConvert.root() : convert;
return publishAsync(topic, c.convertToBytes(messageType, message));
}

View File

@@ -153,19 +153,19 @@ public class FilterNode { //FilterNode 不能实现Serializable接口 否则
return and(new FilterNode(column, express, value));
}
public final <T extends Serializable> FilterNode and(LambdaSupplier<T> func) {
public final <F extends Serializable> FilterNode and(LambdaSupplier<F> func) {
return and(func, null);
}
public final <T extends Serializable> FilterNode and(LambdaSupplier<T> func, FilterExpress express) {
public final <F extends Serializable> FilterNode and(LambdaSupplier<F> func, FilterExpress express) {
return and(new FilterNode(LambdaSupplier.readColumn(func), express, func.get()));
}
public final <T> FilterNode and(LambdaFunction<T, ?> func, Serializable value) {
public final <T, F extends Serializable> FilterNode and(LambdaFunction<T, F> func, F value) {
return and(func, null, value);
}
public final <T> FilterNode and(LambdaFunction<T, ?> func, FilterExpress express, Serializable value) {
public final <T, F extends Serializable> FilterNode and(LambdaFunction<T, F> func, FilterExpress express, F value) {
return and(new FilterNode(LambdaFunction.readColumn(func), express, value));
}
@@ -181,19 +181,19 @@ public class FilterNode { //FilterNode 不能实现Serializable接口 否则
return or(new FilterNode(column, express, value));
}
public final <T extends Serializable> FilterNode or(LambdaSupplier<T> func) {
public final <F extends Serializable> FilterNode or(LambdaSupplier<F> func) {
return or(func, null);
}
public final <T extends Serializable> FilterNode or(LambdaSupplier<T> func, FilterExpress express) {
public final <F extends Serializable> FilterNode or(LambdaSupplier<F> func, FilterExpress express) {
return or(new FilterNode(LambdaSupplier.readColumn(func), express, func.get()));
}
public final <T> FilterNode or(LambdaFunction<T, ?> func, Serializable value) {
public final <T, F extends Serializable> FilterNode or(LambdaFunction<T, F> func, F value) {
return or(func, null, value);
}
public final <T> FilterNode or(LambdaFunction<T, ?> func, FilterExpress express, Serializable value) {
public final <T, F extends Serializable> FilterNode or(LambdaFunction<T, F> func, FilterExpress express, F value) {
return or(new FilterNode(LambdaFunction.readColumn(func), express, value));
}
@@ -361,19 +361,19 @@ public class FilterNode { //FilterNode 不能实现Serializable接口 否则
return new FilterNode(column, express, value);
}
public static <T extends Serializable> FilterNode create(LambdaSupplier<T> func) {
public static <F extends Serializable> FilterNode create(LambdaSupplier<F> func) {
return create(func, null);
}
public static <T extends Serializable> FilterNode create(LambdaSupplier<T> func, FilterExpress express) {
public static <F extends Serializable> FilterNode create(LambdaSupplier<F> func, FilterExpress express) {
return new FilterNode(LambdaSupplier.readColumn(func), express, func.get());
}
public static <T> FilterNode create(LambdaFunction<T, ?> func, Serializable value) {
public static <T, F extends Serializable> FilterNode create(LambdaFunction<T, F> func, F value) {
return create(func, null, value);
}
public static <T> FilterNode create(LambdaFunction<T, ?> func, FilterExpress express, Serializable value) {
public static <T, F extends Serializable> FilterNode create(LambdaFunction<T, F> func, FilterExpress express, F value) {
return new FilterNode(LambdaFunction.readColumn(func), express, value);
}