From e365dae9e4402e15d946a797d32ce1d6795336f2 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Thu, 4 Jun 2020 14:06:35 +0800 Subject: [PATCH] --- src/org/redkale/mq/MessageAgent.java | 6 +- src/org/redkale/mq/MessageRespFutureNode.java | 10 +- src/org/redkale/mq/SncpRespProcessor.java | 6 +- src/org/redkale/source/CacheMemorySource.java | 161 +++++++++++------- src/org/redkale/source/CacheSource.java | 18 +- 5 files changed, 132 insertions(+), 69 deletions(-) diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index f4d42e833..e5a5fa5d6 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -7,7 +7,7 @@ package org.redkale.mq; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.*; import java.util.logging.Logger; import javax.annotation.Resource; import org.redkale.boot.*; @@ -54,8 +54,8 @@ public abstract class MessageAgent { public void init(AnyValue config) { } - public final CompletableFuture createSncpRespFuture(MessageRecord message) { - return this.sncpRespProcessor.createFuture(message.getSeqid()); + public final CompletableFuture createSncpRespFuture(AtomicLong counter, MessageRecord message) { + return this.sncpRespProcessor.createFuture(message.getSeqid(), counter); } public final synchronized void startSncpRespConsumer() { diff --git a/src/org/redkale/mq/MessageRespFutureNode.java b/src/org/redkale/mq/MessageRespFutureNode.java index 622b1faf5..a75845f54 100644 --- a/src/org/redkale/mq/MessageRespFutureNode.java +++ b/src/org/redkale/mq/MessageRespFutureNode.java @@ -6,6 +6,7 @@ package org.redkale.mq; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; /** * MQ管理器 @@ -23,10 +24,13 @@ public class MessageRespFutureNode { protected final long createtime; + protected final AtomicLong counter; + protected final CompletableFuture future; - public MessageRespFutureNode(long seqid, CompletableFuture future) { + public MessageRespFutureNode(long seqid, AtomicLong counter, CompletableFuture future) { this.seqid = seqid; + this.counter = counter; this.future = future; this.createtime = System.currentTimeMillis(); } @@ -39,6 +43,10 @@ public class MessageRespFutureNode { return createtime; } + public AtomicLong getCounter() { + return counter; + } + public CompletableFuture getFuture() { return future; } diff --git a/src/org/redkale/mq/SncpRespProcessor.java b/src/org/redkale/mq/SncpRespProcessor.java index 71f1cbe46..c4d14b142 100644 --- a/src/org/redkale/mq/SncpRespProcessor.java +++ b/src/org/redkale/mq/SncpRespProcessor.java @@ -6,6 +6,7 @@ package org.redkale.mq; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.*; /** @@ -38,12 +39,13 @@ public class SncpRespProcessor implements MessageProcessor { logger.log(Level.WARNING, SncpRespProcessor.class.getSimpleName() + " process " + message + " error"); return; } + if (node.getCounter() != null) node.getCounter().decrementAndGet(); node.future.complete(message); } - public CompletableFuture createFuture(long seqid) { + public CompletableFuture createFuture(long seqid, AtomicLong counter) { CompletableFuture future = new CompletableFuture<>(); - MessageRespFutureNode node = new MessageRespFutureNode(seqid, future); + MessageRespFutureNode node = new MessageRespFutureNode(seqid, counter, future); respNodes.put(seqid, node); return future; } diff --git a/src/org/redkale/source/CacheMemorySource.java b/src/org/redkale/source/CacheMemorySource.java index 65a979bf8..414d2ae79 100644 --- a/src/org/redkale/source/CacheMemorySource.java +++ b/src/org/redkale/source/CacheMemorySource.java @@ -13,6 +13,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.*; import java.util.logging.*; import javax.annotation.Resource; +import org.redkale.convert.Convert; import org.redkale.convert.json.*; import org.redkale.net.sncp.*; import org.redkale.service.*; @@ -381,7 +382,7 @@ public class CacheMemorySource extends AbstractService impleme } @Override - + @SuppressWarnings("unchecked") public V getAndRefresh(String key, final int expireSeconds) { if (key == null) return null; @@ -400,7 +401,7 @@ public class CacheMemorySource extends AbstractService impleme } @Override - + @SuppressWarnings("unchecked") public String getStringAndRefresh(String key, final int expireSeconds) { if (key == null) return null; @@ -412,7 +413,7 @@ public class CacheMemorySource extends AbstractService impleme } @Override - + public long getLongAndRefresh(String key, final int expireSeconds, long defValue) { if (key == null) return defValue; CacheEntry entry = container.get(key); @@ -424,7 +425,7 @@ public class CacheMemorySource extends AbstractService impleme } @Override - + public CompletableFuture getAndRefreshAsync(final String key, final int expireSeconds) { return CompletableFuture.supplyAsync(() -> getAndRefresh(key, expireSeconds), getExecutor()); } @@ -435,19 +436,19 @@ public class CacheMemorySource extends AbstractService impleme } @Override - + public CompletableFuture getStringAndRefreshAsync(final String key, final int expireSeconds) { return CompletableFuture.supplyAsync(() -> getStringAndRefresh(key, expireSeconds), getExecutor()); } @Override - + public CompletableFuture getLongAndRefreshAsync(final String key, final int expireSeconds, long defValue) { return CompletableFuture.supplyAsync(() -> getLongAndRefresh(key, expireSeconds, defValue), getExecutor()); } @Override - + public void refresh(String key, final int expireSeconds) { if (key == null) return; CacheEntry entry = container.get(key); @@ -457,7 +458,7 @@ public class CacheMemorySource extends AbstractService impleme } @Override - + public CompletableFuture refreshAsync(final String key, final int expireSeconds) { return CompletableFuture.runAsync(() -> refresh(key, expireSeconds), getExecutor()).whenComplete(futureCompleteConsumer); } @@ -476,47 +477,65 @@ public class CacheMemorySource extends AbstractService impleme } @Override - + public void set(String key, V value) { set(CacheEntryType.OBJECT, key, value); } + @Override + public void set(String key, Convert convert, T value) { + set(CacheEntryType.OBJECT, key, value); + } + @Override public void set(String key, Type type, T value) { set(CacheEntryType.OBJECT, key, value); } @Override - + public void set(String key, Convert convert, Type type, T value) { + set(CacheEntryType.OBJECT, key, value); + } + + @Override + public void setString(String key, String value) { set(CacheEntryType.STRING, key, value); } @Override - + public void setLong(String key, long value) { set(CacheEntryType.LONG, key, value); } @Override - public CompletableFuture setAsync(String key, V value) { return CompletableFuture.runAsync(() -> set(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } + @Override + public CompletableFuture setAsync(String key, Convert convert, T value) { + return CompletableFuture.runAsync(() -> set(key, convert, value), getExecutor()).whenComplete(futureCompleteConsumer); + } + @Override public CompletableFuture setAsync(String key, Type type, T value) { return CompletableFuture.runAsync(() -> set(key, type, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override - + public CompletableFuture setAsync(String key, Convert convert, Type type, T value) { + return CompletableFuture.runAsync(() -> set(key, convert, type, value), getExecutor()).whenComplete(futureCompleteConsumer); + } + + @Override public CompletableFuture setStringAsync(String key, String value) { return CompletableFuture.runAsync(() -> setString(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override - + public CompletableFuture setLongAsync(String key, long value) { return CompletableFuture.runAsync(() -> setLong(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @@ -535,53 +554,71 @@ public class CacheMemorySource extends AbstractService impleme } @Override - + public void set(int expireSeconds, String key, V value) { set(CacheEntryType.OBJECT, expireSeconds, key, value); } + @Override + public void set(final int expireSeconds, String key, Convert convert, T value) { + set(CacheEntryType.OBJECT, expireSeconds, key, value); + } + @Override public void set(final int expireSeconds, String key, Type type, T value) { set(CacheEntryType.OBJECT, expireSeconds, key, value); } @Override - + public void set(final int expireSeconds, String key, Convert convert, Type type, T value) { + set(CacheEntryType.OBJECT, expireSeconds, key, value); + } + + @Override public void setString(int expireSeconds, String key, String value) { set(CacheEntryType.STRING, expireSeconds, key, value); } @Override - + public void setLong(int expireSeconds, String key, long value) { set(CacheEntryType.LONG, expireSeconds, key, value); } @Override - + public CompletableFuture setAsync(int expireSeconds, String key, V value) { return CompletableFuture.runAsync(() -> set(expireSeconds, key, value), getExecutor()).whenComplete(futureCompleteConsumer); } + @Override + public CompletableFuture setAsync(int expireSeconds, String key, Convert convert, T value) { + return CompletableFuture.runAsync(() -> set(expireSeconds, key, convert, value), getExecutor()).whenComplete(futureCompleteConsumer); + } + @Override public CompletableFuture setAsync(int expireSeconds, String key, Type type, T value) { return CompletableFuture.runAsync(() -> set(expireSeconds, key, type, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override - + public CompletableFuture setAsync(int expireSeconds, String key, Convert convert, Type type, T value) { + return CompletableFuture.runAsync(() -> set(expireSeconds, key, convert, type, value), getExecutor()).whenComplete(futureCompleteConsumer); + } + + @Override public CompletableFuture setStringAsync(int expireSeconds, String key, String value) { return CompletableFuture.runAsync(() -> setString(expireSeconds, key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override - + public CompletableFuture setLongAsync(int expireSeconds, String key, long value) { return CompletableFuture.runAsync(() -> setLong(expireSeconds, key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override - + public void setExpireSeconds(String key, int expireSeconds) { if (key == null) return; CacheEntry entry = container.get(key); @@ -590,32 +627,32 @@ public class CacheMemorySource extends AbstractService impleme } @Override - + public CompletableFuture setExpireSecondsAsync(final String key, final int expireSeconds) { return CompletableFuture.runAsync(() -> setExpireSeconds(key, expireSeconds), getExecutor()).whenComplete(futureCompleteConsumer); } @Override - + public void remove(String key) { if (key == null) return; container.remove(key); } @Override - + public long incr(final String key) { return incr(key, 1); } @Override - + public CompletableFuture incrAsync(final String key) { return CompletableFuture.supplyAsync(() -> incr(key), getExecutor()).whenComplete(futureCompleteConsumer); } @Override - + public long incr(final String key, long num) { CacheEntry entry = container.get(key); if (entry == null) { @@ -631,37 +668,37 @@ public class CacheMemorySource extends AbstractService impleme } @Override - + public CompletableFuture incrAsync(final String key, long num) { return CompletableFuture.supplyAsync(() -> incr(key, num), getExecutor()).whenComplete(futureCompleteConsumer); } @Override - + public long decr(final String key) { return incr(key, -1); } @Override - + public CompletableFuture decrAsync(final String key) { return CompletableFuture.supplyAsync(() -> decr(key), getExecutor()).whenComplete(futureCompleteConsumer); } @Override - + public long decr(final String key, long num) { return incr(key, -num); } @Override - + public CompletableFuture decrAsync(final String key, long num) { return CompletableFuture.supplyAsync(() -> decr(key, num), getExecutor()).whenComplete(futureCompleteConsumer); } @Override - + public CompletableFuture removeAsync(final String key) { return CompletableFuture.runAsync(() -> remove(key), getExecutor()).whenComplete(futureCompleteConsumer); } @@ -763,7 +800,7 @@ public class CacheMemorySource extends AbstractService impleme } @Override - + public Collection getCollectionAndRefresh(final String key, final int expireSeconds) { return (Collection) getAndRefresh(key, expireSeconds); } @@ -774,7 +811,7 @@ public class CacheMemorySource extends AbstractService impleme } @Override - + public Collection getStringCollectionAndRefresh(final String key, final int expireSeconds) { return (Collection) getAndRefresh(key, expireSeconds); } @@ -824,13 +861,13 @@ public class CacheMemorySource extends AbstractService impleme } @Override - + public Collection getLongCollectionAndRefresh(final String key, final int expireSeconds) { return (Collection) getAndRefresh(key, expireSeconds); } @Override - + public CompletableFuture> getCollectionAndRefreshAsync(final String key, final int expireSeconds) { return CompletableFuture.supplyAsync(() -> getCollectionAndRefresh(key, expireSeconds), getExecutor()); } @@ -841,13 +878,13 @@ public class CacheMemorySource extends AbstractService impleme } @Override - + public CompletableFuture> getStringCollectionAndRefreshAsync(final String key, final int expireSeconds) { return CompletableFuture.supplyAsync(() -> getStringCollectionAndRefresh(key, expireSeconds), getExecutor()); } @Override - + public CompletableFuture> getLongCollectionAndRefreshAsync(final String key, final int expireSeconds) { return CompletableFuture.supplyAsync(() -> getLongCollectionAndRefresh(key, expireSeconds), getExecutor()); } @@ -867,7 +904,7 @@ public class CacheMemorySource extends AbstractService impleme } @Override - + public void appendListItem(String key, V value) { appendListItem(CacheEntryType.OBJECT_LIST, key, value); } @@ -878,19 +915,19 @@ public class CacheMemorySource extends AbstractService impleme } @Override - + public void appendStringListItem(String key, String value) { appendListItem(CacheEntryType.STRING_LIST, key, value); } @Override - + public void appendLongListItem(String key, long value) { appendListItem(CacheEntryType.LONG_LIST, key, value); } @Override - + public CompletableFuture appendListItemAsync(final String key, final V value) { return CompletableFuture.runAsync(() -> appendListItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @@ -901,19 +938,19 @@ public class CacheMemorySource extends AbstractService impleme } @Override - + public CompletableFuture appendStringListItemAsync(final String key, final String value) { return CompletableFuture.runAsync(() -> appendStringListItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override - + public CompletableFuture appendLongListItemAsync(final String key, final long value) { return CompletableFuture.runAsync(() -> appendLongListItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override - + public void removeListItem(String key, V value) { if (key == null) return; CacheEntry entry = container.get(key); @@ -930,7 +967,7 @@ public class CacheMemorySource extends AbstractService impleme } @Override - + public void removeStringListItem(String key, String value) { if (key == null) return; CacheEntry entry = container.get(key); @@ -939,7 +976,7 @@ public class CacheMemorySource extends AbstractService impleme } @Override - + public void removeLongListItem(String key, long value) { if (key == null) return; CacheEntry entry = container.get(key); @@ -948,7 +985,7 @@ public class CacheMemorySource extends AbstractService impleme } @Override - + public CompletableFuture removeListItemAsync(final String key, final V value) { return CompletableFuture.runAsync(() -> removeListItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @@ -959,13 +996,13 @@ public class CacheMemorySource extends AbstractService impleme } @Override - + public CompletableFuture removeStringListItemAsync(final String key, final String value) { return CompletableFuture.runAsync(() -> removeStringListItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override - + public CompletableFuture removeLongListItemAsync(final String key, final long value) { return CompletableFuture.runAsync(() -> removeLongListItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @@ -985,7 +1022,7 @@ public class CacheMemorySource extends AbstractService impleme } @Override - + public void appendSetItem(String key, V value) { appendSetItem(CacheEntryType.OBJECT_SET, key, value); } @@ -996,19 +1033,19 @@ public class CacheMemorySource extends AbstractService impleme } @Override - + public void appendStringSetItem(String key, String value) { appendSetItem(CacheEntryType.OBJECT_SET, key, value); } @Override - + public void appendLongSetItem(String key, long value) { appendSetItem(CacheEntryType.OBJECT_SET, key, value); } @Override - + public CompletableFuture appendSetItemAsync(final String key, final V value) { return CompletableFuture.runAsync(() -> appendSetItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @@ -1019,19 +1056,19 @@ public class CacheMemorySource extends AbstractService impleme } @Override - + public CompletableFuture appendStringSetItemAsync(final String key, final String value) { return CompletableFuture.runAsync(() -> appendStringSetItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override - + public CompletableFuture appendLongSetItemAsync(final String key, final long value) { return CompletableFuture.runAsync(() -> appendLongSetItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override - + public void removeSetItem(String key, V value) { if (key == null) return; CacheEntry entry = container.get(key); @@ -1048,7 +1085,7 @@ public class CacheMemorySource extends AbstractService impleme } @Override - + public void removeStringSetItem(String key, String value) { if (key == null) return; CacheEntry entry = container.get(key); @@ -1057,7 +1094,7 @@ public class CacheMemorySource extends AbstractService impleme } @Override - + public void removeLongSetItem(String key, long value) { if (key == null) return; CacheEntry entry = container.get(key); @@ -1066,7 +1103,7 @@ public class CacheMemorySource extends AbstractService impleme } @Override - + public CompletableFuture removeSetItemAsync(final String key, final V value) { return CompletableFuture.runAsync(() -> removeSetItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @@ -1077,13 +1114,13 @@ public class CacheMemorySource extends AbstractService impleme } @Override - + public CompletableFuture removeStringSetItemAsync(final String key, final String value) { return CompletableFuture.runAsync(() -> removeStringSetItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override - + public CompletableFuture removeLongSetItemAsync(final String key, final long value) { return CompletableFuture.runAsync(() -> removeLongSetItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } diff --git a/src/org/redkale/source/CacheSource.java b/src/org/redkale/source/CacheSource.java index c462a186d..ce99421b2 100644 --- a/src/org/redkale/source/CacheSource.java +++ b/src/org/redkale/source/CacheSource.java @@ -9,7 +9,7 @@ import java.lang.reflect.Type; import java.util.*; import java.util.concurrent.*; import java.util.function.Function; -import org.redkale.convert.ConvertColumn; +import org.redkale.convert.*; import org.redkale.convert.json.JsonFactory; import org.redkale.util.ConstructorParameters; @@ -70,12 +70,20 @@ public interface CacheSource { public void set(final String key, final V value); + public void set(final String key, final Convert convert, final T value); + public void set(final String key, final Type type, final T value); + public void set(final String key, final Convert convert, final Type type, final T value); + public void set(final int expireSeconds, final String key, final V value); + public void set(final int expireSeconds, final String key, final Convert convert, final T value); + public void set(final int expireSeconds, final String key, final Type type, final T value); + public void set(final int expireSeconds, final String key, final Convert convert, final Type type, final T value); + public void setExpireSeconds(final String key, final int expireSeconds); public void remove(final String key); @@ -219,12 +227,20 @@ public interface CacheSource { public CompletableFuture setAsync(final String key, final V value); + public CompletableFuture setAsync(final String key, final Convert convert, final T value); + public CompletableFuture setAsync(final String key, final Type type, final T value); + public CompletableFuture setAsync(final String key, final Convert convert, final Type type, final T value); + public CompletableFuture setAsync(final int expireSeconds, final String key, final V value); + public CompletableFuture setAsync(final int expireSeconds, final String key, final Convert convert, final T value); + public CompletableFuture setAsync(final int expireSeconds, final String key, final Type type, final T value); + public CompletableFuture setAsync(final int expireSeconds, final String key, final Convert convert, final Type type, final T value); + public CompletableFuture setExpireSecondsAsync(final String key, final int expireSeconds); public CompletableFuture removeAsync(final String key);