CacheMemorySource

This commit is contained in:
redkale
2023-09-07 16:40:52 +08:00
parent da41c7f7fa
commit 388505c899

View File

@@ -67,9 +67,9 @@ public final class CacheMemorySource extends AbstractCacheSource {
//key: topic
private final Map<String, Set<CacheEventListener<byte[]>>> pubsubListeners = new ConcurrentHashMap<>();
private ExecutorService pubsubExecutor;
private ExecutorService subExecutor;
private final ReentrantLock pubsubExecutorLock = new ReentrantLock();
private final ReentrantLock subExecutorLock = new ReentrantLock();
public CacheMemorySource(String resourceName) {
this.name = resourceName;
@@ -85,18 +85,18 @@ public final class CacheMemorySource extends AbstractCacheSource {
public void onResourceChange(ResourceEvent[] events) {
}
private ExecutorService pubsubExecutor() {
ExecutorService executor = pubsubExecutor;
protected ExecutorService subExecutor() {
ExecutorService executor = subExecutor;
if (executor != null) {
return executor;
}
pubsubExecutorLock.lock();
subExecutorLock.lock();
try {
if (pubsubExecutor == null) {
String threadNameFormat = "CacheSource-" + resourceName() + "-PubSubThread-%s";
if (subExecutor == null) {
String threadNameFormat = "CacheSource-" + resourceName() + "-SubThread-%s";
Function<String, ExecutorService> func = Utility.virtualExecutorFunction();
final AtomicInteger counter = new AtomicInteger();
pubsubExecutor = func == null ? Executors.newFixedThreadPool(Utility.cpus(), r -> {
subExecutor = func == null ? Executors.newFixedThreadPool(Utility.cpus(), r -> {
Thread t = new Thread(r);
t.setDaemon(true);
int c = counter.incrementAndGet();
@@ -104,9 +104,9 @@ public final class CacheMemorySource extends AbstractCacheSource {
return t;
}) : func.apply(threadNameFormat);
}
executor = pubsubExecutor;
executor = subExecutor;
} finally {
pubsubExecutorLock.unlock();
subExecutorLock.unlock();
}
return executor;
}
@@ -187,8 +187,8 @@ public final class CacheMemorySource extends AbstractCacheSource {
if (scheduler != null) {
scheduler.shutdownNow();
}
if (pubsubExecutor != null) {
pubsubExecutor.shutdownNow();
if (subExecutor != null) {
subExecutor.shutdownNow();
}
}
@@ -254,7 +254,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
if (listeners == null || listeners.isEmpty()) {
return CompletableFuture.completedFuture(0);
}
Executor executor = pubsubExecutor();
Executor executor = subExecutor();
listeners.forEach(listener -> executor.execute(() -> {
try {
listener.onMessage(topic, message);