This commit is contained in:
Redkale
2020-06-04 14:06:35 +08:00
parent b6d5fc02dc
commit e365dae9e4
5 changed files with 132 additions and 69 deletions

View File

@@ -7,7 +7,7 @@ package org.redkale.mq;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.*;
import java.util.logging.Logger;
import javax.annotation.Resource;
import org.redkale.boot.*;
@@ -54,8 +54,8 @@ public abstract class MessageAgent {
public void init(AnyValue config) {
}
public final CompletableFuture<MessageRecord> createSncpRespFuture(MessageRecord message) {
return this.sncpRespProcessor.createFuture(message.getSeqid());
public final CompletableFuture<MessageRecord> createSncpRespFuture(AtomicLong counter, MessageRecord message) {
return this.sncpRespProcessor.createFuture(message.getSeqid(), counter);
}
public final synchronized void startSncpRespConsumer() {

View File

@@ -6,6 +6,7 @@
package org.redkale.mq;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
/**
* MQ管理器
@@ -23,10 +24,13 @@ public class MessageRespFutureNode {
protected final long createtime;
protected final AtomicLong counter;
protected final CompletableFuture<MessageRecord> future;
public MessageRespFutureNode(long seqid, CompletableFuture<MessageRecord> future) {
public MessageRespFutureNode(long seqid, AtomicLong counter, CompletableFuture<MessageRecord> future) {
this.seqid = seqid;
this.counter = counter;
this.future = future;
this.createtime = System.currentTimeMillis();
}
@@ -39,6 +43,10 @@ public class MessageRespFutureNode {
return createtime;
}
public AtomicLong getCounter() {
return counter;
}
public CompletableFuture<MessageRecord> getFuture() {
return future;
}

View File

@@ -6,6 +6,7 @@
package org.redkale.mq;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.*;
/**
@@ -38,12 +39,13 @@ public class SncpRespProcessor implements MessageProcessor {
logger.log(Level.WARNING, SncpRespProcessor.class.getSimpleName() + " process " + message + " error");
return;
}
if (node.getCounter() != null) node.getCounter().decrementAndGet();
node.future.complete(message);
}
public CompletableFuture<MessageRecord> createFuture(long seqid) {
public CompletableFuture<MessageRecord> createFuture(long seqid, AtomicLong counter) {
CompletableFuture<MessageRecord> future = new CompletableFuture<>();
MessageRespFutureNode node = new MessageRespFutureNode(seqid, future);
MessageRespFutureNode node = new MessageRespFutureNode(seqid, counter, future);
respNodes.put(seqid, node);
return future;
}

View File

@@ -13,6 +13,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.*;
import java.util.logging.*;
import javax.annotation.Resource;
import org.redkale.convert.Convert;
import org.redkale.convert.json.*;
import org.redkale.net.sncp.*;
import org.redkale.service.*;
@@ -381,7 +382,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
@SuppressWarnings("unchecked")
public V getAndRefresh(String key, final int expireSeconds) {
if (key == null) return null;
@@ -400,7 +401,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
@SuppressWarnings("unchecked")
public String getStringAndRefresh(String key, final int expireSeconds) {
if (key == null) return null;
@@ -412,7 +413,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
public long getLongAndRefresh(String key, final int expireSeconds, long defValue) {
if (key == null) return defValue;
CacheEntry entry = container.get(key);
@@ -424,7 +425,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
public CompletableFuture<V> getAndRefreshAsync(final String key, final int expireSeconds) {
return CompletableFuture.supplyAsync(() -> getAndRefresh(key, expireSeconds), getExecutor());
}
@@ -435,19 +436,19 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
public CompletableFuture<String> getStringAndRefreshAsync(final String key, final int expireSeconds) {
return CompletableFuture.supplyAsync(() -> getStringAndRefresh(key, expireSeconds), getExecutor());
}
@Override
public CompletableFuture<Long> getLongAndRefreshAsync(final String key, final int expireSeconds, long defValue) {
return CompletableFuture.supplyAsync(() -> getLongAndRefresh(key, expireSeconds, defValue), getExecutor());
}
@Override
public void refresh(String key, final int expireSeconds) {
if (key == null) return;
CacheEntry entry = container.get(key);
@@ -457,7 +458,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
public CompletableFuture<Void> refreshAsync(final String key, final int expireSeconds) {
return CompletableFuture.runAsync(() -> refresh(key, expireSeconds), getExecutor()).whenComplete(futureCompleteConsumer);
}
@@ -476,47 +477,65 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
public void set(String key, V value) {
set(CacheEntryType.OBJECT, key, value);
}
@Override
public <T> void set(String key, Convert convert, T value) {
set(CacheEntryType.OBJECT, key, value);
}
@Override
public <T> void set(String key, Type type, T value) {
set(CacheEntryType.OBJECT, key, value);
}
@Override
public <T> void set(String key, Convert convert, Type type, T value) {
set(CacheEntryType.OBJECT, key, value);
}
@Override
public void setString(String key, String value) {
set(CacheEntryType.STRING, key, value);
}
@Override
public void setLong(String key, long value) {
set(CacheEntryType.LONG, key, value);
}
@Override
public CompletableFuture<Void> setAsync(String key, V value) {
return CompletableFuture.runAsync(() -> set(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public <T> CompletableFuture<Void> setAsync(String key, Convert convert, T value) {
return CompletableFuture.runAsync(() -> set(key, convert, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public <T> CompletableFuture<Void> setAsync(String key, Type type, T value) {
return CompletableFuture.runAsync(() -> set(key, type, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public <T> CompletableFuture<Void> setAsync(String key, Convert convert, Type type, T value) {
return CompletableFuture.runAsync(() -> set(key, convert, type, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public CompletableFuture<Void> setStringAsync(String key, String value) {
return CompletableFuture.runAsync(() -> setString(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public CompletableFuture<Void> setLongAsync(String key, long value) {
return CompletableFuture.runAsync(() -> setLong(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@@ -535,53 +554,71 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
public void set(int expireSeconds, String key, V value) {
set(CacheEntryType.OBJECT, expireSeconds, key, value);
}
@Override
public <T> void set(final int expireSeconds, String key, Convert convert, T value) {
set(CacheEntryType.OBJECT, expireSeconds, key, value);
}
@Override
public <T> void set(final int expireSeconds, String key, Type type, T value) {
set(CacheEntryType.OBJECT, expireSeconds, key, value);
}
@Override
public <T> void set(final int expireSeconds, String key, Convert convert, Type type, T value) {
set(CacheEntryType.OBJECT, expireSeconds, key, value);
}
@Override
public void setString(int expireSeconds, String key, String value) {
set(CacheEntryType.STRING, expireSeconds, key, value);
}
@Override
public void setLong(int expireSeconds, String key, long value) {
set(CacheEntryType.LONG, expireSeconds, key, value);
}
@Override
public CompletableFuture<Void> setAsync(int expireSeconds, String key, V value) {
return CompletableFuture.runAsync(() -> set(expireSeconds, key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public <T> CompletableFuture<Void> setAsync(int expireSeconds, String key, Convert convert, T value) {
return CompletableFuture.runAsync(() -> set(expireSeconds, key, convert, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public <T> CompletableFuture<Void> setAsync(int expireSeconds, String key, Type type, T value) {
return CompletableFuture.runAsync(() -> set(expireSeconds, key, type, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public <T> CompletableFuture<Void> setAsync(int expireSeconds, String key, Convert convert, Type type, T value) {
return CompletableFuture.runAsync(() -> set(expireSeconds, key, convert, type, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public CompletableFuture<Void> setStringAsync(int expireSeconds, String key, String value) {
return CompletableFuture.runAsync(() -> setString(expireSeconds, key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public CompletableFuture<Void> setLongAsync(int expireSeconds, String key, long value) {
return CompletableFuture.runAsync(() -> setLong(expireSeconds, key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public void setExpireSeconds(String key, int expireSeconds) {
if (key == null) return;
CacheEntry entry = container.get(key);
@@ -590,32 +627,32 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
public CompletableFuture<Void> setExpireSecondsAsync(final String key, final int expireSeconds) {
return CompletableFuture.runAsync(() -> setExpireSeconds(key, expireSeconds), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public void remove(String key) {
if (key == null) return;
container.remove(key);
}
@Override
public long incr(final String key) {
return incr(key, 1);
}
@Override
public CompletableFuture<Long> incrAsync(final String key) {
return CompletableFuture.supplyAsync(() -> incr(key), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public long incr(final String key, long num) {
CacheEntry entry = container.get(key);
if (entry == null) {
@@ -631,37 +668,37 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
public CompletableFuture<Long> incrAsync(final String key, long num) {
return CompletableFuture.supplyAsync(() -> incr(key, num), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public long decr(final String key) {
return incr(key, -1);
}
@Override
public CompletableFuture<Long> decrAsync(final String key) {
return CompletableFuture.supplyAsync(() -> decr(key), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public long decr(final String key, long num) {
return incr(key, -num);
}
@Override
public CompletableFuture<Long> decrAsync(final String key, long num) {
return CompletableFuture.supplyAsync(() -> decr(key, num), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public CompletableFuture<Void> removeAsync(final String key) {
return CompletableFuture.runAsync(() -> remove(key), getExecutor()).whenComplete(futureCompleteConsumer);
}
@@ -763,7 +800,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
public Collection<V> getCollectionAndRefresh(final String key, final int expireSeconds) {
return (Collection<V>) getAndRefresh(key, expireSeconds);
}
@@ -774,7 +811,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
public Collection<String> getStringCollectionAndRefresh(final String key, final int expireSeconds) {
return (Collection<String>) getAndRefresh(key, expireSeconds);
}
@@ -824,13 +861,13 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
public Collection<Long> getLongCollectionAndRefresh(final String key, final int expireSeconds) {
return (Collection<Long>) getAndRefresh(key, expireSeconds);
}
@Override
public CompletableFuture<Collection<V>> getCollectionAndRefreshAsync(final String key, final int expireSeconds) {
return CompletableFuture.supplyAsync(() -> getCollectionAndRefresh(key, expireSeconds), getExecutor());
}
@@ -841,13 +878,13 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
public CompletableFuture<Collection<String>> getStringCollectionAndRefreshAsync(final String key, final int expireSeconds) {
return CompletableFuture.supplyAsync(() -> getStringCollectionAndRefresh(key, expireSeconds), getExecutor());
}
@Override
public CompletableFuture<Collection<Long>> getLongCollectionAndRefreshAsync(final String key, final int expireSeconds) {
return CompletableFuture.supplyAsync(() -> getLongCollectionAndRefresh(key, expireSeconds), getExecutor());
}
@@ -867,7 +904,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
public void appendListItem(String key, V value) {
appendListItem(CacheEntryType.OBJECT_LIST, key, value);
}
@@ -878,19 +915,19 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
public void appendStringListItem(String key, String value) {
appendListItem(CacheEntryType.STRING_LIST, key, value);
}
@Override
public void appendLongListItem(String key, long value) {
appendListItem(CacheEntryType.LONG_LIST, key, value);
}
@Override
public CompletableFuture<Void> appendListItemAsync(final String key, final V value) {
return CompletableFuture.runAsync(() -> appendListItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@@ -901,19 +938,19 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
public CompletableFuture<Void> appendStringListItemAsync(final String key, final String value) {
return CompletableFuture.runAsync(() -> appendStringListItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public CompletableFuture<Void> appendLongListItemAsync(final String key, final long value) {
return CompletableFuture.runAsync(() -> appendLongListItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public void removeListItem(String key, V value) {
if (key == null) return;
CacheEntry entry = container.get(key);
@@ -930,7 +967,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
public void removeStringListItem(String key, String value) {
if (key == null) return;
CacheEntry entry = container.get(key);
@@ -939,7 +976,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
public void removeLongListItem(String key, long value) {
if (key == null) return;
CacheEntry entry = container.get(key);
@@ -948,7 +985,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
public CompletableFuture<Void> removeListItemAsync(final String key, final V value) {
return CompletableFuture.runAsync(() -> removeListItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@@ -959,13 +996,13 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
public CompletableFuture<Void> removeStringListItemAsync(final String key, final String value) {
return CompletableFuture.runAsync(() -> removeStringListItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public CompletableFuture<Void> removeLongListItemAsync(final String key, final long value) {
return CompletableFuture.runAsync(() -> removeLongListItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@@ -985,7 +1022,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
public void appendSetItem(String key, V value) {
appendSetItem(CacheEntryType.OBJECT_SET, key, value);
}
@@ -996,19 +1033,19 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
public void appendStringSetItem(String key, String value) {
appendSetItem(CacheEntryType.OBJECT_SET, key, value);
}
@Override
public void appendLongSetItem(String key, long value) {
appendSetItem(CacheEntryType.OBJECT_SET, key, value);
}
@Override
public CompletableFuture<Void> appendSetItemAsync(final String key, final V value) {
return CompletableFuture.runAsync(() -> appendSetItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@@ -1019,19 +1056,19 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
public CompletableFuture<Void> appendStringSetItemAsync(final String key, final String value) {
return CompletableFuture.runAsync(() -> appendStringSetItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public CompletableFuture<Void> appendLongSetItemAsync(final String key, final long value) {
return CompletableFuture.runAsync(() -> appendLongSetItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public void removeSetItem(String key, V value) {
if (key == null) return;
CacheEntry entry = container.get(key);
@@ -1048,7 +1085,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
public void removeStringSetItem(String key, String value) {
if (key == null) return;
CacheEntry entry = container.get(key);
@@ -1057,7 +1094,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
public void removeLongSetItem(String key, long value) {
if (key == null) return;
CacheEntry entry = container.get(key);
@@ -1066,7 +1103,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
public CompletableFuture<Void> removeSetItemAsync(final String key, final V value) {
return CompletableFuture.runAsync(() -> removeSetItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@@ -1077,13 +1114,13 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
public CompletableFuture<Void> removeStringSetItemAsync(final String key, final String value) {
return CompletableFuture.runAsync(() -> removeStringSetItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public CompletableFuture<Void> removeLongSetItemAsync(final String key, final long value) {
return CompletableFuture.runAsync(() -> removeLongSetItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}

View File

@@ -9,7 +9,7 @@ import java.lang.reflect.Type;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Function;
import org.redkale.convert.ConvertColumn;
import org.redkale.convert.*;
import org.redkale.convert.json.JsonFactory;
import org.redkale.util.ConstructorParameters;
@@ -70,12 +70,20 @@ public interface CacheSource<V extends Object> {
public void set(final String key, final V value);
public <T> void set(final String key, final Convert convert, final T value);
public <T> void set(final String key, final Type type, final T value);
public <T> void set(final String key, final Convert convert, final Type type, final T value);
public void set(final int expireSeconds, final String key, final V value);
public <T> void set(final int expireSeconds, final String key, final Convert convert, final T value);
public <T> void set(final int expireSeconds, final String key, final Type type, final T value);
public <T> void set(final int expireSeconds, final String key, final Convert convert, final Type type, final T value);
public void setExpireSeconds(final String key, final int expireSeconds);
public void remove(final String key);
@@ -219,12 +227,20 @@ public interface CacheSource<V extends Object> {
public CompletableFuture<Void> setAsync(final String key, final V value);
public <T> CompletableFuture<Void> setAsync(final String key, final Convert convert, final T value);
public <T> CompletableFuture<Void> setAsync(final String key, final Type type, final T value);
public <T> CompletableFuture<Void> setAsync(final String key, final Convert convert, final Type type, final T value);
public CompletableFuture<Void> setAsync(final int expireSeconds, final String key, final V value);
public <T> CompletableFuture<Void> setAsync(final int expireSeconds, final String key, final Convert convert, final T value);
public <T> CompletableFuture<Void> setAsync(final int expireSeconds, final String key, final Type type, final T value);
public <T> CompletableFuture<Void> setAsync(final int expireSeconds, final String key, final Convert convert, final Type type, final T value);
public CompletableFuture<Void> setExpireSecondsAsync(final String key, final int expireSeconds);
public CompletableFuture<Void> removeAsync(final String key);