diff --git a/src/main/java/org/redkale/net/sncp/Sncp.java b/src/main/java/org/redkale/net/sncp/Sncp.java index f91f54f90..39307e725 100644 --- a/src/main/java/org/redkale/net/sncp/Sncp.java +++ b/src/main/java/org/redkale/net/sncp/Sncp.java @@ -5,17 +5,16 @@ */ package org.redkale.net.sncp; +import java.lang.annotation.*; import static java.lang.annotation.ElementType.METHOD; import static java.lang.annotation.ElementType.TYPE; import static java.lang.annotation.RetentionPolicy.RUNTIME; -import java.lang.annotation.*; import java.lang.reflect.*; import java.nio.channels.CompletionHandler; import java.util.*; import org.redkale.annotation.*; -import org.redkale.annotation.ResourceType; -import static org.redkale.asm.ClassWriter.COMPUTE_FRAMES; import org.redkale.asm.*; +import static org.redkale.asm.ClassWriter.COMPUTE_FRAMES; import static org.redkale.asm.Opcodes.*; import org.redkale.asm.Type; import org.redkale.convert.Convert; @@ -23,7 +22,13 @@ import org.redkale.convert.bson.BsonConvert; import org.redkale.mq.MessageAgent; import org.redkale.net.sncp.SncpRemoteInfo.SncpRemoteAction; import org.redkale.service.*; -import org.redkale.util.*; +import org.redkale.util.AnyValue; +import org.redkale.util.RedkaleClassLoader; +import org.redkale.util.Resourcable; +import org.redkale.util.ResourceFactory; +import org.redkale.util.TypeToken; +import org.redkale.util.Uint128; +import org.redkale.util.Utility; /** * Service Node Communicate Protocol @@ -83,6 +88,12 @@ public abstract class Sncp { if (method.getAnnotation(Local.class) != null) { continue; } + if (method.getAnnotation(ResourceListener.class) != null) { + continue; + } + if (method.getAnnotation(Scheduled.class) != null) { + continue; + } if (method.getName().equals("getClass") || method.getName().equals("toString")) { continue; } diff --git a/src/main/java/org/redkale/source/CacheMemorySource.java b/src/main/java/org/redkale/source/CacheMemorySource.java index 5ae61a008..377cd112d 100644 --- a/src/main/java/org/redkale/source/CacheMemorySource.java +++ b/src/main/java/org/redkale/source/CacheMemorySource.java @@ -15,8 +15,8 @@ import java.util.function.*; import java.util.logging.*; import java.util.regex.Pattern; import java.util.stream.Collectors; -import org.redkale.annotation.AutoLoad; import org.redkale.annotation.*; +import org.redkale.annotation.AutoLoad; import org.redkale.annotation.ResourceListener; import org.redkale.annotation.ResourceType; import org.redkale.convert.*; @@ -54,16 +54,18 @@ public final class CacheMemorySource extends AbstractCacheSource { private final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); - protected final ConcurrentHashMap container = new ConcurrentHashMap<>(); + private final ConcurrentHashMap container = new ConcurrentHashMap<>(); - protected final ReentrantLock containerLock = new ReentrantLock(); + private final ReentrantLock containerLock = new ReentrantLock(); - protected final BiConsumer futureCompleteConsumer = (r, t) -> { + private final BiConsumer futureCompleteConsumer = (r, t) -> { if (t != null) { logger.log(Level.SEVERE, "CompletableFuture complete error", (Throwable) t); } }; + private final Map>> pubsubListeners = new ConcurrentHashMap<>(); + public CacheMemorySource(String resourceName) { this.name = resourceName; } @@ -163,21 +165,63 @@ public final class CacheMemorySource extends AbstractCacheSource { //------------------------ 订阅发布 SUB/PUB ------------------------ @Override - public CompletableFuture> pubsubChannelsAsync(@Nullable String pattern){ - throw new UnsupportedOperationException("Not supported yet."); + public CompletableFuture> pubsubChannelsAsync(@Nullable String pattern) { + Predicate predicate = Pattern.compile(pattern).asPredicate(); + return CompletableFuture.completedFuture(pubsubListeners.keySet().stream().filter(predicate).collect(Collectors.toList())); } - + @Override - public CompletableFuture subscribeAsync(CacheEventListener consumer, String... topics) { - Objects.requireNonNull(consumer); - throw new UnsupportedOperationException("Not supported yet."); + public CompletableFuture subscribeAsync(CacheEventListener listener, String... topics) { + Objects.requireNonNull(listener); + if (topics == null || topics.length < 1) { + throw new RedkaleException("topics is empty"); + } + for (String topic : topics) { + pubsubListeners.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>()).add(listener); + } + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture unsubscribeAsync(CacheEventListener listener, String... topics) { + int c = 0; + if (listener == null) { + if (topics == null || topics.length < 1) { //清空所有订阅者 + for (List> listeners : pubsubListeners.values()) { + c += listeners != null ? listeners.size() : 0; + } + pubsubListeners.clear(); + } else { + for (String topic : topics) { //清空指定topic的订阅者 + List> listeners = pubsubListeners.remove(topic); + c += listeners != null ? listeners.size() : 0; + } + } + } else { + if (topics == null || topics.length < 1) { + for (List> listeners : pubsubListeners.values()) { + c += listeners != null && listeners.remove(listener) ? 1 : 0; + } + } else { + for (String topic : topics) { + List> listeners = pubsubListeners.get(topic); + c += listeners != null && listeners.remove(listener) ? 1 : 0; + } + } + } + return CompletableFuture.completedFuture(c); } @Override public CompletableFuture publishAsync(String topic, byte[] message) { Objects.requireNonNull(topic); Objects.requireNonNull(message); - throw new UnsupportedOperationException("Not supported yet."); + List> listeners = pubsubListeners.get(topic); + if (listeners == null || listeners.isEmpty()) { + return CompletableFuture.completedFuture(0); + } + listeners.parallelStream().forEach(listener -> listener.onMessage(topic, message)); + return CompletableFuture.completedFuture(1); } //------------------------ 字符串 String ------------------------ diff --git a/src/main/java/org/redkale/source/CacheSource.java b/src/main/java/org/redkale/source/CacheSource.java index fd140bc6d..93019b2dd 100644 --- a/src/main/java/org/redkale/source/CacheSource.java +++ b/src/main/java/org/redkale/source/CacheSource.java @@ -58,6 +58,10 @@ public interface CacheSource extends Resourcable { subscribeAsync(listener, topics).join(); } + default void unsubscribe(CacheEventListener listener, String... topics) { + unsubscribeAsync(listener, topics).join(); + } + default CompletableFuture subscribeAsync(Type messageType, CacheEventListener listener, String... topics) { return subscribeAsync(JsonConvert.root(), messageType, listener, topics); } @@ -69,6 +73,8 @@ public interface CacheSource extends Resourcable { public CompletableFuture subscribeAsync(CacheEventListener listener, String... topics); + public CompletableFuture unsubscribeAsync(CacheEventListener listener, String... topics); + //------------------------ 发布 PUB ------------------------ default int publish(String topic, T message) { return publish(topic, JsonConvert.root(), message.getClass(), message);