This commit is contained in:
wentch
2015-12-17 17:47:21 +08:00
parent 251ef184c1
commit c2318297fa
3 changed files with 68 additions and 15 deletions

View File

@@ -206,7 +206,8 @@ public abstract class NodeServer {
//src 不含 MultiRun 方法 //src 不含 MultiRun 方法
} }
CacheSourceService source = Sncp.createLocalService(resourceName, getExecutor(), CacheSourceService.class, this.sncpAddress, sncpDefaultGroups, sameGroupTransports, diffGroupTransports); CacheSourceService source = Sncp.createLocalService(resourceName, getExecutor(), CacheSourceService.class, this.sncpAddress, sncpDefaultGroups, sameGroupTransports, diffGroupTransports);
source.setNeedStore(field.getAnnotation(CacheStore.class) != null); CacheStore store = field.getAnnotation(CacheStore.class);
if (store != null) source.setStoreType(store.keyType(), store.valueType());
application.cacheSources.add(source); application.cacheSources.add(source);
regFactory.register(resourceName, CacheSource.class, source); regFactory.register(resourceName, CacheSource.class, source);
field.set(src, source); field.set(src, source);

View File

@@ -6,6 +6,7 @@
package org.redkale.service; package org.redkale.service;
import java.io.*; import java.io.*;
import java.lang.reflect.*;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.function.*; import java.util.function.*;
@@ -30,7 +31,9 @@ public class CacheSourceService implements CacheSource, Service {
@Resource @Resource
private JsonConvert convert; private JsonConvert convert;
private boolean needStore; private Class storeKeyType;
private Class storeValueType;
private ScheduledThreadPoolExecutor scheduler; private ScheduledThreadPoolExecutor scheduler;
@@ -43,8 +46,9 @@ public class CacheSourceService implements CacheSource, Service {
public CacheSourceService() { public CacheSourceService() {
} }
public CacheSourceService setNeedStore(boolean needstore) { public CacheSourceService setStoreType(Class storeKeyType, Class storeValueType) {
this.needStore = needstore; this.storeKeyType = storeKeyType;
this.storeValueType = storeValueType;
return this; return this;
} }
@@ -52,7 +56,18 @@ public class CacheSourceService implements CacheSource, Service {
public void init(AnyValue conf) { public void init(AnyValue conf) {
final CacheSourceService self = this; final CacheSourceService self = this;
AnyValue prop = conf == null ? null : conf.getAnyValue("property"); AnyValue prop = conf == null ? null : conf.getAnyValue("property");
if (!needStore && prop != null) this.needStore = prop.getBoolValue("cachestore", false); if (storeKeyType == null && prop != null) {
String storeKeyStr = prop.getValue("store-key-type");
String storeValueStr = prop.getValue("store-value-type");
if (storeKeyStr != null && storeValueStr != null) {
try {
this.storeKeyType = Class.forName(storeKeyStr);
this.storeValueType = Class.forName(storeValueStr);
} catch (Exception e) {
logger.log(Level.SEVERE, self.getClass().getSimpleName() + " load key & value store class (" + storeKeyStr + ", " + storeValueStr + ") error", e);
}
}
}
String expireHandlerClass = prop == null ? null : prop.getValue("expirehandler"); String expireHandlerClass = prop == null ? null : prop.getValue("expirehandler");
if (expireHandlerClass != null) { if (expireHandlerClass != null) {
try { try {
@@ -83,16 +98,33 @@ public class CacheSourceService implements CacheSource, Service {
}, 10, 10, TimeUnit.SECONDS); }, 10, 10, TimeUnit.SECONDS);
logger.finest(self.getClass().getSimpleName() + ":" + self.name() + " start schedule expire executor"); logger.finest(self.getClass().getSimpleName() + ":" + self.name() + " start schedule expire executor");
} }
if (!needStore || Sncp.isRemote(self)) return; if (this.storeKeyType == null || Sncp.isRemote(self)) return;
try { try {
CacheEntry.initCreator(); CacheEntry.initCreator();
File store = new File(home, "cache/" + name()); File store = new File(home, "cache/" + name());
if (!store.isFile() || !store.canRead()) return; if (!store.isFile() || !store.canRead()) return;
LineNumberReader reader = new LineNumberReader(new FileReader(store)); LineNumberReader reader = new LineNumberReader(new FileReader(store));
final ParameterizedType storeType = new ParameterizedType() {
@Override
public Type[] getActualTypeArguments() {
return new Type[]{storeKeyType, storeValueType};
}
@Override
public Type getRawType() {
return CacheEntry.class;
}
@Override
public Type getOwnerType() {
return null;
}
};
String line; String line;
while ((line = reader.readLine()) != null) { while ((line = reader.readLine()) != null) {
if (line.isEmpty()) continue; if (line.isEmpty()) continue;
CacheEntry entry = convert.convertFrom(CacheEntry.class, line); CacheEntry entry = convert.convertFrom(storeType, line);
container.put(entry.key, entry); container.put(entry.key, entry);
} }
reader.close(); reader.close();
@@ -109,15 +141,32 @@ public class CacheSourceService implements CacheSource, Service {
@Override @Override
public void destroy(AnyValue conf) { public void destroy(AnyValue conf) {
if (scheduler != null) scheduler.shutdownNow(); if (scheduler != null) scheduler.shutdownNow();
if (!needStore || Sncp.isRemote(this) || container.isEmpty()) return; if (this.storeKeyType == null || Sncp.isRemote(this) || container.isEmpty()) return;
try { try {
CacheEntry.initCreator(); CacheEntry.initCreator();
File store = new File(home, "cache/" + name()); File store = new File(home, "cache/" + name());
store.getParentFile().mkdirs(); store.getParentFile().mkdirs();
PrintStream stream = new PrintStream(store, "UTF-8"); PrintStream stream = new PrintStream(store, "UTF-8");
Collection<CacheEntry> values = container.values(); Collection<CacheEntry> values = container.values();
final ParameterizedType storeType = new ParameterizedType() {
@Override
public Type[] getActualTypeArguments() {
return new Type[]{storeKeyType, storeValueType};
}
@Override
public Type getRawType() {
return CacheEntry.class;
}
@Override
public Type getOwnerType() {
return null;
}
};
for (CacheEntry entry : values) { for (CacheEntry entry : values) {
stream.println(convert.convertTo(entry)); stream.println(convert.convertTo(storeType, entry));
} }
container.clear(); container.clear();
stream.close(); stream.close();
@@ -258,7 +307,7 @@ public class CacheSourceService implements CacheSource, Service {
((Set) entry.getValue()).remove(value); ((Set) entry.getValue()).remove(value);
} }
public static final class CacheEntry<T> { public static final class CacheEntry<K extends Serializable, T> {
public static class CacheEntryCreator implements Creator<CacheEntry> { public static class CacheEntryCreator implements Creator<CacheEntry> {
@@ -281,7 +330,7 @@ public class CacheSourceService implements CacheSource, Service {
} }
} }
private final Serializable key; private final K key;
//<=0表示永久保存 //<=0表示永久保存
private int expireSeconds; private int expireSeconds;
@@ -290,16 +339,16 @@ public class CacheSourceService implements CacheSource, Service {
private T value; private T value;
public CacheEntry(Serializable key, T value) { public CacheEntry(K key, T value) {
this(0, key, value); this(0, key, value);
} }
public CacheEntry(int expireSeconds, Serializable key, T value) { public CacheEntry(int expireSeconds, K key, T value) {
this(expireSeconds, (int) (System.currentTimeMillis() / 1000), key, value); this(expireSeconds, (int) (System.currentTimeMillis() / 1000), key, value);
} }
@java.beans.ConstructorProperties({"expireSeconds", "lastAccessed", "key", "value"}) @java.beans.ConstructorProperties({"expireSeconds", "lastAccessed", "key", "value"})
private CacheEntry(int expireSeconds, int lastAccessed, Serializable key, T value) { private CacheEntry(int expireSeconds, int lastAccessed, K key, T value) {
this.expireSeconds = expireSeconds; this.expireSeconds = expireSeconds;
this.lastAccessed = lastAccessed; this.lastAccessed = lastAccessed;
this.key = key; this.key = key;
@@ -323,7 +372,7 @@ public class CacheSourceService implements CacheSource, Service {
return value; return value;
} }
public Serializable getKey() { public K getKey() {
return key; return key;
} }

View File

@@ -20,4 +20,7 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
@Retention(RUNTIME) @Retention(RUNTIME)
public @interface CacheStore { public @interface CacheStore {
Class keyType(); //key对应的class
Class valueType(); //value 对应的class
} }