CachedAsyncLock

This commit is contained in:
redkale
2024-06-08 12:25:37 +08:00
parent f9339229bc
commit f62b276033
6 changed files with 209 additions and 125 deletions

View File

@@ -0,0 +1,91 @@
/*
*/
package org.redkale.cached.spi;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
/**
* 缓存信异步操作锁
*
* <p>详情见: https://redkale.org
*
* @author zhangjx
* @since 2.8.0
*
*/
public class CachedAsyncLock {
private static final Object NIL = new Object();
private final ConcurrentHashMap<String, CachedAsyncLock> asyncLockMap;
private final AtomicBoolean state = new AtomicBoolean();
private final List<CompletableFuture> futures = new ArrayList<>();
private final ReentrantLock lock = new ReentrantLock();
private final String lockId;
private Object resultObj = NIL;
private Throwable resultExp;
public CachedAsyncLock(ConcurrentHashMap<String, CachedAsyncLock> asyncLockMap, String lockId) {
this.asyncLockMap = asyncLockMap;
this.lockId = lockId;
}
public boolean compareAddFuture(CompletableFuture future) {
lock.lock();
try {
if (resultObj != NIL) {
future.complete(resultObj);
return false;
} else if (resultExp != null) {
future.completeExceptionally(resultExp);
return false;
}
boolean rs = state.compareAndSet(false, true);
this.futures.add(future);
return rs;
} finally {
lock.unlock();
}
}
public void fail(Throwable t) {
lock.lock();
try {
this.resultExp = t;
for (CompletableFuture future : futures) {
future.completeExceptionally(t);
}
this.futures.clear();
} finally {
asyncLockMap.remove(lockId);
lock.unlock();
}
}
public <T> void success(T val) {
lock.lock();
try {
this.resultObj = val;
for (CompletableFuture future : futures) {
future.complete(val);
}
this.futures.clear();
} finally {
asyncLockMap.remove(lockId);
lock.unlock();
}
}
}

View File

