diff --git a/src/org/redkale/source/DataMemorySource.java b/src/org/redkale/source/DataMemorySource.java new file mode 100644 index 000000000..9e24f95c3 --- /dev/null +++ b/src/org/redkale/source/DataMemorySource.java @@ -0,0 +1,141 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.source; + +import java.io.Serializable; +import java.net.URL; +import java.sql.ResultSet; +import java.util.*; +import java.util.concurrent.*; +import java.util.function.Consumer; +import org.redkale.service.Local; +import org.redkale.util.*; + +/** + * + * + * @author zhangjx + */ +/** + * DataSource的Memory实现类
+ * 注意: javax.persistence.jdbc.url 需要指定为 memory:source + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + */ +@Local +@AutoLoad(false) +@SuppressWarnings("unchecked") +@ResourceType(DataSource.class) +public class DataMemorySource extends DataSqlSource { + + public DataMemorySource(String unitName, URL persistxml, Properties readprop, Properties writeprop) { + super(unitName, persistxml, readprop, writeprop); + this.cacheForbidden = false; + } + + @Local + @Override + public String getType() { + return "memory"; + } + + @Override + protected boolean isOnlyCache(EntityInfo info) { + return true; + } + + @Local + @Override + public int directExecute(String sql) { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Local + @Override + public int[] directExecute(String... sqls) { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Local + @Override + public void directQuery(String sql, Consumer consumer) { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + protected boolean isAsync() { + return true; + } + + @Override + protected String prepareParamSign(int index) { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + protected PoolSource createPoolSource(DataSource source, String rwtype, ArrayBlockingQueue queue, Properties prop) { + return null; + } + + @Override + protected CompletableFuture insertDB(EntityInfo info, T... values) { + return CompletableFuture.completedFuture(0); + } + + @Override + protected CompletableFuture deleteDB(EntityInfo info, Flipper flipper, String sql) { + return CompletableFuture.completedFuture(0); + } + + @Override + protected CompletableFuture updateDB(EntityInfo info, T... values) { + return CompletableFuture.completedFuture(0); + } + + @Override + protected CompletableFuture updateDB(EntityInfo info, Flipper flipper, String sql, boolean prepared, Object... params) { + return CompletableFuture.completedFuture(0); + } + + @Override + protected CompletableFuture> getNumberMapDB(EntityInfo info, String sql, FilterFuncColumn... columns) { + return CompletableFuture.completedFuture(null); + } + + @Override + protected CompletableFuture getNumberResultDB(EntityInfo info, String sql, Number defVal, String column) { + return CompletableFuture.completedFuture(defVal); + } + + @Override + protected CompletableFuture> queryColumnMapDB(EntityInfo info, String sql, String keyColumn) { + return CompletableFuture.completedFuture(null); + } + + @Override + protected CompletableFuture findDB(EntityInfo info, String sql, boolean onlypk, SelectColumn selects) { + return CompletableFuture.completedFuture(null); + } + + @Override + protected CompletableFuture findColumnDB(EntityInfo info, String sql, boolean onlypk, String column, Serializable defValue) { + return CompletableFuture.completedFuture(null); + } + + @Override + protected CompletableFuture existsDB(EntityInfo info, String sql, boolean onlypk) { + return CompletableFuture.completedFuture(false); + } + + @Override + protected CompletableFuture> querySheetDB(EntityInfo info, boolean needtotal, SelectColumn selects, Flipper flipper, FilterNode node) { + return CompletableFuture.completedFuture(new Sheet<>()); + } + +} diff --git a/src/org/redkale/source/DataSources.java b/src/org/redkale/source/DataSources.java index 30a9eb3c0..8b15065a4 100644 --- a/src/org/redkale/source/DataSources.java +++ b/src/org/redkale/source/DataSources.java @@ -56,11 +56,11 @@ public final class DataSources { private DataSources() { } - public static DataSource createDataSource(final String unitName, Properties prop) throws IOException { + public static DataSource createDataSource2(final String unitName, Properties prop) throws IOException { return new DataJdbcSource(unitName, null, prop, prop); } - public static DataSource createDataSource(final String unitName, Properties readprop, Properties writeprop) throws IOException { + public static DataSource createDataSource2(final String unitName, Properties readprop, Properties writeprop) throws IOException { return new DataJdbcSource(unitName, null, readprop, writeprop); } @@ -103,6 +103,8 @@ public final class DataSources { } if (readprop == null) throw new IOException("Cannot find (resource.name = '" + unitName + "') DataSource"); if (writeprop == null) writeprop = readprop; + if (readprop.getProperty(JDBC_URL, "").startsWith("memory:source")) return new DataMemorySource(unitName, persistxml, readprop, writeprop); + String impl = readprop.getProperty(JDBC_DATASOURCE_CLASS, DataJdbcSource.class.getName()); if (DataJdbcSource.class.getName().equals(impl)) { try { diff --git a/src/org/redkale/source/DataSqlSource.java b/src/org/redkale/source/DataSqlSource.java index c12da3936..80cce0e66 100644 --- a/src/org/redkale/source/DataSqlSource.java +++ b/src/org/redkale/source/DataSqlSource.java @@ -67,6 +67,8 @@ public abstract class DataSqlSource extends AbstractService implement @SuppressWarnings({"OverridableMethodCallInConstructor", "LeakingThisInConstructor"}) public DataSqlSource(String unitName, URL persistxml, Properties readprop, Properties writeprop) { + if (readprop == null) readprop = new Properties(); + if (writeprop == null) writeprop = readprop; final AtomicInteger counter = new AtomicInteger(); this.threads = Integer.decode(readprop.getProperty(JDBC_CONNECTIONS_LIMIT, "" + Runtime.getRuntime().availableProcessors() * 16)); int maxconns = Math.max(8, Integer.decode(readprop.getProperty(JDBC_CONNECTIONS_LIMIT, "" + Math.min(1000, Runtime.getRuntime().availableProcessors() * 200)))); @@ -205,8 +207,8 @@ public abstract class DataSqlSource extends AbstractService implement @Local @Override public void close() throws Exception { - readPool.close(); - writePool.close(); + if (readPool != null) readPool.close(); + if (writePool != null) writePool.close(); } @Local @@ -226,7 +228,11 @@ public abstract class DataSqlSource extends AbstractService implement } protected EntityInfo loadEntityInfo(Class clazz) { - return EntityInfo.load(clazz, this.cacheForbidden, this.readPool.props, this, fullloader); + return EntityInfo.load(clazz, this.cacheForbidden, this.readPool == null ? null : this.readPool.props, this, fullloader); + } + + protected boolean isOnlyCache(EntityInfo info) { + return info.isVirtualEntity(); } /** @@ -282,7 +288,7 @@ public abstract class DataSqlSource extends AbstractService implement info.createPrimaryValue(value); } } - if (info.isVirtualEntity()) return insertCache(info, values); + if (isOnlyCache(info)) return insertCache(info, values); return insertDB(info, values).whenComplete((rs, t) -> { if (t != null) { futureCompleteConsumer.accept(rs, t); @@ -303,7 +309,7 @@ public abstract class DataSqlSource extends AbstractService implement info.createPrimaryValue(value); } } - if (info.isVirtualEntity()) { + if (isOnlyCache(info)) { return CompletableFuture.supplyAsync(() -> insertCache(info, values), getExecutor()); } if (isAsync()) return insertDB(info, values).whenComplete((rs, t) -> { @@ -390,7 +396,7 @@ public abstract class DataSqlSource extends AbstractService implement public int delete(Class clazz, Serializable... ids) { if (ids.length == 0) return -1; final EntityInfo info = loadEntityInfo(clazz); - if (info.isVirtualEntity()) return deleteCache(info, -1, ids); + if (isOnlyCache(info)) return deleteCache(info, -1, ids); return deleteCompose(info, ids).whenComplete((rs, t) -> { if (t != null) { futureCompleteConsumer.accept(rs, t); @@ -404,7 +410,7 @@ public abstract class DataSqlSource extends AbstractService implement public CompletableFuture deleteAsync(final Class clazz, final Serializable... ids) { if (ids.length == 0) return CompletableFuture.completedFuture(-1); final EntityInfo info = loadEntityInfo(clazz); - if (info.isVirtualEntity()) { + if (isOnlyCache(info)) { return CompletableFuture.supplyAsync(() -> deleteCache(info, -1, ids), getExecutor()); } if (isAsync()) return deleteCompose(info, ids).whenComplete((rs, t) -> { @@ -436,7 +442,7 @@ public abstract class DataSqlSource extends AbstractService implement @Override public int delete(Class clazz, final Flipper flipper, FilterNode node) { final EntityInfo info = loadEntityInfo(clazz); - if (info.isVirtualEntity()) return deleteCache(info, -1, flipper, node); + if (isOnlyCache(info)) return deleteCache(info, -1, flipper, node); return DataSqlSource.this.deleteCompose(info, flipper, node).whenComplete((rs, t) -> { if (t != null) { futureCompleteConsumer.accept(rs, t); @@ -449,7 +455,7 @@ public abstract class DataSqlSource extends AbstractService implement @Override public CompletableFuture deleteAsync(final Class clazz, final Flipper flipper, FilterNode node) { final EntityInfo info = loadEntityInfo(clazz); - if (info.isVirtualEntity()) { + if (isOnlyCache(info)) { return CompletableFuture.supplyAsync(() -> deleteCache(info, -1, flipper, node), getExecutor()); } if (isAsync()) return DataSqlSource.this.deleteCompose(info, flipper, node).whenComplete((rs, t) -> { @@ -561,7 +567,7 @@ public abstract class DataSqlSource extends AbstractService implement checkEntity("update", false, values); final Class clazz = (Class) values[0].getClass(); final EntityInfo info = loadEntityInfo(clazz); - if (info.isVirtualEntity()) return updateCache(info, -1, values); + if (isOnlyCache(info)) return updateCache(info, -1, values); return updateDB(info, values).whenComplete((rs, t) -> { if (t != null) { futureCompleteConsumer.accept(rs, t); @@ -578,7 +584,7 @@ public abstract class DataSqlSource extends AbstractService implement if (future != null) return future; final Class clazz = (Class) values[0].getClass(); final EntityInfo info = loadEntityInfo(clazz); - if (info.isVirtualEntity()) return CompletableFuture.supplyAsync(() -> updateCache(info, -1, values), getExecutor()); + if (isOnlyCache(info)) return CompletableFuture.supplyAsync(() -> updateCache(info, -1, values), getExecutor()); if (isAsync()) return updateDB(info, values).whenComplete((rs, t) -> { if (t != null) { futureCompleteConsumer.accept(rs, t); @@ -609,7 +615,7 @@ public abstract class DataSqlSource extends AbstractService implement @Override public int updateColumn(Class clazz, Serializable id, String column, Serializable value) { final EntityInfo info = loadEntityInfo(clazz); - if (info.isVirtualEntity()) return updateCache(info, -1, id, column, value); + if (isOnlyCache(info)) return updateCache(info, -1, id, column, value); return updateColumnCompose(info, id, column, value).whenComplete((rs, t) -> { if (t != null) { futureCompleteConsumer.accept(rs, t); @@ -622,7 +628,7 @@ public abstract class DataSqlSource extends AbstractService implement @Override public CompletableFuture updateColumnAsync(final Class clazz, final Serializable id, final String column, final Serializable value) { final EntityInfo info = loadEntityInfo(clazz); - if (info.isVirtualEntity()) { + if (isOnlyCache(info)) { return CompletableFuture.supplyAsync(() -> updateCache(info, -1, id, column, value), getExecutor()); } if (isAsync()) return updateColumnCompose(info, id, column, value).whenComplete((rs, t) -> { @@ -666,7 +672,7 @@ public abstract class DataSqlSource extends AbstractService implement @Override public int updateColumn(Class clazz, String column, Serializable value, FilterNode node) { final EntityInfo info = loadEntityInfo(clazz); - if (info.isVirtualEntity()) return updateCache(info, -1, column, value, node); + if (isOnlyCache(info)) return updateCache(info, -1, column, value, node); return DataSqlSource.this.updateColumnCompose(info, column, value, node).whenComplete((rs, t) -> { if (t != null) { futureCompleteConsumer.accept(rs, t); @@ -679,7 +685,7 @@ public abstract class DataSqlSource extends AbstractService implement @Override public CompletableFuture updateColumnAsync(final Class clazz, final String column, final Serializable value, final FilterNode node) { final EntityInfo info = loadEntityInfo(clazz); - if (info.isVirtualEntity()) { + if (isOnlyCache(info)) { return CompletableFuture.supplyAsync(() -> updateCache(info, -1, column, value, node), getExecutor()); } if (isAsync()) return DataSqlSource.this.updateColumnCompose(info, column, value, node).whenComplete((rs, t) -> { @@ -740,7 +746,7 @@ public abstract class DataSqlSource extends AbstractService implement public int updateColumn(final Class clazz, final Serializable id, final ColumnValue... values) { if (values == null || values.length < 1) return -1; final EntityInfo info = loadEntityInfo(clazz); - if (info.isVirtualEntity()) return updateCache(info, -1, id, values); + if (isOnlyCache(info)) return updateCache(info, -1, id, values); return DataSqlSource.this.updateColumnCompose(info, id, values).whenComplete((rs, t) -> { if (t != null) { futureCompleteConsumer.accept(rs, t); @@ -754,7 +760,7 @@ public abstract class DataSqlSource extends AbstractService implement public CompletableFuture updateColumnAsync(final Class clazz, final Serializable id, final ColumnValue... values) { if (values == null || values.length < 1) return CompletableFuture.completedFuture(-1); final EntityInfo info = loadEntityInfo(clazz); - if (info.isVirtualEntity()) { + if (isOnlyCache(info)) { return CompletableFuture.supplyAsync(() -> updateCache(info, -1, id, values), getExecutor()); } if (isAsync()) return DataSqlSource.this.updateColumnCompose(info, id, values).whenComplete((rs, t) -> { @@ -819,7 +825,7 @@ public abstract class DataSqlSource extends AbstractService implement public int updateColumn(final Class clazz, final FilterNode node, final Flipper flipper, final ColumnValue... values) { if (values == null || values.length < 1) return -1; final EntityInfo info = loadEntityInfo(clazz); - if (info.isVirtualEntity()) return updateCache(info, -1, node, flipper, values); + if (isOnlyCache(info)) return updateCache(info, -1, node, flipper, values); return DataSqlSource.this.updateColumnCompose(info, node, flipper, values).whenComplete((rs, t) -> { if (t != null) { futureCompleteConsumer.accept(rs, t); @@ -833,7 +839,7 @@ public abstract class DataSqlSource extends AbstractService implement public CompletableFuture updateColumnAsync(final Class clazz, final FilterNode node, final Flipper flipper, final ColumnValue... values) { if (values == null || values.length < 1) return CompletableFuture.completedFuture(-1); final EntityInfo info = loadEntityInfo(clazz); - if (info.isVirtualEntity()) { + if (isOnlyCache(info)) { return CompletableFuture.supplyAsync(() -> updateCache(info, -1, node, flipper, values), getExecutor()); } if (isAsync()) return DataSqlSource.this.updateColumnCompose(info, node, flipper, values).whenComplete((rs, t) -> { @@ -913,7 +919,7 @@ public abstract class DataSqlSource extends AbstractService implement if (bean == null || selects == null) return -1; Class clazz = (Class) bean.getClass(); final EntityInfo info = loadEntityInfo(clazz); - if (info.isVirtualEntity()) return updateCache(info, -1, false, bean, null, selects); + if (isOnlyCache(info)) return updateCache(info, -1, false, bean, null, selects); return DataSqlSource.this.updateColumnCompose(info, false, bean, null, selects).whenComplete((rs, t) -> { if (t != null) { futureCompleteConsumer.accept(rs, t); @@ -928,7 +934,7 @@ public abstract class DataSqlSource extends AbstractService implement if (bean == null || selects == null) return CompletableFuture.completedFuture(-1); Class clazz = (Class) bean.getClass(); final EntityInfo info = loadEntityInfo(clazz); - if (info.isVirtualEntity()) { + if (isOnlyCache(info)) { return CompletableFuture.supplyAsync(() -> updateCache(info, -1, false, bean, null, selects), getExecutor()); } if (isAsync()) return DataSqlSource.this.updateColumnCompose(info, false, bean, null, selects).whenComplete((rs, t) -> { @@ -952,7 +958,7 @@ public abstract class DataSqlSource extends AbstractService implement if (bean == null || node == null || selects == null) return -1; Class clazz = (Class) bean.getClass(); final EntityInfo info = loadEntityInfo(clazz); - if (info.isVirtualEntity()) return updateCache(info, -1, true, bean, node, selects); + if (isOnlyCache(info)) return updateCache(info, -1, true, bean, node, selects); return DataSqlSource.this.updateColumnCompose(info, true, bean, node, selects).whenComplete((rs, t) -> { if (t != null) { futureCompleteConsumer.accept(rs, t); @@ -967,7 +973,7 @@ public abstract class DataSqlSource extends AbstractService implement if (bean == null || node == null || selects == null) return CompletableFuture.completedFuture(-1); Class clazz = (Class) bean.getClass(); final EntityInfo info = loadEntityInfo(clazz); - if (info.isVirtualEntity()) { + if (isOnlyCache(info)) { return CompletableFuture.supplyAsync(() -> updateCache(info, -1, true, bean, node, selects), getExecutor()); } if (isAsync()) return DataSqlSource.this.updateColumnCompose(info, true, bean, node, selects).whenComplete((rs, t) -> { @@ -1158,7 +1164,7 @@ public abstract class DataSqlSource extends AbstractService implement public Map getNumberMap(final Class entityClass, final FilterNode node, final FilterFuncColumn... columns) { final EntityInfo info = loadEntityInfo(entityClass); final EntityCache cache = info.getCache(); - if (cache != null && (info.isVirtualEntity() || cache.isFullLoaded())) { + if (cache != null && (isOnlyCache(info) || cache.isFullLoaded())) { final Map map = new HashMap<>(); if (node == null || node.isCacheUseable(this)) { for (FilterFuncColumn ffc : columns) { @@ -1176,7 +1182,7 @@ public abstract class DataSqlSource extends AbstractService implement public CompletableFuture> getNumberMapAsync(final Class entityClass, final FilterNode node, final FilterFuncColumn... columns) { final EntityInfo info = loadEntityInfo(entityClass); final EntityCache cache = info.getCache(); - if (cache != null && (info.isVirtualEntity() || cache.isFullLoaded())) { + if (cache != null && (isOnlyCache(info) || cache.isFullLoaded())) { final Map map = new HashMap<>(); if (node == null || node.isCacheUseable(this)) { for (FilterFuncColumn ffc : columns) { @@ -1264,7 +1270,7 @@ public abstract class DataSqlSource extends AbstractService implement public Number getNumberResult(final Class entityClass, final FilterFunc func, final Number defVal, final String column, final FilterNode node) { final EntityInfo info = loadEntityInfo(entityClass); final EntityCache cache = info.getCache(); - if (cache != null && (info.isVirtualEntity() || cache.isFullLoaded())) { + if (cache != null && (isOnlyCache(info) || cache.isFullLoaded())) { if (node == null || node.isCacheUseable(this)) { return cache.getNumberResult(func, defVal, column, node); } @@ -1276,7 +1282,7 @@ public abstract class DataSqlSource extends AbstractService implement public CompletableFuture getNumberResultAsync(final Class entityClass, final FilterFunc func, final Number defVal, final String column, final FilterNode node) { final EntityInfo info = loadEntityInfo(entityClass); final EntityCache cache = info.getCache(); - if (cache != null && (info.isVirtualEntity() || cache.isFullLoaded())) { + if (cache != null && (isOnlyCache(info) || cache.isFullLoaded())) { if (node == null || node.isCacheUseable(this)) { return CompletableFuture.completedFuture(cache.getNumberResult(func, defVal, column, node)); } @@ -1321,7 +1327,7 @@ public abstract class DataSqlSource extends AbstractService implement public Map queryColumnMap(final Class entityClass, final String keyColumn, final FilterFunc func, final String funcColumn, FilterNode node) { final EntityInfo info = loadEntityInfo(entityClass); final EntityCache cache = info.getCache(); - if (cache != null && (info.isVirtualEntity() || cache.isFullLoaded())) { + if (cache != null && (isOnlyCache(info) || cache.isFullLoaded())) { if (node == null || node.isCacheUseable(this)) { return cache.queryColumnMap(keyColumn, func, funcColumn, node); } @@ -1333,7 +1339,7 @@ public abstract class DataSqlSource extends AbstractService implement public CompletableFuture> queryColumnMapAsync(final Class entityClass, final String keyColumn, final FilterFunc func, final String funcColumn, FilterNode node) { final EntityInfo info = loadEntityInfo(entityClass); final EntityCache cache = info.getCache(); - if (cache != null && (info.isVirtualEntity() || cache.isFullLoaded())) { + if (cache != null && (isOnlyCache(info) || cache.isFullLoaded())) { if (node == null || node.isCacheUseable(this)) { return CompletableFuture.completedFuture(cache.queryColumnMap(keyColumn, func, funcColumn, node)); } diff --git a/src/org/redkale/source/EntityInfo.java b/src/org/redkale/source/EntityInfo.java index cb3850c5d..dae98593e 100644 --- a/src/org/redkale/source/EntityInfo.java +++ b/src/org/redkale/source/EntityInfo.java @@ -235,11 +235,12 @@ public final class EntityInfo { } //--------------------------------------------- Table t = type.getAnnotation(Table.class); - if (type.getAnnotation(VirtualEntity.class) != null) { + if (type.getAnnotation(VirtualEntity.class) != null || "memory".equalsIgnoreCase(source.getType())) { this.table = null; BiFunction loader = null; try { - loader = type.getAnnotation(VirtualEntity.class).loader().getDeclaredConstructor().newInstance(); + VirtualEntity ve = type.getAnnotation(VirtualEntity.class); + if (ve != null) loader = ve.loader().getDeclaredConstructor().newInstance(); } catch (Exception e) { logger.log(Level.SEVERE, type + " init @VirtualEntity.loader error", e); }