This commit is contained in:
wentch
2015-12-17 17:26:45 +08:00
parent 8ab0b04067
commit 251ef184c1
6 changed files with 123 additions and 16 deletions

View File

@@ -205,7 +205,8 @@ public abstract class NodeServer {
} catch (Exception e) { } catch (Exception e) {
//src 不含 MultiRun 方法 //src 不含 MultiRun 方法
} }
CacheSource source = Sncp.createLocalService(resourceName, getExecutor(), CacheSourceService.class, this.sncpAddress, sncpDefaultGroups, sameGroupTransports, diffGroupTransports); CacheSourceService source = Sncp.createLocalService(resourceName, getExecutor(), CacheSourceService.class, this.sncpAddress, sncpDefaultGroups, sameGroupTransports, diffGroupTransports);
source.setNeedStore(field.getAnnotation(CacheStore.class) != null);
application.cacheSources.add(source); application.cacheSources.add(source);
regFactory.register(resourceName, CacheSource.class, source); regFactory.register(resourceName, CacheSource.class, source);
field.set(src, source); field.set(src, source);

View File

@@ -9,6 +9,7 @@ import org.redkale.convert.*;
/** /**
* *
* @see http://www.redkale.org
* @author zhangjx * @author zhangjx
* @param <R> * @param <R>
* @param <W> * @param <W>

View File

@@ -7,6 +7,7 @@ package org.redkale.net.sncp;
/** /**
* *
* @see http://www.redkale.org
* @author zhangjx * @author zhangjx
*/ */
public enum SncpParamType { public enum SncpParamType {

View File

@@ -10,17 +10,28 @@ import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.function.*; import java.util.function.*;
import java.util.logging.*; import java.util.logging.*;
import javax.annotation.*;
import org.redkale.convert.json.*; import org.redkale.convert.json.*;
import org.redkale.net.sncp.*;
import org.redkale.source.*; import org.redkale.source.*;
import org.redkale.util.*; import org.redkale.util.*;
/** /**
* *
* @see http://www.redkale.org
* @author zhangjx * @author zhangjx
*/ */
@AutoLoad(false) @AutoLoad(false)
public class CacheSourceService implements CacheSource, Service { public class CacheSourceService implements CacheSource, Service {
@Resource(name = "APP_HOME")
private File home;
@Resource
private JsonConvert convert;
private boolean needStore;
private ScheduledThreadPoolExecutor scheduler; private ScheduledThreadPoolExecutor scheduler;
private Consumer<CacheEntry> expireHandler; private Consumer<CacheEntry> expireHandler;
@@ -29,10 +40,19 @@ public class CacheSourceService implements CacheSource, Service {
protected final ConcurrentHashMap<Serializable, CacheEntry> container = new ConcurrentHashMap<>(); protected final ConcurrentHashMap<Serializable, CacheEntry> container = new ConcurrentHashMap<>();
public CacheSourceService() {
}
public CacheSourceService setNeedStore(boolean needstore) {
this.needStore = needstore;
return this;
}
@Override @Override
public void init(AnyValue conf) { public void init(AnyValue conf) {
final CacheSourceService self = this; final CacheSourceService self = this;
AnyValue prop = conf == null ? null : conf.getAnyValue("property"); AnyValue prop = conf == null ? null : conf.getAnyValue("property");
if (!needStore && prop != null) this.needStore = prop.getBoolValue("cachestore", false);
String expireHandlerClass = prop == null ? null : prop.getValue("expirehandler"); String expireHandlerClass = prop == null ? null : prop.getValue("expirehandler");
if (expireHandlerClass != null) { if (expireHandlerClass != null) {
try { try {
@@ -63,21 +83,55 @@ public class CacheSourceService implements CacheSource, Service {
}, 10, 10, TimeUnit.SECONDS); }, 10, 10, TimeUnit.SECONDS);
logger.finest(self.getClass().getSimpleName() + ":" + self.name() + " start schedule expire executor"); logger.finest(self.getClass().getSimpleName() + ":" + self.name() + " start schedule expire executor");
} }
if (!needStore || Sncp.isRemote(self)) return;
try {
CacheEntry.initCreator();
File store = new File(home, "cache/" + name());
if (!store.isFile() || !store.canRead()) return;
LineNumberReader reader = new LineNumberReader(new FileReader(store));
String line;
while ((line = reader.readLine()) != null) {
if (line.isEmpty()) continue;
CacheEntry entry = convert.convertFrom(CacheEntry.class, line);
container.put(entry.key, entry);
}
reader.close();
store.delete();
} catch (Exception e) {
logger.log(Level.SEVERE, CacheSource.class.getSimpleName() + "(" + name() + ") load store file error ", e);
}
} }
public void close() { //给Application 关闭时调用 public void close() { //给Application 关闭时调用
if (scheduler != null) scheduler.shutdownNow(); destroy(null);
} }
@Override @Override
public void destroy(AnyValue conf) { public void destroy(AnyValue conf) {
if (scheduler != null) scheduler.shutdownNow(); if (scheduler != null) scheduler.shutdownNow();
if (!needStore || Sncp.isRemote(this) || container.isEmpty()) return;
try {
CacheEntry.initCreator();
File store = new File(home, "cache/" + name());
store.getParentFile().mkdirs();
PrintStream stream = new PrintStream(store, "UTF-8");
Collection<CacheEntry> values = container.values();
for (CacheEntry entry : values) {
stream.println(convert.convertTo(entry));
}
container.clear();
stream.close();
} catch (Exception e) {
logger.log(Level.SEVERE, CacheSource.class.getSimpleName() + "(" + name() + ") store to file error ", e);
}
} }
@Override @Override
public boolean exists(Serializable key) { public boolean exists(Serializable key) {
if (key == null) return false; if (key == null) return false;
return container.containsKey(key); CacheEntry entry = container.get(key);
if (entry == null) return false;
return !(entry.expireSeconds > 0 && entry.lastAccessed + entry.expireSeconds < (System.currentTimeMillis() / 1000));
} }
@Override @Override
@@ -85,12 +139,13 @@ public class CacheSourceService implements CacheSource, Service {
if (key == null) return null; if (key == null) return null;
CacheEntry entry = container.get(key); CacheEntry entry = container.get(key);
if (entry == null) return null; if (entry == null) return null;
if (entry.expireSeconds > 0 && entry.lastAccessed + entry.expireSeconds < (System.currentTimeMillis() / 1000)) return null;
return (T) entry.getValue(); return (T) entry.getValue();
} }
@Override @Override
@MultiRun @MultiRun
public <T> T refreshAndGet(Serializable key) { public <T> T getAndRefresh(Serializable key) {
if (key == null) return null; if (key == null) return null;
CacheEntry entry = container.get(key); CacheEntry entry = container.get(key);
if (entry == null) return null; if (entry == null) return null;
@@ -116,7 +171,9 @@ public class CacheSourceService implements CacheSource, Service {
entry = new CacheEntry(key, value); entry = new CacheEntry(key, value);
container.putIfAbsent(key, entry); container.putIfAbsent(key, entry);
} else { } else {
entry.expireSeconds = 0;
entry.value = value; entry.value = value;
entry.lastAccessed = (int) (System.currentTimeMillis() / 1000);
} }
} }
@@ -139,6 +196,7 @@ public class CacheSourceService implements CacheSource, Service {
container.putIfAbsent(key, entry); container.putIfAbsent(key, entry);
} else { } else {
if (expireSeconds > 0) entry.expireSeconds = expireSeconds; if (expireSeconds > 0) entry.expireSeconds = expireSeconds;
entry.lastAccessed = (int) (System.currentTimeMillis() / 1000);
entry.value = value; entry.value = value;
} }
} }
@@ -202,25 +260,48 @@ public class CacheSourceService implements CacheSource, Service {
public static final class CacheEntry<T> { public static final class CacheEntry<T> {
private final int createTime; //创建时间 public static class CacheEntryCreator implements Creator<CacheEntry> {
private volatile int lastAccessed; //最后刷新时间 public static final CacheEntryCreator CREATOR = new CacheEntryCreator();
@java.beans.ConstructorProperties({"expireSeconds", "lastAccessed", "key", "value"})
public CacheEntryCreator() {
}
@Override
public CacheEntry create(Object... params) {
return new CacheEntry((Integer) params[0], (Integer) params[1], (Serializable) params[2], params[3]);
}
}
static void initCreator() {
if (JsonFactory.root().findCreator(CacheEntry.class) == null) {
JsonFactory.root().register(CacheEntry.class, CacheEntryCreator.CREATOR);
}
}
private final Serializable key;
//<=0表示永久保存 //<=0表示永久保存
private int expireSeconds; private int expireSeconds;
private T value; private volatile int lastAccessed; //最后刷新时间
private final Serializable key; private T value;
public CacheEntry(Serializable key, T value) { public CacheEntry(Serializable key, T value) {
this(0, key, value); this(0, key, value);
} }
public CacheEntry(int expireSecond, Serializable key, T value) { public CacheEntry(int expireSeconds, Serializable key, T value) {
this.expireSeconds = expireSecond; this(expireSeconds, (int) (System.currentTimeMillis() / 1000), key, value);
this.createTime = (int) (System.currentTimeMillis() / 1000); }
this.lastAccessed = this.createTime;
@java.beans.ConstructorProperties({"expireSeconds", "lastAccessed", "key", "value"})
private CacheEntry(int expireSeconds, int lastAccessed, Serializable key, T value) {
this.expireSeconds = expireSeconds;
this.lastAccessed = lastAccessed;
this.key = key; this.key = key;
this.value = value; this.value = value;
} }
@@ -230,11 +311,11 @@ public class CacheSourceService implements CacheSource, Service {
return JsonFactory.root().getConvert().convertTo(this); return JsonFactory.root().getConvert().convertTo(this);
} }
public long getCreateTime() { public int getExpireSeconds() {
return createTime; return expireSeconds;
} }
public long getLastAccessed() { public int getLastAccessed() {
return lastAccessed; return lastAccessed;
} }

View File

@@ -22,7 +22,7 @@ public interface CacheSource {
public <T> T get(final Serializable key); public <T> T get(final Serializable key);
public <T> T refreshAndGet(final Serializable key); public <T> T getAndRefresh(final Serializable key);
public void refresh(final Serializable key); public void refresh(final Serializable key);

View File

@@ -0,0 +1,23 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.source;
import java.lang.annotation.*;
import static java.lang.annotation.ElementType.*;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
/**
* 用于标记CacheSource 是否需要持久化到文件中
* 注意: 标记为@CacheStore 的 CacheSource对的name()不能包含特殊字符, 否则无法创建存储文件。
*
* @see http://www.redkale.org
* @author zhangjx
*/
@Target({FIELD})
@Retention(RUNTIME)
public @interface CacheStore {
}