diff --git a/src/main/java/org/redkale/source/CacheMemorySource.java b/src/main/java/org/redkale/source/CacheMemorySource.java index de162a097..63f688d0c 100644 --- a/src/main/java/org/redkale/source/CacheMemorySource.java +++ b/src/main/java/org/redkale/source/CacheMemorySource.java @@ -67,9 +67,9 @@ public final class CacheMemorySource extends AbstractCacheSource { //key: topic private final Map>> pubsubListeners = new ConcurrentHashMap<>(); - private ExecutorService pubsubExecutor; + private ExecutorService subExecutor; - private final ReentrantLock pubsubExecutorLock = new ReentrantLock(); + private final ReentrantLock subExecutorLock = new ReentrantLock(); public CacheMemorySource(String resourceName) { this.name = resourceName; @@ -85,18 +85,18 @@ public final class CacheMemorySource extends AbstractCacheSource { public void onResourceChange(ResourceEvent[] events) { } - private ExecutorService pubsubExecutor() { - ExecutorService executor = pubsubExecutor; + protected ExecutorService subExecutor() { + ExecutorService executor = subExecutor; if (executor != null) { return executor; } - pubsubExecutorLock.lock(); + subExecutorLock.lock(); try { - if (pubsubExecutor == null) { - String threadNameFormat = "CacheSource-" + resourceName() + "-PubSubThread-%s"; + if (subExecutor == null) { + String threadNameFormat = "CacheSource-" + resourceName() + "-SubThread-%s"; Function func = Utility.virtualExecutorFunction(); final AtomicInteger counter = new AtomicInteger(); - pubsubExecutor = func == null ? Executors.newFixedThreadPool(Utility.cpus(), r -> { + subExecutor = func == null ? Executors.newFixedThreadPool(Utility.cpus(), r -> { Thread t = new Thread(r); t.setDaemon(true); int c = counter.incrementAndGet(); @@ -104,9 +104,9 @@ public final class CacheMemorySource extends AbstractCacheSource { return t; }) : func.apply(threadNameFormat); } - executor = pubsubExecutor; + executor = subExecutor; } finally { - pubsubExecutorLock.unlock(); + subExecutorLock.unlock(); } return executor; } @@ -187,8 +187,8 @@ public final class CacheMemorySource extends AbstractCacheSource { if (scheduler != null) { scheduler.shutdownNow(); } - if (pubsubExecutor != null) { - pubsubExecutor.shutdownNow(); + if (subExecutor != null) { + subExecutor.shutdownNow(); } } @@ -254,7 +254,7 @@ public final class CacheMemorySource extends AbstractCacheSource { if (listeners == null || listeners.isEmpty()) { return CompletableFuture.completedFuture(0); } - Executor executor = pubsubExecutor(); + Executor executor = subExecutor(); listeners.forEach(listener -> executor.execute(() -> { try { listener.onMessage(topic, message);