From f62b276033f798b68e833364206c3b2ec7a000fd Mon Sep 17 00:00:00 2001 From: redkale Date: Sat, 8 Jun 2024 12:25:37 +0800 Subject: [PATCH] CachedAsyncLock --- .../redkale/cached/spi/CachedAsyncLock.java | 91 ++++++++++++ .../cached/spi/CachedEventMessage.java | 53 +++++++ .../cached/spi/CachedManagerService.java | 139 ++---------------- .../cached/spi/CachedModuleEngine.java | 2 + .../redkale/test/cached/CachedInstance.java | 6 + .../test/cached/CachedInstanceTest.java | 43 +++++- 6 files changed, 209 insertions(+), 125 deletions(-) create mode 100644 src/main/java/org/redkale/cached/spi/CachedAsyncLock.java create mode 100644 src/main/java/org/redkale/cached/spi/CachedEventMessage.java diff --git a/src/main/java/org/redkale/cached/spi/CachedAsyncLock.java b/src/main/java/org/redkale/cached/spi/CachedAsyncLock.java new file mode 100644 index 000000000..39acd4642 --- /dev/null +++ b/src/main/java/org/redkale/cached/spi/CachedAsyncLock.java @@ -0,0 +1,91 @@ +/* + +*/ + +package org.redkale.cached.spi; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; + +/** + * 缓存信异步操作锁 + * + *

详情见: https://redkale.org + * + * @author zhangjx + * @since 2.8.0 + * + */ +public class CachedAsyncLock { + + private static final Object NIL = new Object(); + + private final ConcurrentHashMap asyncLockMap; + + private final AtomicBoolean state = new AtomicBoolean(); + + private final List futures = new ArrayList<>(); + + private final ReentrantLock lock = new ReentrantLock(); + + private final String lockId; + + private Object resultObj = NIL; + + private Throwable resultExp; + + public CachedAsyncLock(ConcurrentHashMap asyncLockMap, String lockId) { + this.asyncLockMap = asyncLockMap; + this.lockId = lockId; + } + + public boolean compareAddFuture(CompletableFuture future) { + lock.lock(); + try { + if (resultObj != NIL) { + future.complete(resultObj); + return false; + } else if (resultExp != null) { + future.completeExceptionally(resultExp); + return false; + } + boolean rs = state.compareAndSet(false, true); + this.futures.add(future); + return rs; + } finally { + lock.unlock(); + } + } + + public void fail(Throwable t) { + lock.lock(); + try { + this.resultExp = t; + for (CompletableFuture future : futures) { + future.completeExceptionally(t); + } + this.futures.clear(); + } finally { + asyncLockMap.remove(lockId); + lock.unlock(); + } + } + + public void success(T val) { + lock.lock(); + try { + this.resultObj = val; + for (CompletableFuture future : futures) { + future.complete(val); + } + this.futures.clear(); + } finally { + asyncLockMap.remove(lockId); + lock.unlock(); + } + } +} diff --git a/src/main/java/org/redkale/cached/spi/CachedEventMessage.java b/src/main/java/org/redkale/cached/spi/CachedEventMessage.java new file mode 100644 index 000000000..71685ae27 --- /dev/null +++ b/src/main/java/org/redkale/cached/spi/CachedEventMessage.java @@ -0,0 +1,53 @@ +/* + +*/ + +package org.redkale.cached.spi; + +import java.io.Serializable; +import org.redkale.convert.json.JsonConvert; + +/** + * 缓存推送的消息对象 + * + *

详情见: https://redkale.org + * + * @author zhangjx + * @since 2.8.0 + * + */ +public class CachedEventMessage implements Serializable { + // key + protected String key; + + // 时间 + protected long time; + + public CachedEventMessage() {} + + public CachedEventMessage(String key) { + this.key = key; + this.time = System.currentTimeMillis(); + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public long getTime() { + return time; + } + + public void setTime(long time) { + this.time = time; + } + + @Override + public String toString() { + return JsonConvert.root().convertTo(this); + } +} diff --git a/src/main/java/org/redkale/cached/spi/CachedManagerService.java b/src/main/java/org/redkale/cached/spi/CachedManagerService.java index 947ef0ed5..778abcb0a 100644 --- a/src/main/java/org/redkale/cached/spi/CachedManagerService.java +++ b/src/main/java/org/redkale/cached/spi/CachedManagerService.java @@ -3,17 +3,12 @@ */ package org.redkale.cached.spi; -import java.io.Serializable; import java.lang.reflect.Type; import java.time.Duration; -import java.util.ArrayList; -import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import org.redkale.annotation.AutoLoad; import org.redkale.annotation.Component; @@ -22,7 +17,6 @@ import org.redkale.annotation.Resource; import org.redkale.annotation.ResourceType; import org.redkale.boot.Application; import org.redkale.cached.CachedManager; -import org.redkale.convert.json.JsonConvert; import org.redkale.service.Local; import org.redkale.service.Service; import org.redkale.source.CacheEventListener; @@ -71,7 +65,7 @@ public class CachedManagerService implements CachedManager, Service { private final ConcurrentHashMap syncLock = new ConcurrentHashMap<>(); // 缓存无效时使用的异步锁 - private final ConcurrentHashMap asyncLock = new ConcurrentHashMap<>(); + private final ConcurrentHashMap asyncLockMap = new ConcurrentHashMap<>(); @Resource(required = false) protected Application application; @@ -119,7 +113,7 @@ public class CachedManagerService implements CachedManager, Service { } if (remoteSource != null) { this.remoteListener = new CacheRemoteListener(); - this.remoteSource.subscribe(CacheEventMessage.class, remoteListener, CACHE_CHANNEL_TOPIC); + this.remoteSource.subscribe(CachedEventMessage.class, remoteListener, CACHE_CHANNEL_TOPIC); } } } @@ -148,7 +142,7 @@ public class CachedManagerService implements CachedManager, Service { if (this.broadcastable != broadcastable && remote != null) { if (broadcastable) { this.remoteListener = new CacheRemoteListener(); - remote.subscribe(CacheEventMessage.class, remoteListener, CACHE_CHANNEL_TOPIC); + remote.subscribe(CachedEventMessage.class, remoteListener, CACHE_CHANNEL_TOPIC); } else { if (this.remoteListener != null) { remote.unsubscribe(remoteListener, CACHE_CHANNEL_TOPIC); @@ -581,7 +575,7 @@ public class CachedManagerService implements CachedManager, Service { setCache(remoteSource, hash, key, type, value, remoteExpire); } if (remoteSource != null && broadcastable) { - remoteSource.publish(CACHE_CHANNEL_TOPIC, new CacheEventMessage(idFor(hash, key))); + remoteSource.publish(CACHE_CHANNEL_TOPIC, new CachedEventMessage(idFor(hash, key))); } } @@ -610,7 +604,7 @@ public class CachedManagerService implements CachedManager, Service { } if (remoteSource != null && broadcastable) { future = future.thenCompose(r -> remoteSource - .publishAsync(CACHE_CHANNEL_TOPIC, new CacheEventMessage(idFor(hash, key))) + .publishAsync(CACHE_CHANNEL_TOPIC, new CachedEventMessage(idFor(hash, key))) .thenApply(n -> r)); } return future; @@ -631,7 +625,7 @@ public class CachedManagerService implements CachedManager, Service { if (remoteSource != null) { v = remoteSource.del(id); if (broadcastable) { - remoteSource.publish(CACHE_CHANNEL_TOPIC, new CacheEventMessage(id)); + remoteSource.publish(CACHE_CHANNEL_TOPIC, new CachedEventMessage(id)); } } return v; @@ -653,7 +647,7 @@ public class CachedManagerService implements CachedManager, Service { return remoteSource.delAsync(id).thenCompose(r -> { return broadcastable ? remoteSource - .publishAsync(CACHE_CHANNEL_TOPIC, new CacheEventMessage(id)) + .publishAsync(CACHE_CHANNEL_TOPIC, new CachedEventMessage(id)) .thenApply(n -> r) : CompletableFuture.completedFuture(v); }); @@ -753,24 +747,24 @@ public class CachedManagerService implements CachedManager, Service { if (CachedValue.isValid(val)) { return CompletableFuture.completedFuture(val.getVal()); } - final CacheAsyncEntry entry = asyncLock.computeIfAbsent(id, CacheAsyncEntry::new); + final CachedAsyncLock lock = asyncLockMap.computeIfAbsent(id, k -> new CachedAsyncLock(asyncLockMap, k)); CompletableFuture future = new CompletableFuture<>(); - if (entry.compareAddFuture(future)) { + if (lock.compareAddFuture(future)) { try { supplier.get().whenComplete((v, e) -> { if (e != null) { - entry.fail(e); + lock.fail(e); } CachedValue cacheVal = toCacheValue(nullable, v); if (CachedValue.isValid(cacheVal)) { setter.set(id, expire, cacheType, cacheVal) - .whenComplete((v2, e2) -> entry.success(CachedValue.get(cacheVal))); + .whenComplete((v2, e2) -> lock.success(CachedValue.get(cacheVal))); } else { - entry.success(CachedValue.get(cacheVal)); + lock.success(CachedValue.get(cacheVal)); } }); } catch (Throwable e) { - entry.fail(e); + lock.fail(e); } } return future; @@ -958,8 +952,6 @@ public class CachedManagerService implements CachedManager, Service { type, t -> TypeToken.createParameterizedType(null, CachedValue.class, type)); } - private static final Object NIL = new Object(); - protected static interface GetterFunc { public R get(String id, Duration expire, Type cacheType); @@ -975,112 +967,11 @@ public class CachedManagerService implements CachedManager, Service { public CompletableFuture set(String id, Duration expire, Type cacheType, CachedValue cacheVal); } - protected class CacheAsyncEntry { - - private final AtomicBoolean state = new AtomicBoolean(); - - private final List futures = new ArrayList<>(); - - private final ReentrantLock lock = new ReentrantLock(); - - private final String lockId; - - private Object resultObj = NIL; - - private Throwable resultExp; - - public CacheAsyncEntry(String lockId) { - this.lockId = lockId; - } - - public boolean compareAddFuture(CompletableFuture future) { - lock.lock(); - try { - if (resultObj != NIL) { - future.complete(resultObj); - return false; - } else if (resultExp != null) { - future.completeExceptionally(resultExp); - return false; - } - boolean rs = state.compareAndSet(false, true); - this.futures.add(future); - return rs; - } finally { - lock.unlock(); - } - } - - public void fail(Throwable t) { - lock.lock(); - try { - this.resultExp = t; - for (CompletableFuture future : futures) { - future.completeExceptionally(t); - } - this.futures.clear(); - } finally { - asyncLock.remove(lockId); - lock.unlock(); - } - } - - public void success(T val) { - lock.lock(); - try { - this.resultObj = val; - for (CompletableFuture future : futures) { - future.complete(val); - } - this.futures.clear(); - } finally { - asyncLock.remove(lockId); - lock.unlock(); - } - } - } - - public class CacheRemoteListener implements CacheEventListener { + public class CacheRemoteListener implements CacheEventListener { @Override - public void onMessage(String topic, CacheEventMessage message) { + public void onMessage(String topic, CachedEventMessage message) { localSource.del(message.getKey()); } } - - public static class CacheEventMessage implements Serializable { - // key - protected String key; - - // 时间 - protected long time; - - public CacheEventMessage() {} - - public CacheEventMessage(String key) { - this.key = key; - this.time = System.currentTimeMillis(); - } - - public String getKey() { - return key; - } - - public void setKey(String key) { - this.key = key; - } - - public long getTime() { - return time; - } - - public void setTime(long time) { - this.time = time; - } - - @Override - public String toString() { - return JsonConvert.root().convertTo(this); - } - } } diff --git a/src/main/java/org/redkale/cached/spi/CachedModuleEngine.java b/src/main/java/org/redkale/cached/spi/CachedModuleEngine.java index 3d784d9a9..bf5823c70 100644 --- a/src/main/java/org/redkale/cached/spi/CachedModuleEngine.java +++ b/src/main/java/org/redkale/cached/spi/CachedModuleEngine.java @@ -33,7 +33,9 @@ import org.redkale.util.RedkaleException; * @since 2.8.0 */ public class CachedModuleEngine extends ModuleEngine { + protected static final String CONFIG_NAME = "cached"; + // 全局缓存管理器 private CachedManager cacheManager; diff --git a/src/test/java/org/redkale/test/cached/CachedInstance.java b/src/test/java/org/redkale/test/cached/CachedInstance.java index 9ee392220..9ed6650ba 100644 --- a/src/test/java/org/redkale/test/cached/CachedInstance.java +++ b/src/test/java/org/redkale/test/cached/CachedInstance.java @@ -49,6 +49,12 @@ public class CachedInstance implements Service { return "haha"; } + @Cached(key = "dictcode", localExpire = "30", remoteExpire = "60") + public CompletableFuture getDictcodeAsync() { + System.out.println("执行了 getDictcodeAsync"); + return CompletableFuture.completedFuture("code001"); + } + @Cached(key = "name", localExpire = "30") public CompletableFuture getNameAsync() { return CompletableFuture.completedFuture("nameAsync"); diff --git a/src/test/java/org/redkale/test/cached/CachedInstanceTest.java b/src/test/java/org/redkale/test/cached/CachedInstanceTest.java index a129be96b..d8571f48e 100644 --- a/src/test/java/org/redkale/test/cached/CachedInstanceTest.java +++ b/src/test/java/org/redkale/test/cached/CachedInstanceTest.java @@ -4,6 +4,7 @@ package org.redkale.test.cached; import java.net.InetSocketAddress; +import java.util.concurrent.CountDownLatch; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -71,6 +72,7 @@ public class CachedInstanceTest { CachedInstance instance2 = Sncp.createLocalService( null, "", serviceClass, boost2, resourceFactory2, grous, client, null, null, null); resourceFactory2.inject(instance2); + System.out.println(instance.getName2()); System.out.println(instance.getClass()); Assertions.assertEquals("haha", instance.getName2()); @@ -89,8 +91,47 @@ public class CachedInstanceTest { Utility.sleep(10); Assertions.assertEquals("gege", instance.getName2()); Assertions.assertEquals("gege", instance2.getName2()); + System.out.println("=====================================01============================================"); } @Test - public void run2() throws Exception {} + public void run2() throws Exception { + Class serviceClass = CachedInstance.class; + CachedAsmMethodBoost boost = new CachedAsmMethodBoost(false, serviceClass); + CachedAsmMethodBoost boost2 = new CachedAsmMethodBoost(false, serviceClass); + SncpRpcGroups grous = new SncpRpcGroups(); + AsyncGroup iGroup = AsyncGroup.create("", Utility.newScheduledExecutor(1), 0, 0); + SncpClient client = new SncpClient( + "", iGroup, "0", new InetSocketAddress("127.0.0.1", 8080), new ClientAddress(), "TCP", 1, 16); + CachedInstance instance = Sncp.createLocalService( + null, "", serviceClass, boost, resourceFactory, grous, client, null, null, null); + resourceFactory.inject(instance); + CachedInstance instance2 = Sncp.createLocalService( + null, "", serviceClass, boost2, resourceFactory2, grous, client, null, null, null); + resourceFactory2.inject(instance2); + + int threads = Runtime.getRuntime().availableProcessors() * 10; + CountDownLatch cdl = new CountDownLatch(threads); + CountDownLatch cd2 = new CountDownLatch(threads); + System.out.println("开启并发数: " + threads); + for (int i = 0; i < threads; i++) { + new Thread(() -> { + cdl.countDown(); + try { + cdl.await(); + } catch (Exception e) { + } + instance.getDictcodeAsync().thenApply(v -> { + if (!"code001".equals(v)) { + System.out.println("值不对: " + v); + } + return v; + }); + cd2.countDown(); + }) + .start(); + } + cd2.await(); + System.out.println("=====================================02============================================"); + } }