diff --git a/src/main/java/org/redkale/source/CacheMemorySource.java b/src/main/java/org/redkale/source/CacheMemorySource.java index 377cd112d..02ca9bf23 100644 --- a/src/main/java/org/redkale/source/CacheMemorySource.java +++ b/src/main/java/org/redkale/source/CacheMemorySource.java @@ -66,6 +66,10 @@ public final class CacheMemorySource extends AbstractCacheSource { private final Map>> pubsubListeners = new ConcurrentHashMap<>(); + private ExecutorService pubsubExecutor; + + private final ReentrantLock pubsubExecutorLock = new ReentrantLock(); + public CacheMemorySource(String resourceName) { this.name = resourceName; } @@ -80,6 +84,32 @@ public final class CacheMemorySource extends AbstractCacheSource { public void onResourceChange(ResourceEvent[] events) { } + private ExecutorService pubsubExecutor() { + ExecutorService executor = pubsubExecutor; + if (executor != null) { + return executor; + } + pubsubExecutorLock.lock(); + try { + if (pubsubExecutor == null) { + String threadNameFormat = "CacheSource-" + resourceName() + "-PubSubThread-%s"; + Function func = Utility.virtualExecutorFunction(); + final AtomicInteger counter = new AtomicInteger(); + pubsubExecutor = func == null ? Executors.newFixedThreadPool(Utility.cpus(), r -> { + Thread t = new Thread(r); + t.setDaemon(true); + int c = counter.incrementAndGet(); + t.setName(String.format(threadNameFormat, "Virtual-" + (c < 10 ? ("00" + c) : (c < 100 ? ("0" + c) : c)))); + return t; + }) : func.apply(threadNameFormat); + } + executor = pubsubExecutor; + } finally { + pubsubExecutorLock.unlock(); + } + return executor; + } + public static boolean acceptsConf(AnyValue config) { String nodes = config.getValue(CACHE_SOURCE_NODES); return nodes != null && nodes.startsWith("memory:"); @@ -156,6 +186,9 @@ public final class CacheMemorySource extends AbstractCacheSource { if (scheduler != null) { scheduler.shutdownNow(); } + if (pubsubExecutor != null) { + pubsubExecutor.shutdownNow(); + } } @Override @@ -213,14 +246,21 @@ public final class CacheMemorySource extends AbstractCacheSource { } @Override - public CompletableFuture publishAsync(String topic, byte[] message) { + public CompletableFuture publishAsync(final String topic, final byte[] message) { Objects.requireNonNull(topic); Objects.requireNonNull(message); List> listeners = pubsubListeners.get(topic); if (listeners == null || listeners.isEmpty()) { return CompletableFuture.completedFuture(0); } - listeners.parallelStream().forEach(listener -> listener.onMessage(topic, message)); + Executor executor = pubsubExecutor(); + listeners.forEach(listener -> executor.execute(() -> { + try { + listener.onMessage(topic, message); + } catch (Throwable t) { + logger.log(Level.SEVERE, "CacheSource subscribe message error, topic: " + topic, t); + } + })); return CompletableFuture.completedFuture(1); }