This commit is contained in:
Redkale
2017-03-20 13:37:40 +08:00
parent 95b7e819cd
commit 7a195ecf23
7 changed files with 45 additions and 36 deletions

View File

@@ -7,9 +7,9 @@
<shared-cache-mode>NONE</shared-cache-mode>
<properties>
<!--
DataSource的实现类没有设置默认为org.redkale.source.DataDefaultSource的实现使用常规基于JDBC的数据库驱动一般无需设置
DataSource的实现类没有设置默认为org.redkale.source.DataJdbcSource的实现使用常规基于JDBC的数据库驱动一般无需设置
-->
<property name="javax.persistence.datasource" value="org.redkale.source.DataDefaultSource"/>
<property name="javax.persistence.datasource" value="org.redkale.source.DataJdbcSource"/>
<property name="javax.persistence.jdbc.url" value="jdbc:mysql://127.0.0.1:3306/dbuser?characterEncoding=utf8"/>
<!--

View File

@@ -26,20 +26,20 @@ public class DataCacheListenerService implements DataCacheListener, Service {
@Override
@RpcMultiRun(selfrun = false, async = true)
public <T> void insertCache(Class<T> clazz, T... entitys) {
((DataDefaultSource) source).insertCache(clazz, entitys);
public <T> int insertCache(Class<T> clazz, T... entitys) {
return ((DataCacheListener) source).insertCache(clazz, entitys);
}
@Override
@RpcMultiRun(selfrun = false, async = true)
public <T> void updateCache(Class<T> clazz, T... entitys) {
((DataDefaultSource) source).updateCache(clazz, entitys);
public <T> int updateCache(Class<T> clazz, T... entitys) {
return ((DataCacheListener) source).updateCache(clazz, entitys);
}
@Override
@RpcMultiRun(selfrun = false, async = true)
public <T> void deleteCache(Class<T> clazz, Serializable... ids) {
((DataDefaultSource) source).deleteCache(clazz, ids);
public <T> int deleteCache(Class<T> clazz, Serializable... ids) {
return ((DataCacheListener) source).deleteCache(clazz, ids);
}
}

View File

@@ -9,14 +9,16 @@ import java.io.Serializable;
/**
*
* <p> 详情见: https://redkale.org
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
public interface DataCacheListener {
public <T> void insertCache(Class<T> clazz, T... entitys);
public <T> int insertCache(Class<T> clazz, T... entitys);
public <T> void updateCache(Class<T> clazz, T... entitys);
public <T> int updateCache(Class<T> clazz, T... entitys);
public <T> void deleteCache(Class<T> clazz, Serializable... ids);
public <T> int deleteCache(Class<T> clazz, Serializable... ids);
}

View File

@@ -24,11 +24,11 @@ import org.redkale.util.*;
* @author zhangjx
*/
@SuppressWarnings("unchecked")
public final class DataDefaultSource implements DataSource, Function<Class, EntityInfo>, AutoCloseable {
public final class DataJdbcSource implements DataSource, DataCacheListener, Function<Class, EntityInfo>, AutoCloseable {
private static final Flipper FLIPPER_ONE = new Flipper(1);
final Logger logger = Logger.getLogger(DataDefaultSource.class.getSimpleName());
final Logger logger = Logger.getLogger(DataJdbcSource.class.getSimpleName());
final AtomicBoolean debug = new AtomicBoolean(logger.isLoggable(Level.FINEST));
@@ -38,20 +38,20 @@ public final class DataDefaultSource implements DataSource, Function<Class, Enti
final boolean cacheForbidden;
private final JDBCPoolSource readPool;
private final PoolJdbcSource readPool;
private final JDBCPoolSource writePool;
private final PoolJdbcSource writePool;
@Resource(name = "$")
private DataCacheListener cacheListener;
private final BiFunction<DataSource, Class, List> fullloader = (s, t) -> querySheet(false, false, t, null, null, (FilterNode) null).list(true);
public DataDefaultSource(String unitName, Properties readprop, Properties writeprop) {
public DataJdbcSource(String unitName, Properties readprop, Properties writeprop) {
this.name = unitName;
this.conf = null;
this.readPool = new JDBCPoolSource(this, "read", readprop);
this.writePool = new JDBCPoolSource(this, "write", writeprop);
this.readPool = new PoolJdbcSource(this, "read", readprop);
this.writePool = new PoolJdbcSource(this, "write", writeprop);
this.cacheForbidden = "NONE".equalsIgnoreCase(readprop.getProperty("shared-cache-mode"));
}
@@ -273,14 +273,17 @@ public final class DataDefaultSource implements DataSource, Function<Class, Enti
return prestmt;
}
public <T> void insertCache(Class<T> clazz, T... values) {
if (values.length == 0) return;
@Override
public <T> int insertCache(Class<T> clazz, T... values) {
if (values.length == 0) return 0;
final EntityInfo<T> info = loadEntityInfo(clazz);
final EntityCache<T> cache = info.getCache();
if (cache == null) return;
if (cache == null) return -1;
int c = 0;
for (T value : values) {
cache.insert(value);
c += cache.insert(value);
}
return c;
}
//-------------------------deleteCache--------------------------
@@ -471,6 +474,7 @@ public final class DataDefaultSource implements DataSource, Function<Class, Enti
}
}
@Override
public <T> int deleteCache(Class<T> clazz, Serializable... ids) {
if (ids.length == 0) return 0;
final EntityInfo<T> info = loadEntityInfo(clazz);
@@ -1160,6 +1164,7 @@ public final class DataDefaultSource implements DataSource, Function<Class, Enti
}
}
@Override
public <T> int updateCache(Class<T> clazz, T... values) {
if (values.length == 0) return 0;
final EntityInfo<T> info = loadEntityInfo(clazz);

View File

@@ -425,14 +425,16 @@ public final class EntityCache<T> {
return new Sheet<>(total, rs);
}
public void insert(T value) {
if (value == null) return;
public int insert(T value) {
if (value == null) return 0;
final T rs = newReproduce.apply(this.creator.create(), value); //确保同一主键值的map与list中的对象必须共用。
T old = this.map.put(this.primary.get(rs), rs);
if (old == null) {
this.list.add(rs);
return 1;
} else {
logger.log(Level.WARNING, this.type + " cache repeat insert data: " + value);
return 0;
}
}

View File

@@ -25,9 +25,9 @@ import static org.redkale.source.Sources.*;
*
* @author zhangjx
*/
public class JDBCPoolSource {
public class PoolJdbcSource {
private static final Map<String, AbstractMap.SimpleEntry<WatchService, List<WeakReference<JDBCPoolSource>>>> maps = new HashMap<>();
private static final Map<String, AbstractMap.SimpleEntry<WatchService, List<WeakReference<PoolJdbcSource>>>> maps = new HashMap<>();
private final AtomicLong usingCounter = new AtomicLong();
@@ -43,7 +43,7 @@ public class JDBCPoolSource {
private final ConnectionEventListener listener;
private final DataDefaultSource dataSource;
private final DataJdbcSource dataSource;
private final String stype; // "" "read" "write"
@@ -57,7 +57,7 @@ public class JDBCPoolSource {
final Properties props;
public JDBCPoolSource(DataDefaultSource source, String stype, Properties prop) {
public PoolJdbcSource(DataJdbcSource source, String stype, Properties prop) {
this.dataSource = source;
this.stype = stype;
this.props = prop;
@@ -190,13 +190,13 @@ public class JDBCPoolSource {
final File f = new File(file);
if (!f.isFile() || !f.canRead()) return;
synchronized (maps) {
AbstractMap.SimpleEntry<WatchService, List<WeakReference<JDBCPoolSource>>> entry = maps.get(file);
AbstractMap.SimpleEntry<WatchService, List<WeakReference<PoolJdbcSource>>> entry = maps.get(file);
if (entry != null) {
entry.getValue().add(new WeakReference<>(this));
return;
}
final WatchService watcher = f.toPath().getFileSystem().newWatchService();
final List<WeakReference<JDBCPoolSource>> list = new CopyOnWriteArrayList<>();
final List<WeakReference<PoolJdbcSource>> list = new CopyOnWriteArrayList<>();
Thread watchThread = new Thread() {
@Override
@@ -214,8 +214,8 @@ public class JDBCPoolSource {
key.pollEvents().stream().forEach((event) -> {
if (event.kind() != ENTRY_MODIFY) return;
if (!((Path) event.context()).toFile().getName().equals(f.getName())) return;
for (WeakReference<JDBCPoolSource> ref : list) {
JDBCPoolSource pool = ref.get();
for (WeakReference<PoolJdbcSource> ref : list) {
PoolJdbcSource pool = ref.get();
if (pool == null) continue;
try {
Properties property = m.get(pool.dataSource.name);

View File

@@ -46,7 +46,7 @@ public final class Sources {
public static DataSource createDataSource(final String unitName) throws IOException {
return createDataSource(unitName, System.getProperty(DATASOURCE_CONFPATH) == null
? DataDefaultSource.class.getResource("/META-INF/persistence.xml")
? DataJdbcSource.class.getResource("/META-INF/persistence.xml")
: new File(System.getProperty(DATASOURCE_CONFPATH)).toURI().toURL());
}
@@ -82,8 +82,8 @@ public final class Sources {
}
}
if (readprop == null) throw new IOException("Cannot find (resource.name = '" + unitName + "') DataSource");
String impl = readprop.getProperty(JDBC_DATASOURCE_CLASS, DataDefaultSource.class.getName());
if (DataDefaultSource.class.getName().equals(impl)) return new DataDefaultSource(unitName, readprop, writeprop);
String impl = readprop.getProperty(JDBC_DATASOURCE_CLASS, DataJdbcSource.class.getName());
if (DataJdbcSource.class.getName().equals(impl)) return new DataJdbcSource(unitName, readprop, writeprop);
try {
Class ds = Class.forName(impl);
for (Constructor d : ds.getConstructors()) {