This commit is contained in:
Redkale
2017-05-28 13:30:50 +08:00
parent 6cd232efd2
commit ab90c80785
6 changed files with 209 additions and 144 deletions

View File

@@ -44,6 +44,11 @@ public final class CollectionDecoder<T> implements Decodeable<Reader, Collection
this.creator = factory.loadCreator((Class) pt.getRawType());
factory.register(type, this);
this.decoder = factory.loadDecoder(this.componentType);
} else if(factory.isReversible()){
this.componentType = Object.class;
this.creator = factory.loadCreator(Object.class);
factory.register(type, this);
this.decoder = factory.loadDecoder(this.componentType);
} else {
throw new ConvertException("collectiondecoder not support the type (" + type + ")");
}

View File

@@ -324,6 +324,8 @@ public final class SncpClient {
result.complete(rs);
}
} catch (Exception exp) {
result.completeExceptionally(exp);
} finally {
bsonConvert.offerBsonReader(reader);
}
@@ -380,16 +382,23 @@ public final class SncpClient {
final long seqid = System.nanoTime();
final DLong actionid = action.actionid;
final SocketAddress addr = addr0 == null ? (action.addressTargetParamIndex >= 0 ? (SocketAddress) params[action.addressTargetParamIndex] : null) : addr0;
final AsyncConnection conn = transport.pollConnection(addr);
if (conn == null || !conn.isOpen()) {
logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") cannot connect " + (conn == null ? addr : conn.getRemoteAddress()));
throw new RuntimeException("sncp " + (conn == null ? addr : conn.getRemoteAddress()) + " cannot connect");
final CompletableFuture<byte[]> future = new CompletableFuture();
AsyncConnection conn0;
try {
conn0 = transport.pollConnection(addr);
} catch (Exception e) {
future.completeExceptionally(e);
return future;
}
if (conn0 == null || !conn0.isOpen()) {
future.completeExceptionally(new RuntimeException("sncp " + (conn0 == null ? addr : conn0.getRemoteAddress()) + " cannot connect"));
return future;
}
final AsyncConnection conn = conn0;
final ByteBuffer[] sendBuffers = writer.toBuffers();
fillHeader(sendBuffers[0], seqid, actionid, reqBodyLength);
final ByteBuffer buffer = transport.pollBuffer();
final CompletableFuture<byte[]> future = new CompletableFuture();
conn.write(sendBuffers, sendBuffers, new CompletionHandler<Integer, ByteBuffer[]>() {
@Override

View File

@@ -48,8 +48,8 @@ public final class SncpDynServlet extends SncpServlet {
private Supplier<ByteBuffer> bufferSupplier;
public SncpDynServlet(final BsonConvert convert, final String serviceName, final Class<? extends Service> type, final Service service) {
super(serviceName, type, service);
public SncpDynServlet(final BsonConvert convert, final String serviceName, final Class serviceOrSourceType, final Service service) {
super(serviceName, serviceOrSourceType, service);
this.serviceid = Sncp.hash(type.getName() + ':' + serviceName);
Set<DLong> actionids = new HashSet<>();
for (java.lang.reflect.Method method : service.getClass().getMethods()) {

View File

@@ -20,14 +20,14 @@ import org.redkale.util.*;
*/
public abstract class SncpServlet extends Servlet<SncpContext, SncpRequest, SncpResponse> implements Comparable<SncpServlet> {
protected final Class<? extends Service> type;
protected final Class type;
protected final String serviceName;
protected final Service service;
protected SncpServlet(String serviceName, Class<? extends Service> type, Service service) {
this.type = type;
protected SncpServlet(String serviceName, Class serviceOrSourceType, Service service) {
this.type = serviceOrSourceType;
this.service = service;
this.serviceName = serviceName;
}

View File

@@ -5,7 +5,6 @@
*/
package org.redkale.source;
import java.beans.ConstructorProperties;
import java.io.*;
import java.lang.reflect.Type;
import java.util.*;
@@ -13,7 +12,6 @@ import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.logging.*;
import javax.annotation.Resource;
import org.redkale.convert.ConvertColumn;
import org.redkale.convert.json.*;
import org.redkale.net.sncp.Sncp;
import org.redkale.service.*;
@@ -56,7 +54,10 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
private final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
protected final ConcurrentHashMap<K, CacheEntry<K, ?>> container = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<K, CacheEntry<K, Object>> container = new ConcurrentHashMap<>();
@RpcRemote
protected CacheSource<K, V> remoteSource;
public CacheMemorySource() {
}
@@ -125,32 +126,51 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
boolean datasync = false; //是否从远程同步过数据
//----------同步数据……-----------
// TODO
if (!this.needStore) return;
try {
File store = new File(home, "cache/" + resourceName());
if (!store.isFile() || !store.canRead()) return;
LineNumberReader reader = new LineNumberReader(new FileReader(store));
if (this.keyType == null) this.keyType = Serializable.class;
if (this.objValueType == null) {
this.objValueType = Object.class;
this.setValueType = TypeToken.createParameterizedType(null, CopyOnWriteArraySet.class, this.objValueType);
this.listValueType = TypeToken.createParameterizedType(null, ConcurrentLinkedQueue.class, this.objValueType);
if (this.needStore) {
try {
File store = new File(home, "cache/" + resourceName());
if (!store.isFile() || !store.canRead()) return;
LineNumberReader reader = new LineNumberReader(new FileReader(store));
if (this.keyType == null) this.keyType = Serializable.class;
if (this.objValueType == null) {
this.objValueType = Object.class;
this.setValueType = TypeToken.createParameterizedType(null, CopyOnWriteArraySet.class, this.objValueType);
this.listValueType = TypeToken.createParameterizedType(null, ConcurrentLinkedQueue.class, this.objValueType);
}
final Type storeObjType = TypeToken.createParameterizedType(null, CacheEntry.class, keyType, objValueType);
final Type storeSetType = TypeToken.createParameterizedType(null, CacheEntry.class, keyType, setValueType);
final Type storeListType = TypeToken.createParameterizedType(null, CacheEntry.class, keyType, listValueType);
String line;
while ((line = reader.readLine()) != null) {
if (line.isEmpty()) continue;
CacheEntry<K, Object> entry = convert.convertFrom(line.startsWith(CacheEntry.JSON_SET_KEY) ? storeSetType : (line.startsWith(CacheEntry.JSON_LIST_KEY) ? storeListType : storeObjType), line);
if (entry.isExpired()) continue;
if (datasync && container.containsKey(entry.key)) continue; //已经同步了
container.put(entry.key, entry);
}
reader.close();
store.delete();
} catch (Exception e) {
logger.log(Level.SEVERE, CacheSource.class.getSimpleName() + "(" + resourceName() + ") load store file error ", e);
}
final Type storeObjType = TypeToken.createParameterizedType(null, CacheEntry.class, keyType, objValueType);
final Type storeSetType = TypeToken.createParameterizedType(null, CacheEntry.class, keyType, setValueType);
final Type storeListType = TypeToken.createParameterizedType(null, CacheEntry.class, keyType, listValueType);
String line;
while ((line = reader.readLine()) != null) {
if (line.isEmpty()) continue;
CacheEntry<K, ?> entry = convert.convertFrom(line.startsWith(CacheEntry.JSON_SET_KEY) ? storeSetType : (line.startsWith(CacheEntry.JSON_LIST_KEY) ? storeListType : storeObjType), line);
if (entry.isExpired()) continue;
if (datasync && container.containsKey(entry.key)) continue; //已经同步了
container.put(entry.key, entry);
}
reader.close();
store.delete();
} catch (Exception e) {
logger.log(Level.SEVERE, CacheSource.class.getSimpleName() + "(" + resourceName() + ") load store file error ", e);
}
if (remoteSource != null && !Sncp.isRemote(this)) {
super.runAsync(() -> {
try {
CompletableFuture<List<CacheEntry<K, Object>>> listFuture = remoteSource.queryListAsync();
listFuture.whenComplete((list, exp) -> {
if (exp != null) {
logger.log(Level.FINEST, CacheSource.class.getSimpleName() + "(" + resourceName() + ") queryListAsync error", exp);
} else {
for (CacheEntry<K, Object> entry : list) {
container.put(entry.key, entry);
}
}
});
} catch (Exception e) {
logger.log(Level.FINEST, CacheSource.class.getSimpleName() + "(" + resourceName() + ") queryListAsync error, maybe remote node connot connect ", e);
}
});
}
}
@@ -176,7 +196,7 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
final Type storeObjType = TypeToken.createParameterizedType(null, CacheEntry.class, keyType, objValueType);
final Type storeSetType = TypeToken.createParameterizedType(null, CacheEntry.class, keyType, setValueType);
final Type storeListType = TypeToken.createParameterizedType(null, CacheEntry.class, keyType, listValueType);
Collection<CacheEntry<K, ?>> entrys = (Collection<CacheEntry<K, ?>>) container.values();
Collection<CacheEntry<K, Object>> entrys = container.values();
for (CacheEntry entry : entrys) {
stream.println(convert.convertTo(entry.isSetCacheType() ? storeSetType : (entry.isListCacheType() ? storeListType : storeObjType), entry));
}
@@ -204,10 +224,10 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
public V get(K key) {
if (key == null) return null;
CacheEntry entry = container.get(key);
if (entry == null || entry.isExpired() || entry.value == null) return null;
if (entry.isListCacheType()) return (V) new ArrayList((Collection) entry.value);
if (entry.isSetCacheType()) return (V) new HashSet((Collection) entry.value);
return (V) entry.getValue();
if (entry == null || entry.isExpired()) return null;
if (entry.isListCacheType()) return (V) (entry.listValue == null ? null : new ArrayList(entry.listValue));
if (entry.isSetCacheType()) return (V) (entry.setValue == null ? null : new HashSet(entry.setValue));
return (V) entry.objectValue;
}
@Override
@@ -220,12 +240,12 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
public V getAndRefresh(K key, final int expireSeconds) {
if (key == null) return null;
CacheEntry entry = container.get(key);
if (entry == null || entry.isExpired() || entry.value == null) return null;
if (entry == null || entry.isExpired()) return null;
entry.lastAccessed = (int) (System.currentTimeMillis() / 1000);
entry.expireSeconds = expireSeconds;
if (entry.isListCacheType()) return (V) new ArrayList((Collection) entry.value);
if (entry.isSetCacheType()) return (V) new HashSet((Collection) entry.value);
return (V) entry.getValue();
if (entry.isListCacheType()) return (V) (entry.listValue == null ? null : new ArrayList(entry.listValue));
if (entry.isSetCacheType()) return (V) (entry.setValue == null ? null : new HashSet(entry.setValue));
return (V) entry.objectValue;
}
@Override
@@ -254,11 +274,11 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
if (key == null) return;
CacheEntry entry = container.get(key);
if (entry == null) {
entry = new CacheEntry(CacheEntryType.OBJECT, key, value);
entry = new CacheEntry(CacheEntryType.OBJECT, key, value, null, null);
container.putIfAbsent(key, entry);
} else {
entry.expireSeconds = 0;
entry.value = value;
entry.objectValue = value;
entry.lastAccessed = (int) (System.currentTimeMillis() / 1000);
}
}
@@ -274,12 +294,12 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
if (key == null) return;
CacheEntry entry = container.get(key);
if (entry == null) {
entry = new CacheEntry(CacheEntryType.OBJECT, expireSeconds, key, value);
entry = new CacheEntry(CacheEntryType.OBJECT, expireSeconds, key, value, null, null);
container.putIfAbsent(key, entry);
} else {
if (expireSeconds > 0) entry.expireSeconds = expireSeconds;
entry.lastAccessed = (int) (System.currentTimeMillis() / 1000);
entry.value = value;
entry.objectValue = value;
}
}
@@ -350,14 +370,14 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
public void appendListItem(K key, V value) {
if (key == null) return;
CacheEntry entry = container.get(key);
if (entry == null || !entry.isListCacheType()) {
Collection<V> list = new ConcurrentLinkedQueue<>();
entry = new CacheEntry(CacheEntryType.LIST, key, list);
if (entry == null || !entry.isListCacheType() || entry.listValue == null) {
ConcurrentLinkedQueue<V> list = new ConcurrentLinkedQueue();
entry = new CacheEntry(CacheEntryType.LIST, key, null, null, list);
CacheEntry old = container.putIfAbsent(key, entry);
if (old != null) list = (Collection) old.value;
list.add(value);
if (old != null) list = old.listValue;
if (list != null) list.add(value);
} else {
((Collection) entry.getValue()).add(value);
entry.listValue.add(value);
}
}
@@ -371,8 +391,8 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
public void removeListItem(K key, V value) {
if (key == null) return;
CacheEntry entry = container.get(key);
if (entry == null || !entry.isListCacheType()) return;
((Collection) entry.getValue()).remove(value);
if (entry == null || entry.listValue == null) return;
entry.listValue.remove(value);
}
@Override
@@ -385,14 +405,14 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
public void appendSetItem(K key, V value) {
if (key == null) return;
CacheEntry entry = container.get(key);
if (entry == null || !entry.isSetCacheType()) {
Collection<V> set = new CopyOnWriteArraySet();
entry = new CacheEntry(CacheEntryType.SET, key, set);
if (entry == null || !entry.isSetCacheType() || entry.setValue == null) {
CopyOnWriteArraySet<V> set = new CopyOnWriteArraySet();
entry = new CacheEntry(CacheEntryType.SET, key, null, set, null);
CacheEntry old = container.putIfAbsent(key, entry);
if (old != null) set = (Collection) old.value;
set.add(value);
if (old != null) set = old.setValue;
if (set != null) set.add(value);
} else {
((Collection) entry.getValue()).add(value);
entry.setValue.add(value);
}
}
@@ -406,8 +426,8 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
public void removeSetItem(K key, V value) {
if (key == null) return;
CacheEntry entry = container.get(key);
if (entry == null || !(entry.value instanceof Set)) return;
((Set) entry.getValue()).remove(value);
if (entry == null || entry.setValue == null) return;
entry.setValue.remove(value);
}
@Override
@@ -415,83 +435,14 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
return CompletableFuture.runAsync(() -> removeSetItem(key, value), getExecutor());
}
public static enum CacheEntryType {
OBJECT, SET, LIST;
@Override
public List<CacheEntry<K, Object>> queryList() {
return new ArrayList<>(container.values());
}
public static final class CacheEntry<K extends Serializable, T> {
public static final String JSON_SET_KEY = "{\"cacheType\":\"" + CacheEntryType.SET + "\"";
public static final String JSON_LIST_KEY = "{\"cacheType\":\"" + CacheEntryType.LIST + "\"";
private final CacheEntryType cacheType;
private final K key;
//<=0表示永久保存
private int expireSeconds;
private volatile int lastAccessed; //最后刷新时间
private T value;
public CacheEntry(CacheEntryType cacheType, K key, T value) {
this(cacheType, 0, key, value);
}
public CacheEntry(CacheEntryType cacheType, int expireSeconds, K key, T value) {
this(cacheType, expireSeconds, (int) (System.currentTimeMillis() / 1000), key, value);
}
@ConstructorProperties({"cacheType", "expireSeconds", "lastAccessed", "key", "value"})
public CacheEntry(CacheEntryType cacheType, int expireSeconds, int lastAccessed, K key, T value) {
this.cacheType = cacheType;
this.expireSeconds = expireSeconds;
this.lastAccessed = lastAccessed;
this.key = key;
this.value = value;
}
@Override
public String toString() {
return JsonFactory.root().getConvert().convertTo(this);
}
@ConvertColumn(ignore = true)
public boolean isListCacheType() {
return cacheType == CacheEntryType.LIST;
}
@ConvertColumn(ignore = true)
public boolean isSetCacheType() {
return cacheType == CacheEntryType.SET;
}
@ConvertColumn(ignore = true)
public boolean isExpired() {
return (expireSeconds > 0 && lastAccessed + expireSeconds < (System.currentTimeMillis() / 1000));
}
public CacheEntryType getCacheType() {
return cacheType;
}
public int getExpireSeconds() {
return expireSeconds;
}
public int getLastAccessed() {
return lastAccessed;
}
public T getValue() {
return value;
}
public K getKey() {
return key;
}
@Override
public CompletableFuture<List<CacheEntry<K, Object>>> queryListAsync() {
return CompletableFuture.completedFuture(new ArrayList<>(container.values()));
}
}

View File

@@ -5,9 +5,12 @@
*/
package org.redkale.source;
import java.beans.ConstructorProperties;
import java.io.*;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.*;
import org.redkale.convert.ConvertColumn;
import org.redkale.convert.json.JsonFactory;
/**
*
@@ -54,6 +57,8 @@ public interface CacheSource<K extends Serializable, V extends Object> {
public void removeSetItem(final K key, final V value);
public List<CacheEntry<K, Object>> queryList();
//---------------------- CompletableFuture 异步版 ---------------------------------
public CompletableFuture<Boolean> existsAsync(final K key);
@@ -85,8 +90,103 @@ public interface CacheSource<K extends Serializable, V extends Object> {
public CompletableFuture<Void> removeSetItemAsync(final K key, final V value);
public CompletableFuture<List<CacheEntry<K, Object>>> queryListAsync();
default CompletableFuture<Boolean> isOpenAsync() {
return CompletableFuture.completedFuture(true);
}
public static enum CacheEntryType {
OBJECT, SET, LIST;
}
public static final class CacheEntry<K extends Serializable, T> {
static final String JSON_SET_KEY = "{\"cacheType\":\"" + CacheEntryType.SET + "\"";
static final String JSON_LIST_KEY = "{\"cacheType\":\"" + CacheEntryType.LIST + "\"";
final CacheEntryType cacheType;
final K key;
//<=0表示永久保存
int expireSeconds;
volatile int lastAccessed; //最后刷新时间
T objectValue;
CopyOnWriteArraySet<T> setValue;
ConcurrentLinkedQueue<T> listValue;
public CacheEntry(CacheEntryType cacheType, K key, T objectValue, CopyOnWriteArraySet<T> setValue, ConcurrentLinkedQueue<T> listValue) {
this(cacheType, 0, key, objectValue, setValue, listValue);
}
public CacheEntry(CacheEntryType cacheType, int expireSeconds, K key, T objectValue, CopyOnWriteArraySet<T> setValue, ConcurrentLinkedQueue<T> listValue) {
this(cacheType, expireSeconds, (int) (System.currentTimeMillis() / 1000), key, objectValue, setValue, listValue);
}
@ConstructorProperties({"cacheType", "expireSeconds", "lastAccessed", "key", "objectValue", "setValue", "listValue"})
public CacheEntry(CacheEntryType cacheType, int expireSeconds, int lastAccessed, K key, T objectValue, CopyOnWriteArraySet<T> setValue, ConcurrentLinkedQueue<T> listValue) {
this.cacheType = cacheType;
this.expireSeconds = expireSeconds;
this.lastAccessed = lastAccessed;
this.key = key;
this.objectValue = objectValue;
this.setValue = setValue;
this.listValue = listValue;
}
@Override
public String toString() {
return JsonFactory.root().getConvert().convertTo(this);
}
@ConvertColumn(ignore = true)
public boolean isListCacheType() {
return cacheType == CacheEntryType.LIST;
}
@ConvertColumn(ignore = true)
public boolean isSetCacheType() {
return cacheType == CacheEntryType.SET;
}
@ConvertColumn(ignore = true)
public boolean isExpired() {
return (expireSeconds > 0 && lastAccessed + expireSeconds < (System.currentTimeMillis() / 1000));
}
public CacheEntryType getCacheType() {
return cacheType;
}
public int getExpireSeconds() {
return expireSeconds;
}
public int getLastAccessed() {
return lastAccessed;
}
public K getKey() {
return key;
}
public T getObjectValue() {
return objectValue;
}
public CopyOnWriteArraySet<T> getSetValue() {
return setValue;
}
public ConcurrentLinkedQueue<T> getListValue() {
return listValue;
}
}
}