diff --git a/src/main/java/org/redkale/source/CacheMemorySource.java b/src/main/java/org/redkale/source/CacheMemorySource.java index 02ca9bf23..de162a097 100644 --- a/src/main/java/org/redkale/source/CacheMemorySource.java +++ b/src/main/java/org/redkale/source/CacheMemorySource.java @@ -64,7 +64,8 @@ public final class CacheMemorySource extends AbstractCacheSource { } }; - private final Map>> pubsubListeners = new ConcurrentHashMap<>(); + //key: topic + private final Map>> pubsubListeners = new ConcurrentHashMap<>(); private ExecutorService pubsubExecutor; @@ -210,7 +211,7 @@ public final class CacheMemorySource extends AbstractCacheSource { throw new RedkaleException("topics is empty"); } for (String topic : topics) { - pubsubListeners.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>()).add(listener); + pubsubListeners.computeIfAbsent(topic, t -> new CopyOnWriteArraySet<>()).add(listener); } return CompletableFuture.completedFuture(null); } @@ -220,24 +221,24 @@ public final class CacheMemorySource extends AbstractCacheSource { int c = 0; if (listener == null) { if (topics == null || topics.length < 1) { //清空所有订阅者 - for (List> listeners : pubsubListeners.values()) { + for (Set> listeners : pubsubListeners.values()) { c += listeners != null ? listeners.size() : 0; } pubsubListeners.clear(); } else { for (String topic : topics) { //清空指定topic的订阅者 - List> listeners = pubsubListeners.remove(topic); + Set> listeners = pubsubListeners.remove(topic); c += listeners != null ? listeners.size() : 0; } } } else { if (topics == null || topics.length < 1) { - for (List> listeners : pubsubListeners.values()) { + for (Set> listeners : pubsubListeners.values()) { c += listeners != null && listeners.remove(listener) ? 1 : 0; } } else { for (String topic : topics) { - List> listeners = pubsubListeners.get(topic); + Set> listeners = pubsubListeners.get(topic); c += listeners != null && listeners.remove(listener) ? 1 : 0; } } @@ -249,7 +250,7 @@ public final class CacheMemorySource extends AbstractCacheSource { public CompletableFuture publishAsync(final String topic, final byte[] message) { Objects.requireNonNull(topic); Objects.requireNonNull(message); - List> listeners = pubsubListeners.get(topic); + Set> listeners = pubsubListeners.get(topic); if (listeners == null || listeners.isEmpty()) { return CompletableFuture.completedFuture(0); } @@ -1856,7 +1857,10 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture> scanAsync(AtomicLong cursor, int limit, String pattern) { - return keysAsync(pattern); + return keysAsync(pattern).thenApply(v -> { + cursor.set(0); + return v; + }); } @Override diff --git a/src/main/java/org/redkale/source/CacheSource.java b/src/main/java/org/redkale/source/CacheSource.java index 93019b2dd..7d3c25478 100644 --- a/src/main/java/org/redkale/source/CacheSource.java +++ b/src/main/java/org/redkale/source/CacheSource.java @@ -7,11 +7,13 @@ package org.redkale.source; import java.io.Serializable; import java.lang.reflect.Type; +import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; import org.redkale.annotation.*; import org.redkale.convert.Convert; +import org.redkale.convert.TextConvert; import org.redkale.convert.json.JsonConvert; import org.redkale.util.*; @@ -77,7 +79,7 @@ public interface CacheSource extends Resourcable { //------------------------ 发布 PUB ------------------------ default int publish(String topic, T message) { - return publish(topic, JsonConvert.root(), message.getClass(), message); + return publish(topic, null, message.getClass(), message); } default int publish(String topic, Convert convert, T message) { @@ -85,12 +87,11 @@ public interface CacheSource extends Resourcable { } default int publish(String topic, Type messageType, T message) { - return publish(topic, JsonConvert.root(), messageType, message); + return publish(topic, null, messageType, message); } default int publish(String topic, Convert convert, Type messageType, T message) { - final Convert c = convert == null ? JsonConvert.root() : convert; - return publish(topic, c.convertToBytes(messageType, message)); + return publishAsync(topic, convert, messageType, message).join(); } default int publish(String topic, byte[] message) { @@ -98,7 +99,7 @@ public interface CacheSource extends Resourcable { } default CompletableFuture publishAsync(String topic, T message) { - return publishAsync(topic, JsonConvert.root(), message.getClass(), message); + return publishAsync(topic, null, message.getClass(), message); } default CompletableFuture publishAsync(String topic, Convert convert, T message) { @@ -106,10 +107,16 @@ public interface CacheSource extends Resourcable { } default CompletableFuture publishAsync(String topic, Type messageType, T message) { - return publishAsync(topic, JsonConvert.root(), messageType, message); + return publishAsync(topic, null, messageType, message); } default CompletableFuture publishAsync(String topic, Convert convert, Type messageType, T message) { + if (message instanceof byte[]) { + return publishAsync(topic, (byte[]) message); + } + if (messageType == String.class && (convert == null || convert instanceof TextConvert)) { + return publishAsync(topic, message.toString().getBytes(StandardCharsets.UTF_8)); + } final Convert c = convert == null ? JsonConvert.root() : convert; return publishAsync(topic, c.convertToBytes(messageType, message)); } diff --git a/src/main/java/org/redkale/source/FilterNode.java b/src/main/java/org/redkale/source/FilterNode.java index 347e952ad..d47c7b6a7 100644 --- a/src/main/java/org/redkale/source/FilterNode.java +++ b/src/main/java/org/redkale/source/FilterNode.java @@ -153,19 +153,19 @@ public class FilterNode { //FilterNode 不能实现Serializable接口, 否则 return and(new FilterNode(column, express, value)); } - public final FilterNode and(LambdaSupplier func) { + public final FilterNode and(LambdaSupplier func) { return and(func, null); } - public final FilterNode and(LambdaSupplier func, FilterExpress express) { + public final FilterNode and(LambdaSupplier func, FilterExpress express) { return and(new FilterNode(LambdaSupplier.readColumn(func), express, func.get())); } - public final FilterNode and(LambdaFunction func, Serializable value) { + public final FilterNode and(LambdaFunction func, F value) { return and(func, null, value); } - public final FilterNode and(LambdaFunction func, FilterExpress express, Serializable value) { + public final FilterNode and(LambdaFunction func, FilterExpress express, F value) { return and(new FilterNode(LambdaFunction.readColumn(func), express, value)); } @@ -181,19 +181,19 @@ public class FilterNode { //FilterNode 不能实现Serializable接口, 否则 return or(new FilterNode(column, express, value)); } - public final FilterNode or(LambdaSupplier func) { + public final FilterNode or(LambdaSupplier func) { return or(func, null); } - public final FilterNode or(LambdaSupplier func, FilterExpress express) { + public final FilterNode or(LambdaSupplier func, FilterExpress express) { return or(new FilterNode(LambdaSupplier.readColumn(func), express, func.get())); } - public final FilterNode or(LambdaFunction func, Serializable value) { + public final FilterNode or(LambdaFunction func, F value) { return or(func, null, value); } - public final FilterNode or(LambdaFunction func, FilterExpress express, Serializable value) { + public final FilterNode or(LambdaFunction func, FilterExpress express, F value) { return or(new FilterNode(LambdaFunction.readColumn(func), express, value)); } @@ -361,19 +361,19 @@ public class FilterNode { //FilterNode 不能实现Serializable接口, 否则 return new FilterNode(column, express, value); } - public static FilterNode create(LambdaSupplier func) { + public static FilterNode create(LambdaSupplier func) { return create(func, null); } - public static FilterNode create(LambdaSupplier func, FilterExpress express) { + public static FilterNode create(LambdaSupplier func, FilterExpress express) { return new FilterNode(LambdaSupplier.readColumn(func), express, func.get()); } - public static FilterNode create(LambdaFunction func, Serializable value) { + public static FilterNode create(LambdaFunction func, F value) { return create(func, null, value); } - public static FilterNode create(LambdaFunction func, FilterExpress express, Serializable value) { + public static FilterNode create(LambdaFunction func, FilterExpress express, F value) { return new FilterNode(LambdaFunction.readColumn(func), express, value); }