@@ -0,0 +1,53 @@
/*
*/
package org.redkale.cached.spi;
import java.io.Serializable;
import org.redkale.convert.json.JsonConvert;
/**
* 缓存推送的消息对象
*
* <p>详情见: https://redkale.org
*
* @author zhangjx
* @since 2.8.0
*
*/
public class CachedEventMessage implements Serializable {
// key
protected String key;
// 时间
protected long time;
public CachedEventMessage() {}
public CachedEventMessage(String key) {
this.key = key;
this.time = System.currentTimeMillis();
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public long getTime() {
return time;
}
public void setTime(long time) {
this.time = time;
}
@Override
public String toString() {
return JsonConvert.root().convertTo(this);
}
}

View File

@@ -3,17 +3,12 @@
*/
package org.redkale.cached.spi;
import java.io.Serializable;
import java.lang.reflect.Type;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.redkale.annotation.AutoLoad;
import org.redkale.annotation.Component;
@@ -22,7 +17,6 @@ import org.redkale.annotation.Resource;
import org.redkale.annotation.ResourceType;
import org.redkale.boot.Application;
import org.redkale.cached.CachedManager;
import org.redkale.convert.json.JsonConvert;
import org.redkale.service.Local;
import org.redkale.service.Service;
import org.redkale.source.CacheEventListener;
@@ -71,7 +65,7 @@ public class CachedManagerService implements CachedManager, Service {
private final ConcurrentHashMap<String, CachedValue> syncLock = new ConcurrentHashMap<>();
// 缓存无效时使用的异步锁
private final ConcurrentHashMap<String, CacheAsyncEntry> asyncLock = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, CachedAsyncLock> asyncLockMap = new ConcurrentHashMap<>();
@Resource(required = false)
protected Application application;
@@ -119,7 +113,7 @@ public class CachedManagerService implements CachedManager, Service {
}
if (remoteSource != null) {
this.remoteListener = new CacheRemoteListener();
this.remoteSource.subscribe(CacheEventMessage.class, remoteListener, CACHE_CHANNEL_TOPIC);
this.remoteSource.subscribe(CachedEventMessage.class, remoteListener, CACHE_CHANNEL_TOPIC);
}
}
}
@@ -148,7 +142,7 @@ public class CachedManagerService implements CachedManager, Service {
if (this.broadcastable != broadcastable && remote != null) {
if (broadcastable) {
this.remoteListener = new CacheRemoteListener();
remote.subscribe(CacheEventMessage.class, remoteListener, CACHE_CHANNEL_TOPIC);
remote.subscribe(CachedEventMessage.class, remoteListener, CACHE_CHANNEL_TOPIC);
} else {
if (this.remoteListener != null) {
remote.unsubscribe(remoteListener, CACHE_CHANNEL_TOPIC);
@@ -581,7 +575,7 @@ public class CachedManagerService implements CachedManager, Service {
setCache(remoteSource, hash, key, type, value, remoteExpire);
}
if (remoteSource != null && broadcastable) {
remoteSource.publish(CACHE_CHANNEL_TOPIC, new CacheEventMessage(idFor(hash, key)));
remoteSource.publish(CACHE_CHANNEL_TOPIC, new CachedEventMessage(idFor(hash, key)));
}
}
@@ -610,7 +604,7 @@ public class CachedManagerService implements CachedManager, Service {
}
if (remoteSource != null && broadcastable) {
future = future.thenCompose(r -> remoteSource
.publishAsync(CACHE_CHANNEL_TOPIC, new CacheEventMessage(idFor(hash, key)))
.publishAsync(CACHE_CHANNEL_TOPIC, new CachedEventMessage(idFor(hash, key)))
.thenApply(n -> r));
}
return future;
@@ -631,7 +625,7 @@ public class CachedManagerService implements CachedManager, Service {
if (remoteSource != null) {
v = remoteSource.del(id);
if (broadcastable) {
remoteSource.publish(CACHE_CHANNEL_TOPIC, new CacheEventMessage(id));
remoteSource.publish(CACHE_CHANNEL_TOPIC, new CachedEventMessage(id));
}
}
return v;
@@ -653,7 +647,7 @@ public class CachedManagerService implements CachedManager, Service {
return remoteSource.delAsync(id).thenCompose(r -> {
return broadcastable
? remoteSource
.publishAsync(CACHE_CHANNEL_TOPIC, new CacheEventMessage(id))
.publishAsync(CACHE_CHANNEL_TOPIC, new CachedEventMessage(id))
.thenApply(n -> r)
: CompletableFuture.completedFuture(v);
});
@@ -753,24 +747,24 @@ public class CachedManagerService implements CachedManager, Service {
if (CachedValue.isValid(val)) {
return CompletableFuture.completedFuture(val.getVal());
}
final CacheAsyncEntry entry = asyncLock.computeIfAbsent(id, CacheAsyncEntry::new);
final CachedAsyncLock lock = asyncLockMap.computeIfAbsent(id, k -> new CachedAsyncLock(asyncLockMap, k));
CompletableFuture<T> future = new CompletableFuture<>();
if (entry.compareAddFuture(future)) {
if (lock.compareAddFuture(future)) {
try {
supplier.get().whenComplete((v, e) -> {
if (e != null) {
entry.fail(e);
lock.fail(e);
}
CachedValue<T> cacheVal = toCacheValue(nullable, v);
if (CachedValue.isValid(cacheVal)) {
setter.set(id, expire, cacheType, cacheVal)
.whenComplete((v2, e2) -> entry.success(CachedValue.get(cacheVal)));
.whenComplete((v2, e2) -> lock.success(CachedValue.get(cacheVal)));
} else {
entry.success(CachedValue.get(cacheVal));
lock.success(CachedValue.get(cacheVal));
}
});
} catch (Throwable e) {
entry.fail(e);
lock.fail(e);
}
}
return future;
@@ -958,8 +952,6 @@ public class CachedManagerService implements CachedManager, Service {
type, t -> TypeToken.createParameterizedType(null, CachedValue.class, type));
}
private static final Object NIL = new Object();
protected static interface GetterFunc<R> {
public R get(String id, Duration expire, Type cacheType);
@@ -975,112 +967,11 @@ public class CachedManagerService implements CachedManager, Service {
public CompletableFuture<Void> set(String id, Duration expire, Type cacheType, CachedValue cacheVal);
}
protected class CacheAsyncEntry {
private final AtomicBoolean state = new AtomicBoolean();
private final List<CompletableFuture> futures = new ArrayList<>();
private final ReentrantLock lock = new ReentrantLock();
private final String lockId;
private Object resultObj = NIL;
private Throwable resultExp;
public CacheAsyncEntry(String lockId) {
this.lockId = lockId;
}
public boolean compareAddFuture(CompletableFuture future) {
lock.lock();
try {
if (resultObj != NIL) {
future.complete(resultObj);
return false;
} else if (resultExp != null) {
future.completeExceptionally(resultExp);
return false;
}
boolean rs = state.compareAndSet(false, true);
this.futures.add(future);
return rs;
} finally {
lock.unlock();
}
}
public void fail(Throwable t) {
lock.lock();
try {
this.resultExp = t;
for (CompletableFuture future : futures) {
future.completeExceptionally(t);
}
this.futures.clear();
} finally {
asyncLock.remove(lockId);
lock.unlock();
}
}
public <T> void success(T val) {
lock.lock();
try {
this.resultObj = val;
for (CompletableFuture future : futures) {
future.complete(val);
}
this.futures.clear();
} finally {
asyncLock.remove(lockId);
lock.unlock();
}
}
}
public class CacheRemoteListener implements CacheEventListener<CacheEventMessage> {
public class CacheRemoteListener implements CacheEventListener<CachedEventMessage> {
@Override
public void onMessage(String topic, CacheEventMessage message) {
public void onMessage(String topic, CachedEventMessage message) {
localSource.del(message.getKey());
}
}
public static class CacheEventMessage implements Serializable {
// key
protected String key;
// 时间
protected long time;
public CacheEventMessage() {}
public CacheEventMessage(String key) {
this.key = key;
this.time = System.currentTimeMillis();
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public long getTime() {
return time;
}
public void setTime(long time) {
this.time = time;
}
@Override
public String toString() {
return JsonConvert.root().convertTo(this);
}
}
}

View File

@@ -33,7 +33,9 @@ import org.redkale.util.RedkaleException;
* @since 2.8.0
*/
public class CachedModuleEngine extends ModuleEngine {
protected static final String CONFIG_NAME = "cached";
// 全局缓存管理器
private CachedManager cacheManager;

View File

@@ -49,6 +49,12 @@ public class CachedInstance implements Service {
return "haha";
}
@Cached(key = "dictcode", localExpire = "30", remoteExpire = "60")
public CompletableFuture<String> getDictcodeAsync() {
System.out.println("执行了 getDictcodeAsync");
return CompletableFuture.completedFuture("code001");
}
@Cached(key = "name", localExpire = "30")
public CompletableFuture<String> getNameAsync() {
return CompletableFuture.completedFuture("nameAsync");

View File

@@ -4,6 +4,7 @@
package org.redkale.test.cached;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -71,6 +72,7 @@ public class CachedInstanceTest {
CachedInstance instance2 = Sncp.createLocalService(
null, "", serviceClass, boost2, resourceFactory2, grous, client, null, null, null);
resourceFactory2.inject(instance2);
System.out.println(instance.getName2());
System.out.println(instance.getClass());
Assertions.assertEquals("haha", instance.getName2());
@@ -89,8 +91,47 @@ public class CachedInstanceTest {
Utility.sleep(10);
Assertions.assertEquals("gege", instance.getName2());
Assertions.assertEquals("gege", instance2.getName2());
System.out.println("=====================================01============================================");
}
@Test
public void run2() throws Exception {}
public void run2() throws Exception {
Class<CachedInstance> serviceClass = CachedInstance.class;
CachedAsmMethodBoost boost = new CachedAsmMethodBoost(false, serviceClass);
CachedAsmMethodBoost boost2 = new CachedAsmMethodBoost(false, serviceClass);
SncpRpcGroups grous = new SncpRpcGroups();
AsyncGroup iGroup = AsyncGroup.create("", Utility.newScheduledExecutor(1), 0, 0);
SncpClient client = new SncpClient(
"", iGroup, "0", new InetSocketAddress("127.0.0.1", 8080), new ClientAddress(), "TCP", 1, 16);
CachedInstance instance = Sncp.createLocalService(
null, "", serviceClass, boost, resourceFactory, grous, client, null, null, null);
resourceFactory.inject(instance);
CachedInstance instance2 = Sncp.createLocalService(
null, "", serviceClass, boost2, resourceFactory2, grous, client, null, null, null);
resourceFactory2.inject(instance2);
int threads = Runtime.getRuntime().availableProcessors() * 10;
CountDownLatch cdl = new CountDownLatch(threads);
CountDownLatch cd2 = new CountDownLatch(threads);
System.out.println("开启并发数: " + threads);
for (int i = 0; i < threads; i++) {
new Thread(() -> {
cdl.countDown();
try {
cdl.await();
} catch (Exception e) {
}
instance.getDictcodeAsync().thenApply(v -> {
if (!"code001".equals(v)) {
System.out.println("值不对: " + v);
}
return v;
});
cd2.countDown();
})
.start();
}
cd2.await();
System.out.println("=====================================02============================================");
}
}