unsubscribeAsync

This commit is contained in:
redkale
2023-09-02 12:28:38 +08:00
parent 95499542a4
commit 55d26693bc
3 changed files with 76 additions and 15 deletions

View File

@@ -5,17 +5,16 @@
*/
package org.redkale.net.sncp;
import java.lang.annotation.*;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import java.lang.annotation.*;
import java.lang.reflect.*;
import java.nio.channels.CompletionHandler;
import java.util.*;
import org.redkale.annotation.*;
import org.redkale.annotation.ResourceType;
import static org.redkale.asm.ClassWriter.COMPUTE_FRAMES;
import org.redkale.asm.*;
import static org.redkale.asm.ClassWriter.COMPUTE_FRAMES;
import static org.redkale.asm.Opcodes.*;
import org.redkale.asm.Type;
import org.redkale.convert.Convert;
@@ -23,7 +22,13 @@ import org.redkale.convert.bson.BsonConvert;
import org.redkale.mq.MessageAgent;
import org.redkale.net.sncp.SncpRemoteInfo.SncpRemoteAction;
import org.redkale.service.*;
import org.redkale.util.*;
import org.redkale.util.AnyValue;
import org.redkale.util.RedkaleClassLoader;
import org.redkale.util.Resourcable;
import org.redkale.util.ResourceFactory;
import org.redkale.util.TypeToken;
import org.redkale.util.Uint128;
import org.redkale.util.Utility;
/**
* Service Node Communicate Protocol
@@ -83,6 +88,12 @@ public abstract class Sncp {
if (method.getAnnotation(Local.class) != null) {
continue;
}
if (method.getAnnotation(ResourceListener.class) != null) {
continue;
}
if (method.getAnnotation(Scheduled.class) != null) {
continue;
}
if (method.getName().equals("getClass") || method.getName().equals("toString")) {
continue;
}

View File

@@ -15,8 +15,8 @@ import java.util.function.*;
import java.util.logging.*;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.redkale.annotation.AutoLoad;
import org.redkale.annotation.*;
import org.redkale.annotation.AutoLoad;
import org.redkale.annotation.ResourceListener;
import org.redkale.annotation.ResourceType;
import org.redkale.convert.*;
@@ -54,16 +54,18 @@ public final class CacheMemorySource extends AbstractCacheSource {
private final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
protected final ConcurrentHashMap<String, CacheEntry> container = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, CacheEntry> container = new ConcurrentHashMap<>();
protected final ReentrantLock containerLock = new ReentrantLock();
private final ReentrantLock containerLock = new ReentrantLock();
protected final BiConsumer futureCompleteConsumer = (r, t) -> {
private final BiConsumer futureCompleteConsumer = (r, t) -> {
if (t != null) {
logger.log(Level.SEVERE, "CompletableFuture complete error", (Throwable) t);
}
};
private final Map<String, List<CacheEventListener<byte[]>>> pubsubListeners = new ConcurrentHashMap<>();
public CacheMemorySource(String resourceName) {
this.name = resourceName;
}
@@ -163,21 +165,63 @@ public final class CacheMemorySource extends AbstractCacheSource {
//------------------------ 订阅发布 SUB/PUB ------------------------
@Override
public CompletableFuture<List<String>> pubsubChannelsAsync(@Nullable String pattern){
throw new UnsupportedOperationException("Not supported yet.");
public CompletableFuture<List<String>> pubsubChannelsAsync(@Nullable String pattern) {
Predicate<String> predicate = Pattern.compile(pattern).asPredicate();
return CompletableFuture.completedFuture(pubsubListeners.keySet().stream().filter(predicate).collect(Collectors.toList()));
}
@Override
public CompletableFuture<Void> subscribeAsync(CacheEventListener<byte[]> consumer, String... topics) {
Objects.requireNonNull(consumer);
throw new UnsupportedOperationException("Not supported yet.");
public CompletableFuture<Void> subscribeAsync(CacheEventListener<byte[]> listener, String... topics) {
Objects.requireNonNull(listener);
if (topics == null || topics.length < 1) {
throw new RedkaleException("topics is empty");
}
for (String topic : topics) {
pubsubListeners.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>()).add(listener);
}
return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<Integer> unsubscribeAsync(CacheEventListener listener, String... topics) {
int c = 0;
if (listener == null) {
if (topics == null || topics.length < 1) { //清空所有订阅者
for (List<CacheEventListener<byte[]>> listeners : pubsubListeners.values()) {
c += listeners != null ? listeners.size() : 0;
}
pubsubListeners.clear();
} else {
for (String topic : topics) { //清空指定topic的订阅者
List<CacheEventListener<byte[]>> listeners = pubsubListeners.remove(topic);
c += listeners != null ? listeners.size() : 0;
}
}
} else {
if (topics == null || topics.length < 1) {
for (List<CacheEventListener<byte[]>> listeners : pubsubListeners.values()) {
c += listeners != null && listeners.remove(listener) ? 1 : 0;
}
} else {
for (String topic : topics) {
List<CacheEventListener<byte[]>> listeners = pubsubListeners.get(topic);
c += listeners != null && listeners.remove(listener) ? 1 : 0;
}
}
}
return CompletableFuture.completedFuture(c);
}
@Override
public CompletableFuture<Integer> publishAsync(String topic, byte[] message) {
Objects.requireNonNull(topic);
Objects.requireNonNull(message);
throw new UnsupportedOperationException("Not supported yet.");
List<CacheEventListener<byte[]>> listeners = pubsubListeners.get(topic);
if (listeners == null || listeners.isEmpty()) {
return CompletableFuture.completedFuture(0);
}
listeners.parallelStream().forEach(listener -> listener.onMessage(topic, message));
return CompletableFuture.completedFuture(1);
}
//------------------------ 字符串 String ------------------------

View File

@@ -58,6 +58,10 @@ public interface CacheSource extends Resourcable {
subscribeAsync(listener, topics).join();
}
default void unsubscribe(CacheEventListener listener, String... topics) {
unsubscribeAsync(listener, topics).join();
}
default <T> CompletableFuture<Void> subscribeAsync(Type messageType, CacheEventListener<T> listener, String... topics) {
return subscribeAsync(JsonConvert.root(), messageType, listener, topics);
}
@@ -69,6 +73,8 @@ public interface CacheSource extends Resourcable {
public CompletableFuture<Void> subscribeAsync(CacheEventListener<byte[]> listener, String... topics);
public CompletableFuture<Integer> unsubscribeAsync(CacheEventListener listener, String... topics);
//------------------------ 发布 PUB ------------------------
default <T> int publish(String topic, T message) {
return publish(topic, JsonConvert.root(), message.getClass(), message);