diff --git a/src/org/redkale/convert/CollectionDecoder.java b/src/org/redkale/convert/CollectionDecoder.java index 34e78a927..23e3a5c5a 100644 --- a/src/org/redkale/convert/CollectionDecoder.java +++ b/src/org/redkale/convert/CollectionDecoder.java @@ -44,6 +44,11 @@ public final class CollectionDecoder implements Decodeable= 0 ? (SocketAddress) params[action.addressTargetParamIndex] : null) : addr0; - final AsyncConnection conn = transport.pollConnection(addr); - if (conn == null || !conn.isOpen()) { - logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") cannot connect " + (conn == null ? addr : conn.getRemoteAddress())); - throw new RuntimeException("sncp " + (conn == null ? addr : conn.getRemoteAddress()) + " cannot connect"); + final CompletableFuture future = new CompletableFuture(); + AsyncConnection conn0; + try { + conn0 = transport.pollConnection(addr); + } catch (Exception e) { + future.completeExceptionally(e); + return future; } + if (conn0 == null || !conn0.isOpen()) { + future.completeExceptionally(new RuntimeException("sncp " + (conn0 == null ? addr : conn0.getRemoteAddress()) + " cannot connect")); + return future; + } + final AsyncConnection conn = conn0; final ByteBuffer[] sendBuffers = writer.toBuffers(); fillHeader(sendBuffers[0], seqid, actionid, reqBodyLength); final ByteBuffer buffer = transport.pollBuffer(); - final CompletableFuture future = new CompletableFuture(); conn.write(sendBuffers, sendBuffers, new CompletionHandler() { @Override diff --git a/src/org/redkale/net/sncp/SncpDynServlet.java b/src/org/redkale/net/sncp/SncpDynServlet.java index 6390cafe3..85af9871b 100644 --- a/src/org/redkale/net/sncp/SncpDynServlet.java +++ b/src/org/redkale/net/sncp/SncpDynServlet.java @@ -48,8 +48,8 @@ public final class SncpDynServlet extends SncpServlet { private Supplier bufferSupplier; - public SncpDynServlet(final BsonConvert convert, final String serviceName, final Class type, final Service service) { - super(serviceName, type, service); + public SncpDynServlet(final BsonConvert convert, final String serviceName, final Class serviceOrSourceType, final Service service) { + super(serviceName, serviceOrSourceType, service); this.serviceid = Sncp.hash(type.getName() + ':' + serviceName); Set actionids = new HashSet<>(); for (java.lang.reflect.Method method : service.getClass().getMethods()) { diff --git a/src/org/redkale/net/sncp/SncpServlet.java b/src/org/redkale/net/sncp/SncpServlet.java index 28337fbef..020b6a534 100644 --- a/src/org/redkale/net/sncp/SncpServlet.java +++ b/src/org/redkale/net/sncp/SncpServlet.java @@ -20,14 +20,14 @@ import org.redkale.util.*; */ public abstract class SncpServlet extends Servlet implements Comparable { - protected final Class type; + protected final Class type; protected final String serviceName; protected final Service service; - protected SncpServlet(String serviceName, Class type, Service service) { - this.type = type; + protected SncpServlet(String serviceName, Class serviceOrSourceType, Service service) { + this.type = serviceOrSourceType; this.service = service; this.serviceName = serviceName; } diff --git a/src/org/redkale/source/CacheMemorySource.java b/src/org/redkale/source/CacheMemorySource.java index 9a73b3126..a86300877 100644 --- a/src/org/redkale/source/CacheMemorySource.java +++ b/src/org/redkale/source/CacheMemorySource.java @@ -5,7 +5,6 @@ */ package org.redkale.source; -import java.beans.ConstructorProperties; import java.io.*; import java.lang.reflect.Type; import java.util.*; @@ -13,7 +12,6 @@ import java.util.concurrent.*; import java.util.function.Consumer; import java.util.logging.*; import javax.annotation.Resource; -import org.redkale.convert.ConvertColumn; import org.redkale.convert.json.*; import org.redkale.net.sncp.Sncp; import org.redkale.service.*; @@ -56,7 +54,10 @@ public class CacheMemorySource extends private final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); - protected final ConcurrentHashMap> container = new ConcurrentHashMap<>(); + protected final ConcurrentHashMap> container = new ConcurrentHashMap<>(); + + @RpcRemote + protected CacheSource remoteSource; public CacheMemorySource() { } @@ -125,32 +126,51 @@ public class CacheMemorySource extends boolean datasync = false; //是否从远程同步过数据 //----------同步数据……----------- // TODO - if (!this.needStore) return; - try { - File store = new File(home, "cache/" + resourceName()); - if (!store.isFile() || !store.canRead()) return; - LineNumberReader reader = new LineNumberReader(new FileReader(store)); - if (this.keyType == null) this.keyType = Serializable.class; - if (this.objValueType == null) { - this.objValueType = Object.class; - this.setValueType = TypeToken.createParameterizedType(null, CopyOnWriteArraySet.class, this.objValueType); - this.listValueType = TypeToken.createParameterizedType(null, ConcurrentLinkedQueue.class, this.objValueType); + if (this.needStore) { + try { + File store = new File(home, "cache/" + resourceName()); + if (!store.isFile() || !store.canRead()) return; + LineNumberReader reader = new LineNumberReader(new FileReader(store)); + if (this.keyType == null) this.keyType = Serializable.class; + if (this.objValueType == null) { + this.objValueType = Object.class; + this.setValueType = TypeToken.createParameterizedType(null, CopyOnWriteArraySet.class, this.objValueType); + this.listValueType = TypeToken.createParameterizedType(null, ConcurrentLinkedQueue.class, this.objValueType); + } + final Type storeObjType = TypeToken.createParameterizedType(null, CacheEntry.class, keyType, objValueType); + final Type storeSetType = TypeToken.createParameterizedType(null, CacheEntry.class, keyType, setValueType); + final Type storeListType = TypeToken.createParameterizedType(null, CacheEntry.class, keyType, listValueType); + String line; + while ((line = reader.readLine()) != null) { + if (line.isEmpty()) continue; + CacheEntry entry = convert.convertFrom(line.startsWith(CacheEntry.JSON_SET_KEY) ? storeSetType : (line.startsWith(CacheEntry.JSON_LIST_KEY) ? storeListType : storeObjType), line); + if (entry.isExpired()) continue; + if (datasync && container.containsKey(entry.key)) continue; //已经同步了 + container.put(entry.key, entry); + } + reader.close(); + store.delete(); + } catch (Exception e) { + logger.log(Level.SEVERE, CacheSource.class.getSimpleName() + "(" + resourceName() + ") load store file error ", e); } - final Type storeObjType = TypeToken.createParameterizedType(null, CacheEntry.class, keyType, objValueType); - final Type storeSetType = TypeToken.createParameterizedType(null, CacheEntry.class, keyType, setValueType); - final Type storeListType = TypeToken.createParameterizedType(null, CacheEntry.class, keyType, listValueType); - String line; - while ((line = reader.readLine()) != null) { - if (line.isEmpty()) continue; - CacheEntry entry = convert.convertFrom(line.startsWith(CacheEntry.JSON_SET_KEY) ? storeSetType : (line.startsWith(CacheEntry.JSON_LIST_KEY) ? storeListType : storeObjType), line); - if (entry.isExpired()) continue; - if (datasync && container.containsKey(entry.key)) continue; //已经同步了 - container.put(entry.key, entry); - } - reader.close(); - store.delete(); - } catch (Exception e) { - logger.log(Level.SEVERE, CacheSource.class.getSimpleName() + "(" + resourceName() + ") load store file error ", e); + } + if (remoteSource != null && !Sncp.isRemote(this)) { + super.runAsync(() -> { + try { + CompletableFuture>> listFuture = remoteSource.queryListAsync(); + listFuture.whenComplete((list, exp) -> { + if (exp != null) { + logger.log(Level.FINEST, CacheSource.class.getSimpleName() + "(" + resourceName() + ") queryListAsync error", exp); + } else { + for (CacheEntry entry : list) { + container.put(entry.key, entry); + } + } + }); + } catch (Exception e) { + logger.log(Level.FINEST, CacheSource.class.getSimpleName() + "(" + resourceName() + ") queryListAsync error, maybe remote node connot connect ", e); + } + }); } } @@ -176,7 +196,7 @@ public class CacheMemorySource extends final Type storeObjType = TypeToken.createParameterizedType(null, CacheEntry.class, keyType, objValueType); final Type storeSetType = TypeToken.createParameterizedType(null, CacheEntry.class, keyType, setValueType); final Type storeListType = TypeToken.createParameterizedType(null, CacheEntry.class, keyType, listValueType); - Collection> entrys = (Collection>) container.values(); + Collection> entrys = container.values(); for (CacheEntry entry : entrys) { stream.println(convert.convertTo(entry.isSetCacheType() ? storeSetType : (entry.isListCacheType() ? storeListType : storeObjType), entry)); } @@ -204,10 +224,10 @@ public class CacheMemorySource extends public V get(K key) { if (key == null) return null; CacheEntry entry = container.get(key); - if (entry == null || entry.isExpired() || entry.value == null) return null; - if (entry.isListCacheType()) return (V) new ArrayList((Collection) entry.value); - if (entry.isSetCacheType()) return (V) new HashSet((Collection) entry.value); - return (V) entry.getValue(); + if (entry == null || entry.isExpired()) return null; + if (entry.isListCacheType()) return (V) (entry.listValue == null ? null : new ArrayList(entry.listValue)); + if (entry.isSetCacheType()) return (V) (entry.setValue == null ? null : new HashSet(entry.setValue)); + return (V) entry.objectValue; } @Override @@ -220,12 +240,12 @@ public class CacheMemorySource extends public V getAndRefresh(K key, final int expireSeconds) { if (key == null) return null; CacheEntry entry = container.get(key); - if (entry == null || entry.isExpired() || entry.value == null) return null; + if (entry == null || entry.isExpired()) return null; entry.lastAccessed = (int) (System.currentTimeMillis() / 1000); entry.expireSeconds = expireSeconds; - if (entry.isListCacheType()) return (V) new ArrayList((Collection) entry.value); - if (entry.isSetCacheType()) return (V) new HashSet((Collection) entry.value); - return (V) entry.getValue(); + if (entry.isListCacheType()) return (V) (entry.listValue == null ? null : new ArrayList(entry.listValue)); + if (entry.isSetCacheType()) return (V) (entry.setValue == null ? null : new HashSet(entry.setValue)); + return (V) entry.objectValue; } @Override @@ -254,11 +274,11 @@ public class CacheMemorySource extends if (key == null) return; CacheEntry entry = container.get(key); if (entry == null) { - entry = new CacheEntry(CacheEntryType.OBJECT, key, value); + entry = new CacheEntry(CacheEntryType.OBJECT, key, value, null, null); container.putIfAbsent(key, entry); } else { entry.expireSeconds = 0; - entry.value = value; + entry.objectValue = value; entry.lastAccessed = (int) (System.currentTimeMillis() / 1000); } } @@ -274,12 +294,12 @@ public class CacheMemorySource extends if (key == null) return; CacheEntry entry = container.get(key); if (entry == null) { - entry = new CacheEntry(CacheEntryType.OBJECT, expireSeconds, key, value); + entry = new CacheEntry(CacheEntryType.OBJECT, expireSeconds, key, value, null, null); container.putIfAbsent(key, entry); } else { if (expireSeconds > 0) entry.expireSeconds = expireSeconds; entry.lastAccessed = (int) (System.currentTimeMillis() / 1000); - entry.value = value; + entry.objectValue = value; } } @@ -350,14 +370,14 @@ public class CacheMemorySource extends public void appendListItem(K key, V value) { if (key == null) return; CacheEntry entry = container.get(key); - if (entry == null || !entry.isListCacheType()) { - Collection list = new ConcurrentLinkedQueue<>(); - entry = new CacheEntry(CacheEntryType.LIST, key, list); + if (entry == null || !entry.isListCacheType() || entry.listValue == null) { + ConcurrentLinkedQueue list = new ConcurrentLinkedQueue(); + entry = new CacheEntry(CacheEntryType.LIST, key, null, null, list); CacheEntry old = container.putIfAbsent(key, entry); - if (old != null) list = (Collection) old.value; - list.add(value); + if (old != null) list = old.listValue; + if (list != null) list.add(value); } else { - ((Collection) entry.getValue()).add(value); + entry.listValue.add(value); } } @@ -371,8 +391,8 @@ public class CacheMemorySource extends public void removeListItem(K key, V value) { if (key == null) return; CacheEntry entry = container.get(key); - if (entry == null || !entry.isListCacheType()) return; - ((Collection) entry.getValue()).remove(value); + if (entry == null || entry.listValue == null) return; + entry.listValue.remove(value); } @Override @@ -385,14 +405,14 @@ public class CacheMemorySource extends public void appendSetItem(K key, V value) { if (key == null) return; CacheEntry entry = container.get(key); - if (entry == null || !entry.isSetCacheType()) { - Collection set = new CopyOnWriteArraySet(); - entry = new CacheEntry(CacheEntryType.SET, key, set); + if (entry == null || !entry.isSetCacheType() || entry.setValue == null) { + CopyOnWriteArraySet set = new CopyOnWriteArraySet(); + entry = new CacheEntry(CacheEntryType.SET, key, null, set, null); CacheEntry old = container.putIfAbsent(key, entry); - if (old != null) set = (Collection) old.value; - set.add(value); + if (old != null) set = old.setValue; + if (set != null) set.add(value); } else { - ((Collection) entry.getValue()).add(value); + entry.setValue.add(value); } } @@ -406,8 +426,8 @@ public class CacheMemorySource extends public void removeSetItem(K key, V value) { if (key == null) return; CacheEntry entry = container.get(key); - if (entry == null || !(entry.value instanceof Set)) return; - ((Set) entry.getValue()).remove(value); + if (entry == null || entry.setValue == null) return; + entry.setValue.remove(value); } @Override @@ -415,83 +435,14 @@ public class CacheMemorySource extends return CompletableFuture.runAsync(() -> removeSetItem(key, value), getExecutor()); } - public static enum CacheEntryType { - OBJECT, SET, LIST; + @Override + public List> queryList() { + return new ArrayList<>(container.values()); } - public static final class CacheEntry { - - public static final String JSON_SET_KEY = "{\"cacheType\":\"" + CacheEntryType.SET + "\""; - - public static final String JSON_LIST_KEY = "{\"cacheType\":\"" + CacheEntryType.LIST + "\""; - - private final CacheEntryType cacheType; - - private final K key; - - //<=0表示永久保存 - private int expireSeconds; - - private volatile int lastAccessed; //最后刷新时间 - - private T value; - - public CacheEntry(CacheEntryType cacheType, K key, T value) { - this(cacheType, 0, key, value); - } - - public CacheEntry(CacheEntryType cacheType, int expireSeconds, K key, T value) { - this(cacheType, expireSeconds, (int) (System.currentTimeMillis() / 1000), key, value); - } - - @ConstructorProperties({"cacheType", "expireSeconds", "lastAccessed", "key", "value"}) - public CacheEntry(CacheEntryType cacheType, int expireSeconds, int lastAccessed, K key, T value) { - this.cacheType = cacheType; - this.expireSeconds = expireSeconds; - this.lastAccessed = lastAccessed; - this.key = key; - this.value = value; - } - - @Override - public String toString() { - return JsonFactory.root().getConvert().convertTo(this); - } - - @ConvertColumn(ignore = true) - public boolean isListCacheType() { - return cacheType == CacheEntryType.LIST; - } - - @ConvertColumn(ignore = true) - public boolean isSetCacheType() { - return cacheType == CacheEntryType.SET; - } - - @ConvertColumn(ignore = true) - public boolean isExpired() { - return (expireSeconds > 0 && lastAccessed + expireSeconds < (System.currentTimeMillis() / 1000)); - } - - public CacheEntryType getCacheType() { - return cacheType; - } - - public int getExpireSeconds() { - return expireSeconds; - } - - public int getLastAccessed() { - return lastAccessed; - } - - public T getValue() { - return value; - } - - public K getKey() { - return key; - } - + @Override + public CompletableFuture>> queryListAsync() { + return CompletableFuture.completedFuture(new ArrayList<>(container.values())); } + } diff --git a/src/org/redkale/source/CacheSource.java b/src/org/redkale/source/CacheSource.java index f455db6ca..3d38c8efd 100644 --- a/src/org/redkale/source/CacheSource.java +++ b/src/org/redkale/source/CacheSource.java @@ -5,9 +5,12 @@ */ package org.redkale.source; +import java.beans.ConstructorProperties; import java.io.*; import java.util.*; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.*; +import org.redkale.convert.ConvertColumn; +import org.redkale.convert.json.JsonFactory; /** * @@ -54,6 +57,8 @@ public interface CacheSource { public void removeSetItem(final K key, final V value); + public List> queryList(); + //---------------------- CompletableFuture 异步版 --------------------------------- public CompletableFuture existsAsync(final K key); @@ -85,8 +90,103 @@ public interface CacheSource { public CompletableFuture removeSetItemAsync(final K key, final V value); + public CompletableFuture>> queryListAsync(); + default CompletableFuture isOpenAsync() { return CompletableFuture.completedFuture(true); } + public static enum CacheEntryType { + OBJECT, SET, LIST; + } + + public static final class CacheEntry { + + static final String JSON_SET_KEY = "{\"cacheType\":\"" + CacheEntryType.SET + "\""; + + static final String JSON_LIST_KEY = "{\"cacheType\":\"" + CacheEntryType.LIST + "\""; + + final CacheEntryType cacheType; + + final K key; + + //<=0表示永久保存 + int expireSeconds; + + volatile int lastAccessed; //最后刷新时间 + + T objectValue; + + CopyOnWriteArraySet setValue; + + ConcurrentLinkedQueue listValue; + + public CacheEntry(CacheEntryType cacheType, K key, T objectValue, CopyOnWriteArraySet setValue, ConcurrentLinkedQueue listValue) { + this(cacheType, 0, key, objectValue, setValue, listValue); + } + + public CacheEntry(CacheEntryType cacheType, int expireSeconds, K key, T objectValue, CopyOnWriteArraySet setValue, ConcurrentLinkedQueue listValue) { + this(cacheType, expireSeconds, (int) (System.currentTimeMillis() / 1000), key, objectValue, setValue, listValue); + } + + @ConstructorProperties({"cacheType", "expireSeconds", "lastAccessed", "key", "objectValue", "setValue", "listValue"}) + public CacheEntry(CacheEntryType cacheType, int expireSeconds, int lastAccessed, K key, T objectValue, CopyOnWriteArraySet setValue, ConcurrentLinkedQueue listValue) { + this.cacheType = cacheType; + this.expireSeconds = expireSeconds; + this.lastAccessed = lastAccessed; + this.key = key; + this.objectValue = objectValue; + this.setValue = setValue; + this.listValue = listValue; + } + + @Override + public String toString() { + return JsonFactory.root().getConvert().convertTo(this); + } + + @ConvertColumn(ignore = true) + public boolean isListCacheType() { + return cacheType == CacheEntryType.LIST; + } + + @ConvertColumn(ignore = true) + public boolean isSetCacheType() { + return cacheType == CacheEntryType.SET; + } + + @ConvertColumn(ignore = true) + public boolean isExpired() { + return (expireSeconds > 0 && lastAccessed + expireSeconds < (System.currentTimeMillis() / 1000)); + } + + public CacheEntryType getCacheType() { + return cacheType; + } + + public int getExpireSeconds() { + return expireSeconds; + } + + public int getLastAccessed() { + return lastAccessed; + } + + public K getKey() { + return key; + } + + public T getObjectValue() { + return objectValue; + } + + public CopyOnWriteArraySet getSetValue() { + return setValue; + } + + public ConcurrentLinkedQueue getListValue() { + return listValue; + } + + } }