diff --git a/src/main/java/org/redkale/cached/spi/CachedAsyncLock.java b/src/main/java/org/redkale/cached/spi/CachedAsyncLock.java
new file mode 100644
index 000000000..39acd4642
--- /dev/null
+++ b/src/main/java/org/redkale/cached/spi/CachedAsyncLock.java
@@ -0,0 +1,91 @@
+/*
+
+*/
+
+package org.redkale.cached.spi;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * 缓存信异步操作锁
+ *
+ *
详情见: https://redkale.org
+ *
+ * @author zhangjx
+ * @since 2.8.0
+ *
+ */
+public class CachedAsyncLock {
+
+ private static final Object NIL = new Object();
+
+ private final ConcurrentHashMap asyncLockMap;
+
+ private final AtomicBoolean state = new AtomicBoolean();
+
+ private final List futures = new ArrayList<>();
+
+ private final ReentrantLock lock = new ReentrantLock();
+
+ private final String lockId;
+
+ private Object resultObj = NIL;
+
+ private Throwable resultExp;
+
+ public CachedAsyncLock(ConcurrentHashMap asyncLockMap, String lockId) {
+ this.asyncLockMap = asyncLockMap;
+ this.lockId = lockId;
+ }
+
+ public boolean compareAddFuture(CompletableFuture future) {
+ lock.lock();
+ try {
+ if (resultObj != NIL) {
+ future.complete(resultObj);
+ return false;
+ } else if (resultExp != null) {
+ future.completeExceptionally(resultExp);
+ return false;
+ }
+ boolean rs = state.compareAndSet(false, true);
+ this.futures.add(future);
+ return rs;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void fail(Throwable t) {
+ lock.lock();
+ try {
+ this.resultExp = t;
+ for (CompletableFuture future : futures) {
+ future.completeExceptionally(t);
+ }
+ this.futures.clear();
+ } finally {
+ asyncLockMap.remove(lockId);
+ lock.unlock();
+ }
+ }
+
+ public void success(T val) {
+ lock.lock();
+ try {
+ this.resultObj = val;
+ for (CompletableFuture future : futures) {
+ future.complete(val);
+ }
+ this.futures.clear();
+ } finally {
+ asyncLockMap.remove(lockId);
+ lock.unlock();
+ }
+ }
+}
diff --git a/src/main/java/org/redkale/cached/spi/CachedEventMessage.java b/src/main/java/org/redkale/cached/spi/CachedEventMessage.java
new file mode 100644
index 000000000..71685ae27
--- /dev/null
+++ b/src/main/java/org/redkale/cached/spi/CachedEventMessage.java
@@ -0,0 +1,53 @@
+/*
+
+*/
+
+package org.redkale.cached.spi;
+
+import java.io.Serializable;
+import org.redkale.convert.json.JsonConvert;
+
+/**
+ * 缓存推送的消息对象
+ *
+ * 详情见: https://redkale.org
+ *
+ * @author zhangjx
+ * @since 2.8.0
+ *
+ */
+public class CachedEventMessage implements Serializable {
+ // key
+ protected String key;
+
+ // 时间
+ protected long time;
+
+ public CachedEventMessage() {}
+
+ public CachedEventMessage(String key) {
+ this.key = key;
+ this.time = System.currentTimeMillis();
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public void setKey(String key) {
+ this.key = key;
+ }
+
+ public long getTime() {
+ return time;
+ }
+
+ public void setTime(long time) {
+ this.time = time;
+ }
+
+ @Override
+ public String toString() {
+ return JsonConvert.root().convertTo(this);
+ }
+}
diff --git a/src/main/java/org/redkale/cached/spi/CachedManagerService.java b/src/main/java/org/redkale/cached/spi/CachedManagerService.java
index 947ef0ed5..778abcb0a 100644
--- a/src/main/java/org/redkale/cached/spi/CachedManagerService.java
+++ b/src/main/java/org/redkale/cached/spi/CachedManagerService.java
@@ -3,17 +3,12 @@
*/
package org.redkale.cached.spi;
-import java.io.Serializable;
import java.lang.reflect.Type;
import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.redkale.annotation.AutoLoad;
import org.redkale.annotation.Component;
@@ -22,7 +17,6 @@ import org.redkale.annotation.Resource;
import org.redkale.annotation.ResourceType;
import org.redkale.boot.Application;
import org.redkale.cached.CachedManager;
-import org.redkale.convert.json.JsonConvert;
import org.redkale.service.Local;
import org.redkale.service.Service;
import org.redkale.source.CacheEventListener;
@@ -71,7 +65,7 @@ public class CachedManagerService implements CachedManager, Service {
private final ConcurrentHashMap syncLock = new ConcurrentHashMap<>();
// 缓存无效时使用的异步锁
- private final ConcurrentHashMap asyncLock = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap asyncLockMap = new ConcurrentHashMap<>();
@Resource(required = false)
protected Application application;
@@ -119,7 +113,7 @@ public class CachedManagerService implements CachedManager, Service {
}
if (remoteSource != null) {
this.remoteListener = new CacheRemoteListener();
- this.remoteSource.subscribe(CacheEventMessage.class, remoteListener, CACHE_CHANNEL_TOPIC);
+ this.remoteSource.subscribe(CachedEventMessage.class, remoteListener, CACHE_CHANNEL_TOPIC);
}
}
}
@@ -148,7 +142,7 @@ public class CachedManagerService implements CachedManager, Service {
if (this.broadcastable != broadcastable && remote != null) {
if (broadcastable) {
this.remoteListener = new CacheRemoteListener();
- remote.subscribe(CacheEventMessage.class, remoteListener, CACHE_CHANNEL_TOPIC);
+ remote.subscribe(CachedEventMessage.class, remoteListener, CACHE_CHANNEL_TOPIC);
} else {
if (this.remoteListener != null) {
remote.unsubscribe(remoteListener, CACHE_CHANNEL_TOPIC);
@@ -581,7 +575,7 @@ public class CachedManagerService implements CachedManager, Service {
setCache(remoteSource, hash, key, type, value, remoteExpire);
}
if (remoteSource != null && broadcastable) {
- remoteSource.publish(CACHE_CHANNEL_TOPIC, new CacheEventMessage(idFor(hash, key)));
+ remoteSource.publish(CACHE_CHANNEL_TOPIC, new CachedEventMessage(idFor(hash, key)));
}
}
@@ -610,7 +604,7 @@ public class CachedManagerService implements CachedManager, Service {
}
if (remoteSource != null && broadcastable) {
future = future.thenCompose(r -> remoteSource
- .publishAsync(CACHE_CHANNEL_TOPIC, new CacheEventMessage(idFor(hash, key)))
+ .publishAsync(CACHE_CHANNEL_TOPIC, new CachedEventMessage(idFor(hash, key)))
.thenApply(n -> r));
}
return future;
@@ -631,7 +625,7 @@ public class CachedManagerService implements CachedManager, Service {
if (remoteSource != null) {
v = remoteSource.del(id);
if (broadcastable) {
- remoteSource.publish(CACHE_CHANNEL_TOPIC, new CacheEventMessage(id));
+ remoteSource.publish(CACHE_CHANNEL_TOPIC, new CachedEventMessage(id));
}
}
return v;
@@ -653,7 +647,7 @@ public class CachedManagerService implements CachedManager, Service {
return remoteSource.delAsync(id).thenCompose(r -> {
return broadcastable
? remoteSource
- .publishAsync(CACHE_CHANNEL_TOPIC, new CacheEventMessage(id))
+ .publishAsync(CACHE_CHANNEL_TOPIC, new CachedEventMessage(id))
.thenApply(n -> r)
: CompletableFuture.completedFuture(v);
});
@@ -753,24 +747,24 @@ public class CachedManagerService implements CachedManager, Service {
if (CachedValue.isValid(val)) {
return CompletableFuture.completedFuture(val.getVal());
}
- final CacheAsyncEntry entry = asyncLock.computeIfAbsent(id, CacheAsyncEntry::new);
+ final CachedAsyncLock lock = asyncLockMap.computeIfAbsent(id, k -> new CachedAsyncLock(asyncLockMap, k));
CompletableFuture future = new CompletableFuture<>();
- if (entry.compareAddFuture(future)) {
+ if (lock.compareAddFuture(future)) {
try {
supplier.get().whenComplete((v, e) -> {
if (e != null) {
- entry.fail(e);
+ lock.fail(e);
}
CachedValue cacheVal = toCacheValue(nullable, v);
if (CachedValue.isValid(cacheVal)) {
setter.set(id, expire, cacheType, cacheVal)
- .whenComplete((v2, e2) -> entry.success(CachedValue.get(cacheVal)));
+ .whenComplete((v2, e2) -> lock.success(CachedValue.get(cacheVal)));
} else {
- entry.success(CachedValue.get(cacheVal));
+ lock.success(CachedValue.get(cacheVal));
}
});
} catch (Throwable e) {
- entry.fail(e);
+ lock.fail(e);
}
}
return future;
@@ -958,8 +952,6 @@ public class CachedManagerService implements CachedManager, Service {
type, t -> TypeToken.createParameterizedType(null, CachedValue.class, type));
}
- private static final Object NIL = new Object();
-
protected static interface GetterFunc {
public R get(String id, Duration expire, Type cacheType);
@@ -975,112 +967,11 @@ public class CachedManagerService implements CachedManager, Service {
public CompletableFuture set(String id, Duration expire, Type cacheType, CachedValue cacheVal);
}
- protected class CacheAsyncEntry {
-
- private final AtomicBoolean state = new AtomicBoolean();
-
- private final List futures = new ArrayList<>();
-
- private final ReentrantLock lock = new ReentrantLock();
-
- private final String lockId;
-
- private Object resultObj = NIL;
-
- private Throwable resultExp;
-
- public CacheAsyncEntry(String lockId) {
- this.lockId = lockId;
- }
-
- public boolean compareAddFuture(CompletableFuture future) {
- lock.lock();
- try {
- if (resultObj != NIL) {
- future.complete(resultObj);
- return false;
- } else if (resultExp != null) {
- future.completeExceptionally(resultExp);
- return false;
- }
- boolean rs = state.compareAndSet(false, true);
- this.futures.add(future);
- return rs;
- } finally {
- lock.unlock();
- }
- }
-
- public void fail(Throwable t) {
- lock.lock();
- try {
- this.resultExp = t;
- for (CompletableFuture future : futures) {
- future.completeExceptionally(t);
- }
- this.futures.clear();
- } finally {
- asyncLock.remove(lockId);
- lock.unlock();
- }
- }
-
- public void success(T val) {
- lock.lock();
- try {
- this.resultObj = val;
- for (CompletableFuture future : futures) {
- future.complete(val);
- }
- this.futures.clear();
- } finally {
- asyncLock.remove(lockId);
- lock.unlock();
- }
- }
- }
-
- public class CacheRemoteListener implements CacheEventListener {
+ public class CacheRemoteListener implements CacheEventListener {
@Override
- public void onMessage(String topic, CacheEventMessage message) {
+ public void onMessage(String topic, CachedEventMessage message) {
localSource.del(message.getKey());
}
}
-
- public static class CacheEventMessage implements Serializable {
- // key
- protected String key;
-
- // 时间
- protected long time;
-
- public CacheEventMessage() {}
-
- public CacheEventMessage(String key) {
- this.key = key;
- this.time = System.currentTimeMillis();
- }
-
- public String getKey() {
- return key;
- }
-
- public void setKey(String key) {
- this.key = key;
- }
-
- public long getTime() {
- return time;
- }
-
- public void setTime(long time) {
- this.time = time;
- }
-
- @Override
- public String toString() {
- return JsonConvert.root().convertTo(this);
- }
- }
}
diff --git a/src/main/java/org/redkale/cached/spi/CachedModuleEngine.java b/src/main/java/org/redkale/cached/spi/CachedModuleEngine.java
index 3d784d9a9..bf5823c70 100644
--- a/src/main/java/org/redkale/cached/spi/CachedModuleEngine.java
+++ b/src/main/java/org/redkale/cached/spi/CachedModuleEngine.java
@@ -33,7 +33,9 @@ import org.redkale.util.RedkaleException;
* @since 2.8.0
*/
public class CachedModuleEngine extends ModuleEngine {
+
protected static final String CONFIG_NAME = "cached";
+
// 全局缓存管理器
private CachedManager cacheManager;
diff --git a/src/test/java/org/redkale/test/cached/CachedInstance.java b/src/test/java/org/redkale/test/cached/CachedInstance.java
index 9ee392220..9ed6650ba 100644
--- a/src/test/java/org/redkale/test/cached/CachedInstance.java
+++ b/src/test/java/org/redkale/test/cached/CachedInstance.java
@@ -49,6 +49,12 @@ public class CachedInstance implements Service {
return "haha";
}
+ @Cached(key = "dictcode", localExpire = "30", remoteExpire = "60")
+ public CompletableFuture getDictcodeAsync() {
+ System.out.println("执行了 getDictcodeAsync");
+ return CompletableFuture.completedFuture("code001");
+ }
+
@Cached(key = "name", localExpire = "30")
public CompletableFuture getNameAsync() {
return CompletableFuture.completedFuture("nameAsync");
diff --git a/src/test/java/org/redkale/test/cached/CachedInstanceTest.java b/src/test/java/org/redkale/test/cached/CachedInstanceTest.java
index a129be96b..d8571f48e 100644
--- a/src/test/java/org/redkale/test/cached/CachedInstanceTest.java
+++ b/src/test/java/org/redkale/test/cached/CachedInstanceTest.java
@@ -4,6 +4,7 @@
package org.redkale.test.cached;
import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -71,6 +72,7 @@ public class CachedInstanceTest {
CachedInstance instance2 = Sncp.createLocalService(
null, "", serviceClass, boost2, resourceFactory2, grous, client, null, null, null);
resourceFactory2.inject(instance2);
+
System.out.println(instance.getName2());
System.out.println(instance.getClass());
Assertions.assertEquals("haha", instance.getName2());
@@ -89,8 +91,47 @@ public class CachedInstanceTest {
Utility.sleep(10);
Assertions.assertEquals("gege", instance.getName2());
Assertions.assertEquals("gege", instance2.getName2());
+ System.out.println("=====================================01============================================");
}
@Test
- public void run2() throws Exception {}
+ public void run2() throws Exception {
+ Class serviceClass = CachedInstance.class;
+ CachedAsmMethodBoost boost = new CachedAsmMethodBoost(false, serviceClass);
+ CachedAsmMethodBoost boost2 = new CachedAsmMethodBoost(false, serviceClass);
+ SncpRpcGroups grous = new SncpRpcGroups();
+ AsyncGroup iGroup = AsyncGroup.create("", Utility.newScheduledExecutor(1), 0, 0);
+ SncpClient client = new SncpClient(
+ "", iGroup, "0", new InetSocketAddress("127.0.0.1", 8080), new ClientAddress(), "TCP", 1, 16);
+ CachedInstance instance = Sncp.createLocalService(
+ null, "", serviceClass, boost, resourceFactory, grous, client, null, null, null);
+ resourceFactory.inject(instance);
+ CachedInstance instance2 = Sncp.createLocalService(
+ null, "", serviceClass, boost2, resourceFactory2, grous, client, null, null, null);
+ resourceFactory2.inject(instance2);
+
+ int threads = Runtime.getRuntime().availableProcessors() * 10;
+ CountDownLatch cdl = new CountDownLatch(threads);
+ CountDownLatch cd2 = new CountDownLatch(threads);
+ System.out.println("开启并发数: " + threads);
+ for (int i = 0; i < threads; i++) {
+ new Thread(() -> {
+ cdl.countDown();
+ try {
+ cdl.await();
+ } catch (Exception e) {
+ }
+ instance.getDictcodeAsync().thenApply(v -> {
+ if (!"code001".equals(v)) {
+ System.out.println("值不对: " + v);
+ }
+ return v;
+ });
+ cd2.countDown();
+ })
+ .start();
+ }
+ cd2.await();
+ System.out.println("=====================================02============================================");
+ }
}