CacheMemorySource优化
This commit is contained in:
@@ -66,6 +66,10 @@ public final class CacheMemorySource extends AbstractCacheSource {
|
||||
|
||||
private final Map<String, List<CacheEventListener<byte[]>>> pubsubListeners = new ConcurrentHashMap<>();
|
||||
|
||||
private ExecutorService pubsubExecutor;
|
||||
|
||||
private final ReentrantLock pubsubExecutorLock = new ReentrantLock();
|
||||
|
||||
public CacheMemorySource(String resourceName) {
|
||||
this.name = resourceName;
|
||||
}
|
||||
@@ -80,6 +84,32 @@ public final class CacheMemorySource extends AbstractCacheSource {
|
||||
public void onResourceChange(ResourceEvent[] events) {
|
||||
}
|
||||
|
||||
private ExecutorService pubsubExecutor() {
|
||||
ExecutorService executor = pubsubExecutor;
|
||||
if (executor != null) {
|
||||
return executor;
|
||||
}
|
||||
pubsubExecutorLock.lock();
|
||||
try {
|
||||
if (pubsubExecutor == null) {
|
||||
String threadNameFormat = "CacheSource-" + resourceName() + "-PubSubThread-%s";
|
||||
Function<String, ExecutorService> func = Utility.virtualExecutorFunction();
|
||||
final AtomicInteger counter = new AtomicInteger();
|
||||
pubsubExecutor = func == null ? Executors.newFixedThreadPool(Utility.cpus(), r -> {
|
||||
Thread t = new Thread(r);
|
||||
t.setDaemon(true);
|
||||
int c = counter.incrementAndGet();
|
||||
t.setName(String.format(threadNameFormat, "Virtual-" + (c < 10 ? ("00" + c) : (c < 100 ? ("0" + c) : c))));
|
||||
return t;
|
||||
}) : func.apply(threadNameFormat);
|
||||
}
|
||||
executor = pubsubExecutor;
|
||||
} finally {
|
||||
pubsubExecutorLock.unlock();
|
||||
}
|
||||
return executor;
|
||||
}
|
||||
|
||||
public static boolean acceptsConf(AnyValue config) {
|
||||
String nodes = config.getValue(CACHE_SOURCE_NODES);
|
||||
return nodes != null && nodes.startsWith("memory:");
|
||||
@@ -156,6 +186,9 @@ public final class CacheMemorySource extends AbstractCacheSource {
|
||||
if (scheduler != null) {
|
||||
scheduler.shutdownNow();
|
||||
}
|
||||
if (pubsubExecutor != null) {
|
||||
pubsubExecutor.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -213,14 +246,21 @@ public final class CacheMemorySource extends AbstractCacheSource {
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Integer> publishAsync(String topic, byte[] message) {
|
||||
public CompletableFuture<Integer> publishAsync(final String topic, final byte[] message) {
|
||||
Objects.requireNonNull(topic);
|
||||
Objects.requireNonNull(message);
|
||||
List<CacheEventListener<byte[]>> listeners = pubsubListeners.get(topic);
|
||||
if (listeners == null || listeners.isEmpty()) {
|
||||
return CompletableFuture.completedFuture(0);
|
||||
}
|
||||
listeners.parallelStream().forEach(listener -> listener.onMessage(topic, message));
|
||||
Executor executor = pubsubExecutor();
|
||||
listeners.forEach(listener -> executor.execute(() -> {
|
||||
try {
|
||||
listener.onMessage(topic, message);
|
||||
} catch (Throwable t) {
|
||||
logger.log(Level.SEVERE, "CacheSource subscribe message error, topic: " + topic, t);
|
||||
}
|
||||
}));
|
||||
return CompletableFuture.completedFuture(1);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user