This commit is contained in:
Redkale
2018-05-04 20:23:08 +08:00
parent 7166dc0ec0
commit 508c7c4c0a
2 changed files with 513 additions and 115 deletions

View File

@@ -7,9 +7,11 @@ package org.redkale.source;
import java.io.Serializable; import java.io.Serializable;
import java.net.URL; import java.net.URL;
import java.sql.Connection; import java.sql.*;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.*;
import java.util.logging.Level;
import org.redkale.service.Local; import org.redkale.service.Local;
import org.redkale.util.*; import org.redkale.util.*;
@@ -48,56 +50,452 @@ public class DataSqlJdbcSource extends DataSqlSource<Connection> {
@Override @Override
protected <T> CompletableFuture<Void> insertDB(EntityInfo<T> info, T... values) { protected <T> CompletableFuture<Void> insertDB(EntityInfo<T> info, T... values) {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. Connection conn = null;
try {
conn = writePool.poll();
final String sql = info.getInsertPrepareSQL(values[0]);
final Class primaryType = info.getPrimary().type();
final Attribute primary = info.getPrimary();
Attribute<T, Serializable>[] attrs = info.insertAttributes;
conn.setReadOnly(false);
PreparedStatement prestmt = createInsertPreparedStatement(conn, sql, info, values);
try {
prestmt.executeBatch();
} catch (SQLException se) {
if (info.tableStrategy == null || !info.tablenotexistSqlstates.contains(';' + se.getSQLState() + ';')) throw se;
synchronized (info.tables) {
final String oldTable = info.table;
final String newTable = info.getTable(values[0]);
if (!info.tables.contains(newTable)) {
try {
Statement st = conn.createStatement();
st.execute(info.tablecopySQL.replace("${newtable}", newTable).replace("${oldtable}", oldTable));
st.close();
info.tables.add(newTable);
} catch (SQLException sqle) { //多进程并发时可能会出现重复建表
if (newTable.indexOf('.') > 0 && info.tablenotexistSqlstates.contains(';' + se.getSQLState() + ';')) {
Statement st;
try {
st = conn.createStatement();
st.execute("CREATE DATABASE " + newTable.substring(0, newTable.indexOf('.')));
st.close();
} catch (SQLException sqle1) {
logger.log(Level.SEVERE, "create database(" + newTable.substring(0, newTable.indexOf('.')) + ") error", sqle1);
}
try {
st = conn.createStatement();
st.execute(info.tablecopySQL.replace("${newtable}", newTable).replace("${oldtable}", oldTable));
st.close();
info.tables.add(newTable);
} catch (SQLException sqle2) {
logger.log(Level.SEVERE, "create table2(" + info.tablecopySQL.replace("${newtable}", newTable).replace("${oldtable}", oldTable) + ") error", sqle2);
}
} else {
logger.log(Level.SEVERE, "create table(" + info.tablecopySQL.replace("${newtable}", newTable).replace("${oldtable}", oldTable) + ") error", sqle);
}
}
}
}
prestmt.close();
prestmt = createInsertPreparedStatement(conn, sql, info, values);
prestmt.executeBatch();
}
if (info.autoGenerated) { //由数据库自动生成主键值
ResultSet set = prestmt.getGeneratedKeys();
int i = -1;
while (set.next()) {
if (primaryType == int.class) {
primary.set(values[++i], set.getInt(1));
} else if (primaryType == long.class) {
primary.set(values[++i], set.getLong(1));
} else {
primary.set(values[++i], set.getObject(1));
}
}
set.close();
}
prestmt.close();
//------------------------------------------------------------
if (info.isLoggable(logger, Level.FINEST)) { //打印调试信息
char[] sqlchars = sql.toCharArray();
for (final T value : values) {
//-----------------------------
StringBuilder sb = new StringBuilder(128);
int i = 0;
for (char ch : sqlchars) {
if (ch == '?') {
Object obj = attrs[i++].get(value);
if (obj != null && obj.getClass().isArray()) {
sb.append("'[length=").append(java.lang.reflect.Array.getLength(obj)).append("]'");
} else {
sb.append(FilterNode.formatToString(obj));
}
} else {
sb.append(ch);
}
}
logger.finest(info.getType().getSimpleName() + " insert sql=" + sb.toString().replaceAll("(\r|\n)", "\\n"));
}
} //打印结束
return CompletableFuture.completedFuture(null);
} catch (SQLException e) {
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
} finally {
if (conn != null) writePool.closeConnection(conn);
}
} }
@Override protected <T> PreparedStatement createInsertPreparedStatement(final Connection conn, final String sql,
protected <T> CompletableFuture<Integer> updateDB(EntityInfo<T> info, T... values) { final EntityInfo<T> info, T... values) throws SQLException {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. Attribute<T, Serializable>[] attrs = info.insertAttributes;
} final PreparedStatement prestmt = info.autoGenerated ? conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS) : conn.prepareStatement(sql);
@Override for (final T value : values) {
protected <T> CompletableFuture<Integer> updateDB(EntityInfo<T> info, Flipper flipper, String sql, boolean prepared, Object... params) { int i = 0;
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. if (info.autouuid) info.createPrimaryValue(value);
for (Attribute<T, Serializable> attr : attrs) {
Serializable val = attr.get(value);
if (val instanceof byte[]) {
Blob blob = conn.createBlob();
blob.setBytes(1, (byte[]) val);
prestmt.setObject(++i, blob);
} else if (val instanceof AtomicInteger) {
prestmt.setObject(++i, ((AtomicInteger) val).get());
} else if (val instanceof AtomicLong) {
prestmt.setObject(++i, ((AtomicLong) val).get());
} else {
prestmt.setObject(++i, val);
}
}
prestmt.addBatch();
}
return prestmt;
} }
@Override @Override
protected <T> CompletableFuture<Integer> deleteDB(EntityInfo<T> info, Flipper flipper, String sql) { protected <T> CompletableFuture<Integer> deleteDB(EntityInfo<T> info, Flipper flipper, String sql) {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. Connection conn = null;
try {
conn = writePool.poll();
conn.setReadOnly(false);
if (info.isLoggable(logger, Level.FINEST)) logger.finest(info.getType().getSimpleName() + " delete sql=" + sql);
final Statement stmt = conn.createStatement();
int c = stmt.executeUpdate(sql);
stmt.close();
return CompletableFuture.completedFuture(c);
} catch (SQLException e) {
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
} finally {
if (conn != null) writePool.closeConnection(conn);
}
} }
@Override @Override
protected <T> CompletableFuture<T> findDB(EntityInfo<T> info, String sql, boolean onlypk, SelectColumn selects) { protected <T> CompletableFuture<Integer> updateDB(EntityInfo<T> info, T... values) {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. Connection conn = null;
try {
conn = writePool.poll();
conn.setReadOnly(false);
final String updateSQL = info.getUpdatePrepareSQL(values[0]);
final PreparedStatement prestmt = conn.prepareStatement(updateSQL);
Attribute<T, Serializable>[] attrs = info.updateAttributes;
final boolean debugfinest = info.isLoggable(logger, Level.FINEST);
char[] sqlchars = debugfinest ? updateSQL.toCharArray() : null;
final Attribute<T, Serializable> primary = info.getPrimary();
for (final T value : values) {
int k = 0;
for (Attribute<T, Serializable> attr : attrs) {
Serializable val = attr.get(value);
if (val instanceof byte[]) {
Blob blob = conn.createBlob();
blob.setBytes(1, (byte[]) val);
prestmt.setObject(++k, blob);
} else if (val instanceof AtomicInteger) {
prestmt.setObject(++k, ((AtomicInteger) val).get());
} else if (val instanceof AtomicLong) {
prestmt.setObject(++k, ((AtomicLong) val).get());
} else {
prestmt.setObject(++k, val);
}
}
prestmt.setObject(++k, primary.get(value));
prestmt.addBatch();//------------------------------------------------------------
if (debugfinest) { //打印调试信息
//-----------------------------
int i = 0;
StringBuilder sb = new StringBuilder(128);
for (char ch : sqlchars) {
if (ch == '?') {
Object obj = i == attrs.length ? primary.get(value) : attrs[i++].get(value);
if (obj != null && obj.getClass().isArray()) {
sb.append("'[length=").append(java.lang.reflect.Array.getLength(obj)).append("]'");
} else {
sb.append(FilterNode.formatToString(obj));
}
} else {
sb.append(ch);
}
}
logger.finest(info.getType().getSimpleName() + " update sql=" + sb.toString().replaceAll("(\r|\n)", "\\n"));
} //打印结束
}
int[] pc = prestmt.executeBatch();
int c = 0;
for (int p : pc) {
if (p >= 0) c += p;
}
prestmt.close();
return CompletableFuture.completedFuture(c);
} catch (SQLException e) {
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
} finally {
if (conn != null) writePool.closeConnection(conn);
}
} }
@Override @Override
protected <T> CompletableFuture<Serializable> findColumnDB(EntityInfo<T> info, String sql, boolean onlypk, String column, Serializable defValue) { protected <T> CompletableFuture<Integer> updateDB(EntityInfo<T> info, Flipper flipper, String sql, boolean prepared, Object... params) {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. Connection conn = null;
} try {
conn = writePool.poll();
@Override conn.setReadOnly(false);
protected <T> CompletableFuture<Number> getNumberResultDB(EntityInfo<T> info, String sql, Number defVal, String column) { if (prepared) {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. final PreparedStatement prestmt = conn.prepareStatement(sql);
int index = 0;
for (Object param : params) {
Blob blob = conn.createBlob();
blob.setBytes(1, (byte[]) param);
prestmt.setBlob(++index, blob);
}
int c = prestmt.executeUpdate();
prestmt.close();
return CompletableFuture.completedFuture(c);
} else {
if (info.isLoggable(logger, Level.FINEST)) logger.finest(info.getType().getSimpleName() + " update sql=" + sql);
final Statement stmt = conn.createStatement();
int c = stmt.executeUpdate(sql);
stmt.close();
return CompletableFuture.completedFuture(c);
}
} catch (SQLException e) {
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
} finally {
if (conn != null) writePool.closeConnection(conn);
}
} }
@Override @Override
protected <T, N extends Number> CompletableFuture<Map<String, N>> getNumberMapDB(EntityInfo<T> info, String sql, FilterFuncColumn... columns) { protected <T, N extends Number> CompletableFuture<Map<String, N>> getNumberMapDB(EntityInfo<T> info, String sql, FilterFuncColumn... columns) {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. Connection conn = null;
try {
conn = readPool.poll();
conn.setReadOnly(true);
final Statement stmt = conn.createStatement();
ResultSet set = stmt.executeQuery(sql);
final Map map = new HashMap<>();
if (set.next()) {
int index = 0;
for (FilterFuncColumn ffc : columns) {
for (String col : ffc.cols()) {
Object o = set.getObject(++index);
Number rs = ffc.defvalue;
if (o != null) rs = (Number) o;
map.put(ffc.col(col), rs);
}
}
}
set.close();
stmt.close();
return CompletableFuture.completedFuture(map);
} catch (SQLException e) {
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
} finally {
if (conn != null) writePool.closeConnection(conn);
}
}
@Override
protected <T> CompletableFuture<Number> getNumberResultDB(EntityInfo<T> info, String sql, Number defVal, String column) {
Connection conn = null;
try {
conn = readPool.poll();
conn.setReadOnly(true);
final Statement stmt = conn.createStatement();
Number rs = defVal;
ResultSet set = stmt.executeQuery(sql);
if (set.next()) {
Object o = set.getObject(1);
if (o != null) rs = (Number) o;
}
set.close();
stmt.close();
return CompletableFuture.completedFuture(rs);
} catch (SQLException e) {
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
} finally {
if (conn != null) writePool.closeConnection(conn);
}
} }
@Override @Override
protected <T, K extends Serializable, N extends Number> CompletableFuture<Map<K, N>> queryColumnMapDB(EntityInfo<T> info, String sql, String keyColumn) { protected <T, K extends Serializable, N extends Number> CompletableFuture<Map<K, N>> queryColumnMapDB(EntityInfo<T> info, String sql, String keyColumn) {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. Connection conn = null;
try {
conn = readPool.poll();
conn.setReadOnly(true);
final Statement stmt = conn.createStatement();
Map<K, N> rs = new LinkedHashMap<>();
ResultSet set = stmt.executeQuery(sql);
ResultSetMetaData rsd = set.getMetaData();
boolean smallint = rsd.getColumnType(1) == Types.SMALLINT;
while (set.next()) {
rs.put((K) (smallint ? set.getShort(1) : set.getObject(1)), (N) set.getObject(2));
}
set.close();
stmt.close();
return CompletableFuture.completedFuture(rs);
} catch (SQLException e) {
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
} finally {
if (conn != null) writePool.closeConnection(conn);
}
}
@Override
protected <T> CompletableFuture<T> findDB(EntityInfo<T> info, String sql, boolean onlypk, SelectColumn selects) {
Connection conn = null;
try {
conn = readPool.poll();
conn.setReadOnly(true);
final PreparedStatement ps = conn.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
ps.setFetchSize(1);
final ResultSet set = ps.executeQuery();
T rs = set.next() ? info.getValue(selects, set) : null;
set.close();
ps.close();
return CompletableFuture.completedFuture(rs);
} catch (SQLException e) {
if (info.tableStrategy != null && info.tablenotexistSqlstates.contains(';' + e.getSQLState() + ';')) return CompletableFuture.completedFuture(null);
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
} finally {
if (conn != null) writePool.closeConnection(conn);
}
}
@Override
protected <T> CompletableFuture<Serializable> findColumnDB(EntityInfo<T> info, String sql, boolean onlypk, String column, Serializable defValue) {
Connection conn = null;
try {
conn = readPool.poll();
conn.setReadOnly(true);
final Attribute<T, Serializable> attr = info.getAttribute(column);
final PreparedStatement ps = conn.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
ps.setFetchSize(1);
final ResultSet set = ps.executeQuery();
Serializable val = defValue;
if (set.next()) {
if (attr.type() == byte[].class) {
Blob blob = set.getBlob(1);
if (blob != null) val = blob.getBytes(1, (int) blob.length());
} else {
val = (Serializable) set.getObject(1);
}
}
set.close();
ps.close();
return CompletableFuture.completedFuture(val == null ? defValue : val);
} catch (SQLException e) {
if (info.tableStrategy != null && info.tablenotexistSqlstates.contains(';' + e.getSQLState() + ';')) return CompletableFuture.completedFuture(defValue);
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
} finally {
if (conn != null) writePool.closeConnection(conn);
}
} }
@Override @Override
protected <T> CompletableFuture<Boolean> existsDB(EntityInfo<T> info, String sql, boolean onlypk) { protected <T> CompletableFuture<Boolean> existsDB(EntityInfo<T> info, String sql, boolean onlypk) {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. Connection conn = null;
try {
conn = readPool.poll();
conn.setReadOnly(true);
final PreparedStatement ps = conn.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
final ResultSet set = ps.executeQuery();
boolean rs = set.next() ? (set.getInt(1) > 0) : false;
set.close();
ps.close();
if (info.isLoggable(logger, Level.FINEST)) logger.finest(info.getType().getSimpleName() + " exists (" + rs + ") sql=" + sql);
return CompletableFuture.completedFuture(rs);
} catch (SQLException e) {
if (info.tableStrategy != null && info.tablenotexistSqlstates.contains(';' + e.getSQLState() + ';')) return CompletableFuture.completedFuture(false);
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
} finally {
if (conn != null) writePool.closeConnection(conn);
}
} }
@Override @Override
protected <T> CompletableFuture<Sheet<T>> querySheetDB(EntityInfo<T> info, boolean needtotal, SelectColumn selects, Flipper flipper, FilterNode node) { protected <T> CompletableFuture<Sheet<T>> querySheetDB(EntityInfo<T> info, boolean needtotal, SelectColumn selects, Flipper flipper, FilterNode node) {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. Connection conn = null;
try {
conn = readPool.poll();
conn.setReadOnly(true);
final SelectColumn sels = selects;
final List<T> list = new ArrayList();
final Map<Class, String> joinTabalis = node == null ? null : node.getJoinTabalis();
final CharSequence join = node == null ? null : node.createSQLJoin(this, false, joinTabalis, new HashSet<>(), info);
final CharSequence where = node == null ? null : node.createSQLExpress(info, joinTabalis);
final String sql = "SELECT " + info.getQueryColumns("a", selects) + " FROM " + info.getTable(node) + " a" + (join == null ? "" : join)
+ ((where == null || where.length() == 0) ? "" : (" WHERE " + where)) + info.createSQLOrderby(flipper);
if (info.isLoggable(logger, Level.FINEST)) {
logger.finest(info.getType().getSimpleName() + " query sql=" + sql + (flipper == null || flipper.getLimit() < 1 ? "" : (" LIMIT " + flipper.getOffset() + "," + flipper.getLimit())));
}
conn.setReadOnly(true);
final PreparedStatement ps = conn.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
if (flipper != null && flipper.getLimit() > 0) ps.setFetchSize(flipper.getLimit());
final ResultSet set = ps.executeQuery();
if (flipper != null && flipper.getOffset() > 0) set.absolute(flipper.getOffset());
final int limit = flipper == null || flipper.getLimit() < 1 ? Integer.MAX_VALUE : flipper.getLimit();
int i = 0;
while (set.next()) {
i++;
list.add(info.getValue(sels, set));
if (limit <= i) break;
}
long total = list.size();
if (needtotal && flipper != null) {
set.last();
total = set.getRow();
}
set.close();
ps.close();
return CompletableFuture.completedFuture(new Sheet<>(total, list));
} catch (SQLException e) {
if (info.tableStrategy != null && info.tablenotexistSqlstates.contains(';' + e.getSQLState() + ';')) return CompletableFuture.completedFuture(new Sheet<>());
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
} finally {
if (conn != null) writePool.closeConnection(conn);
}
} }
} }

View File

@@ -63,7 +63,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (t != null) logger.log(Level.SEVERE, "CompletableFuture complete error", (Throwable) t); if (t != null) logger.log(Level.SEVERE, "CompletableFuture complete error", (Throwable) t);
}; };
protected final BiFunction<DataSource, Class, List> fullloader = (s, t) -> ((Sheet) querySheet(false, false, t, null, null, (FilterNode) null).join()).list(true); protected final BiFunction<DataSource, Class, List> fullloader = (s, t) -> ((Sheet) querySheetCompose(false, false, t, null, null, (FilterNode) null).join()).list(true);
@SuppressWarnings({"OverridableMethodCallInConstructor", "LeakingThisInConstructor"}) @SuppressWarnings({"OverridableMethodCallInConstructor", "LeakingThisInConstructor"})
public DataSqlSource(String unitName, URL persistxml, Properties readprop, Properties writeprop) { public DataSqlSource(String unitName, URL persistxml, Properties readprop, Properties writeprop) {
@@ -115,14 +115,23 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
//插入纪录 //插入纪录
protected abstract <T> CompletableFuture<Void> insertDB(final EntityInfo<T> info, T... values); protected abstract <T> CompletableFuture<Void> insertDB(final EntityInfo<T> info, T... values);
//删除记录
protected abstract <T> CompletableFuture<Integer> deleteDB(final EntityInfo<T> info, Flipper flipper, final String sql);
//更新纪录 //更新纪录
protected abstract <T> CompletableFuture<Integer> updateDB(final EntityInfo<T> info, T... values); protected abstract <T> CompletableFuture<Integer> updateDB(final EntityInfo<T> info, T... values);
//更新纪录 //更新纪录
protected abstract <T> CompletableFuture<Integer> updateDB(final EntityInfo<T> info, Flipper flipper, final String sql, final boolean prepared, Object... params); protected abstract <T> CompletableFuture<Integer> updateDB(final EntityInfo<T> info, Flipper flipper, final String sql, final boolean prepared, Object... params);
//删除记录 //查询Number Map数据
protected abstract <T> CompletableFuture<Integer> deleteDB(final EntityInfo<T> info, Flipper flipper, final String sql); protected abstract <T, N extends Number> CompletableFuture<Map<String, N>> getNumberMapDB(final EntityInfo<T> info, final String sql, final FilterFuncColumn... columns);
//查询Number数据
protected abstract <T> CompletableFuture<Number> getNumberResultDB(final EntityInfo<T> info, final String sql, final Number defVal, final String column);
//查询Map数据
protected abstract <T, K extends Serializable, N extends Number> CompletableFuture<Map<K, N>> queryColumnMapDB(final EntityInfo<T> info, final String sql, final String keyColumn);
//查询单条记录 //查询单条记录
protected abstract <T> CompletableFuture<T> findDB(final EntityInfo<T> info, final String sql, final boolean onlypk, final SelectColumn selects); protected abstract <T> CompletableFuture<T> findDB(final EntityInfo<T> info, final String sql, final boolean onlypk, final SelectColumn selects);
@@ -130,15 +139,6 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
//查询单条记录的单个字段 //查询单条记录的单个字段
protected abstract <T> CompletableFuture<Serializable> findColumnDB(final EntityInfo<T> info, final String sql, final boolean onlypk, final String column, final Serializable defValue); protected abstract <T> CompletableFuture<Serializable> findColumnDB(final EntityInfo<T> info, final String sql, final boolean onlypk, final String column, final Serializable defValue);
//查询Number数据
protected abstract <T> CompletableFuture<Number> getNumberResultDB(final EntityInfo<T> info, final String sql, final Number defVal, final String column);
//查询Number Map数据
protected abstract <T, N extends Number> CompletableFuture<Map<String, N>> getNumberMapDB(final EntityInfo<T> info, final String sql, final FilterFuncColumn... columns);
//查询Map数据
protected abstract <T, K extends Serializable, N extends Number> CompletableFuture<Map<K, N>> queryColumnMapDB(final EntityInfo<T> info, final String sql, final String keyColumn);
//判断记录是否存在 //判断记录是否存在
protected abstract <T> CompletableFuture<Boolean> existsDB(final EntityInfo<T> info, final String sql, final boolean onlypk); protected abstract <T> CompletableFuture<Boolean> existsDB(final EntityInfo<T> info, final String sql, final boolean onlypk);
@@ -307,7 +307,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
return c; return c;
} }
//----------------------------- delete ----------------------------- //----------------------------- deleteCompose -----------------------------
/** /**
* 删除对象, 必须是Entity对象 * 删除对象, 必须是Entity对象
* *
@@ -352,7 +352,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (ids.length == 0) return -1; if (ids.length == 0) return -1;
final EntityInfo<T> info = loadEntityInfo(clazz); final EntityInfo<T> info = loadEntityInfo(clazz);
if (info.isVirtualEntity()) return deleteCache(info, -1, ids); if (info.isVirtualEntity()) return deleteCache(info, -1, ids);
return delete(info, ids).whenComplete((rs, t) -> { return deleteCompose(info, ids).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
@@ -368,14 +368,14 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (info.isVirtualEntity()) { if (info.isVirtualEntity()) {
return CompletableFuture.supplyAsync(() -> deleteCache(info, -1, ids), getExecutor()); return CompletableFuture.supplyAsync(() -> deleteCache(info, -1, ids), getExecutor());
} }
if (isAsync()) return delete(info, ids).whenComplete((rs, t) -> { if (isAsync()) return deleteCompose(info, ids).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
deleteCache(info, rs, ids); deleteCache(info, rs, ids);
} }
}); });
return CompletableFuture.supplyAsync(() -> delete(info, ids).join(), getExecutor()).whenComplete((rs, t) -> { return CompletableFuture.supplyAsync(() -> deleteCompose(info, ids).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
@@ -398,7 +398,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
public <T> int delete(Class<T> clazz, final Flipper flipper, FilterNode node) { public <T> int delete(Class<T> clazz, final Flipper flipper, FilterNode node) {
final EntityInfo<T> info = loadEntityInfo(clazz); final EntityInfo<T> info = loadEntityInfo(clazz);
if (info.isVirtualEntity()) return deleteCache(info, -1, flipper, node); if (info.isVirtualEntity()) return deleteCache(info, -1, flipper, node);
return delete(info, flipper, node).whenComplete((rs, t) -> { return DataSqlSource.this.deleteCompose(info, flipper, node).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
@@ -413,14 +413,14 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (info.isVirtualEntity()) { if (info.isVirtualEntity()) {
return CompletableFuture.supplyAsync(() -> deleteCache(info, -1, flipper, node), getExecutor()); return CompletableFuture.supplyAsync(() -> deleteCache(info, -1, flipper, node), getExecutor());
} }
if (isAsync()) return delete(info, flipper, node).whenComplete((rs, t) -> { if (isAsync()) return DataSqlSource.this.deleteCompose(info, flipper, node).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
deleteCache(info, rs, flipper, node); deleteCache(info, rs, flipper, node);
} }
}); });
return CompletableFuture.supplyAsync(() -> delete(info, flipper, node).join(), getExecutor()).whenComplete((rs, t) -> { return CompletableFuture.supplyAsync(() -> DataSqlSource.this.deleteCompose(info, flipper, node).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
@@ -429,7 +429,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
}); });
} }
protected <T> CompletableFuture<Integer> delete(final EntityInfo<T> info, final Serializable... ids) { protected <T> CompletableFuture<Integer> deleteCompose(final EntityInfo<T> info, final Serializable... ids) {
if (ids.length == 1) { if (ids.length == 1) {
String sql = "DELETE FROM " + info.getTable(ids[0]) + " WHERE " + info.getPrimarySQLColumn() + " = " + FilterNode.formatToString(ids[0]); String sql = "DELETE FROM " + info.getTable(ids[0]) + " WHERE " + info.getPrimarySQLColumn() + " = " + FilterNode.formatToString(ids[0]);
return deleteDB(info, null, sql); return deleteDB(info, null, sql);
@@ -444,7 +444,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
return deleteDB(info, null, sql); return deleteDB(info, null, sql);
} }
protected <T> CompletableFuture<Integer> delete(final EntityInfo<T> info, final Flipper flipper, final FilterNode node) { protected <T> CompletableFuture<Integer> deleteCompose(final EntityInfo<T> info, final Flipper flipper, final FilterNode node) {
Map<Class, String> joinTabalis = node.getJoinTabalis(); Map<Class, String> joinTabalis = node.getJoinTabalis();
CharSequence join = node.createSQLJoin(this, true, joinTabalis, new HashSet<>(), info); CharSequence join = node.createSQLJoin(this, true, joinTabalis, new HashSet<>(), info);
CharSequence where = node.createSQLExpress(info, joinTabalis); CharSequence where = node.createSQLExpress(info, joinTabalis);
@@ -571,7 +571,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
public <T> int updateColumn(Class<T> clazz, Serializable id, String column, Serializable value) { public <T> int updateColumn(Class<T> clazz, Serializable id, String column, Serializable value) {
final EntityInfo<T> info = loadEntityInfo(clazz); final EntityInfo<T> info = loadEntityInfo(clazz);
if (info.isVirtualEntity()) return updateCache(info, -1, id, column, value); if (info.isVirtualEntity()) return updateCache(info, -1, id, column, value);
return updateColumn(info, id, column, value).whenComplete((rs, t) -> { return updateColumnCompose(info, id, column, value).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
@@ -586,14 +586,14 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (info.isVirtualEntity()) { if (info.isVirtualEntity()) {
return CompletableFuture.supplyAsync(() -> updateCache(info, -1, id, column, value), getExecutor()); return CompletableFuture.supplyAsync(() -> updateCache(info, -1, id, column, value), getExecutor());
} }
if (isAsync()) return updateColumn(info, id, column, value).whenComplete((rs, t) -> { if (isAsync()) return updateColumnCompose(info, id, column, value).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
updateCache(info, rs, id, column, value); updateCache(info, rs, id, column, value);
} }
}); });
return CompletableFuture.supplyAsync(() -> updateColumn(info, id, column, value).join(), getExecutor()).whenComplete((rs, t) -> { return CompletableFuture.supplyAsync(() -> updateColumnCompose(info, id, column, value).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
@@ -602,7 +602,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
}); });
} }
protected <T> CompletableFuture<Integer> updateColumn(final EntityInfo<T> info, Serializable id, String column, final Serializable value) { protected <T> CompletableFuture<Integer> updateColumnCompose(final EntityInfo<T> info, Serializable id, String column, final Serializable value) {
if (value instanceof byte[]) { if (value instanceof byte[]) {
String sql = "UPDATE " + info.getTable(id) + " SET " + info.getSQLColumn(null, column) + " = " + getPrepareParamSign(1) + " WHERE " + info.getPrimarySQLColumn() + " = " + FilterNode.formatToString(id); String sql = "UPDATE " + info.getTable(id) + " SET " + info.getSQLColumn(null, column) + " = " + getPrepareParamSign(1) + " WHERE " + info.getPrimarySQLColumn() + " = " + FilterNode.formatToString(id);
return updateDB(info, null, sql, true, value); return updateDB(info, null, sql, true, value);
@@ -628,7 +628,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
public <T> int updateColumn(Class<T> clazz, String column, Serializable value, FilterNode node) { public <T> int updateColumn(Class<T> clazz, String column, Serializable value, FilterNode node) {
final EntityInfo<T> info = loadEntityInfo(clazz); final EntityInfo<T> info = loadEntityInfo(clazz);
if (info.isVirtualEntity()) return updateCache(info, -1, column, value, node); if (info.isVirtualEntity()) return updateCache(info, -1, column, value, node);
return updateColumn(info, column, value, node).whenComplete((rs, t) -> { return DataSqlSource.this.updateColumnCompose(info, column, value, node).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
@@ -643,14 +643,14 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (info.isVirtualEntity()) { if (info.isVirtualEntity()) {
return CompletableFuture.supplyAsync(() -> updateCache(info, -1, column, value, node), getExecutor()); return CompletableFuture.supplyAsync(() -> updateCache(info, -1, column, value, node), getExecutor());
} }
if (isAsync()) return updateColumn(info, column, value, node).whenComplete((rs, t) -> { if (isAsync()) return DataSqlSource.this.updateColumnCompose(info, column, value, node).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
updateCache(info, rs, column, value, node); updateCache(info, rs, column, value, node);
} }
}); });
return CompletableFuture.supplyAsync(() -> updateColumn(info, column, value, node).join(), getExecutor()).whenComplete((rs, t) -> { return CompletableFuture.supplyAsync(() -> DataSqlSource.this.updateColumnCompose(info, column, value, node).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
@@ -659,7 +659,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
}); });
} }
protected <T> CompletableFuture<Integer> updateColumn(final EntityInfo<T> info, final String column, final Serializable value, final FilterNode node) { protected <T> CompletableFuture<Integer> updateColumnCompose(final EntityInfo<T> info, final String column, final Serializable value, final FilterNode node) {
Map<Class, String> joinTabalis = node.getJoinTabalis(); Map<Class, String> joinTabalis = node.getJoinTabalis();
CharSequence join = node.createSQLJoin(this, true, joinTabalis, new HashSet<>(), info); CharSequence join = node.createSQLJoin(this, true, joinTabalis, new HashSet<>(), info);
CharSequence where = node.createSQLExpress(info, joinTabalis); CharSequence where = node.createSQLExpress(info, joinTabalis);
@@ -701,7 +701,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (values == null || values.length < 1) return -1; if (values == null || values.length < 1) return -1;
final EntityInfo<T> info = loadEntityInfo(clazz); final EntityInfo<T> info = loadEntityInfo(clazz);
if (info.isVirtualEntity()) return updateCache(info, -1, id, values); if (info.isVirtualEntity()) return updateCache(info, -1, id, values);
return updateColumn(info, id, values).whenComplete((rs, t) -> { return DataSqlSource.this.updateColumnCompose(info, id, values).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
@@ -717,14 +717,14 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (info.isVirtualEntity()) { if (info.isVirtualEntity()) {
return CompletableFuture.supplyAsync(() -> updateCache(info, -1, id, values), getExecutor()); return CompletableFuture.supplyAsync(() -> updateCache(info, -1, id, values), getExecutor());
} }
if (isAsync()) return updateColumn(info, id, values).whenComplete((rs, t) -> { if (isAsync()) return DataSqlSource.this.updateColumnCompose(info, id, values).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
updateCache(info, rs, id, values); updateCache(info, rs, id, values);
} }
}); });
return CompletableFuture.supplyAsync(() -> updateColumn(info, id, values).join(), getExecutor()).whenComplete((rs, t) -> { return CompletableFuture.supplyAsync(() -> DataSqlSource.this.updateColumnCompose(info, id, values).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
@@ -733,7 +733,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
}); });
} }
protected <T> CompletableFuture<Integer> updateColumn(final EntityInfo<T> info, final Serializable id, final ColumnValue... values) { protected <T> CompletableFuture<Integer> updateColumnCompose(final EntityInfo<T> info, final Serializable id, final ColumnValue... values) {
StringBuilder setsql = new StringBuilder(); StringBuilder setsql = new StringBuilder();
List<byte[]> blobs = null; List<byte[]> blobs = null;
int index = 0; int index = 0;
@@ -780,7 +780,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (values == null || values.length < 1) return -1; if (values == null || values.length < 1) return -1;
final EntityInfo<T> info = loadEntityInfo(clazz); final EntityInfo<T> info = loadEntityInfo(clazz);
if (info.isVirtualEntity()) return updateCache(info, -1, node, flipper, values); if (info.isVirtualEntity()) return updateCache(info, -1, node, flipper, values);
return updateColumn(info, node, flipper, values).whenComplete((rs, t) -> { return DataSqlSource.this.updateColumnCompose(info, node, flipper, values).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
@@ -796,14 +796,14 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (info.isVirtualEntity()) { if (info.isVirtualEntity()) {
return CompletableFuture.supplyAsync(() -> updateCache(info, -1, node, flipper, values), getExecutor()); return CompletableFuture.supplyAsync(() -> updateCache(info, -1, node, flipper, values), getExecutor());
} }
if (isAsync()) return updateColumn(info, node, flipper, values).whenComplete((rs, t) -> { if (isAsync()) return DataSqlSource.this.updateColumnCompose(info, node, flipper, values).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
updateCache(info, rs, node, flipper, values); updateCache(info, rs, node, flipper, values);
} }
}); });
return CompletableFuture.supplyAsync(() -> updateColumn(info, node, flipper, values).join(), getExecutor()).whenComplete((rs, t) -> { return CompletableFuture.supplyAsync(() -> DataSqlSource.this.updateColumnCompose(info, node, flipper, values).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
@@ -812,7 +812,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
}); });
} }
protected <T> CompletableFuture<Integer> updateColumn(final EntityInfo<T> info, final FilterNode node, final Flipper flipper, final ColumnValue... values) { protected <T> CompletableFuture<Integer> updateColumnCompose(final EntityInfo<T> info, final FilterNode node, final Flipper flipper, final ColumnValue... values) {
StringBuilder setsql = new StringBuilder(); StringBuilder setsql = new StringBuilder();
List<byte[]> blobs = null; List<byte[]> blobs = null;
int index = 0; int index = 0;
@@ -873,7 +873,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
Class<T> clazz = (Class) bean.getClass(); Class<T> clazz = (Class) bean.getClass();
final EntityInfo<T> info = loadEntityInfo(clazz); final EntityInfo<T> info = loadEntityInfo(clazz);
if (info.isVirtualEntity()) return updateCache(info, -1, false, bean, null, selects); if (info.isVirtualEntity()) return updateCache(info, -1, false, bean, null, selects);
return updateColumns(info, false, bean, null, selects).whenComplete((rs, t) -> { return DataSqlSource.this.updateColumnCompose(info, false, bean, null, selects).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
@@ -890,14 +890,14 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (info.isVirtualEntity()) { if (info.isVirtualEntity()) {
return CompletableFuture.supplyAsync(() -> updateCache(info, -1, false, bean, null, selects), getExecutor()); return CompletableFuture.supplyAsync(() -> updateCache(info, -1, false, bean, null, selects), getExecutor());
} }
if (isAsync()) return updateColumns(info, false, bean, null, selects).whenComplete((rs, t) -> { if (isAsync()) return DataSqlSource.this.updateColumnCompose(info, false, bean, null, selects).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
updateCache(info, rs, false, bean, null, selects); updateCache(info, rs, false, bean, null, selects);
} }
}); });
return CompletableFuture.supplyAsync(() -> updateColumns(info, false, bean, null, selects).join(), getExecutor()).whenComplete((rs, t) -> { return CompletableFuture.supplyAsync(() -> DataSqlSource.this.updateColumnCompose(info, false, bean, null, selects).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
@@ -912,7 +912,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
Class<T> clazz = (Class) bean.getClass(); Class<T> clazz = (Class) bean.getClass();
final EntityInfo<T> info = loadEntityInfo(clazz); final EntityInfo<T> info = loadEntityInfo(clazz);
if (info.isVirtualEntity()) return updateCache(info, -1, true, bean, node, selects); if (info.isVirtualEntity()) return updateCache(info, -1, true, bean, node, selects);
return updateColumns(info, true, bean, node, selects).whenComplete((rs, t) -> { return DataSqlSource.this.updateColumnCompose(info, true, bean, node, selects).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
@@ -929,14 +929,14 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (info.isVirtualEntity()) { if (info.isVirtualEntity()) {
return CompletableFuture.supplyAsync(() -> updateCache(info, -1, true, bean, node, selects), getExecutor()); return CompletableFuture.supplyAsync(() -> updateCache(info, -1, true, bean, node, selects), getExecutor());
} }
if (isAsync()) return updateColumns(info, true, bean, node, selects).whenComplete((rs, t) -> { if (isAsync()) return DataSqlSource.this.updateColumnCompose(info, true, bean, node, selects).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
updateCache(info, rs, true, bean, node, selects); updateCache(info, rs, true, bean, node, selects);
} }
}); });
return CompletableFuture.supplyAsync(() -> updateColumns(info, true, bean, node, selects).join(), getExecutor()).whenComplete((rs, t) -> { return CompletableFuture.supplyAsync(() -> DataSqlSource.this.updateColumnCompose(info, true, bean, node, selects).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) { if (t != null) {
futureCompleteConsumer.accept(rs, t); futureCompleteConsumer.accept(rs, t);
} else { } else {
@@ -945,7 +945,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
}); });
} }
protected <T> CompletableFuture<Integer> updateColumns(final EntityInfo<T> info, final boolean neednode, final T bean, final FilterNode node, final SelectColumn selects) { protected <T> CompletableFuture<Integer> updateColumnCompose(final EntityInfo<T> info, final boolean neednode, final T bean, final FilterNode node, final SelectColumn selects) {
StringBuilder setsql = new StringBuilder(); StringBuilder setsql = new StringBuilder();
List<byte[]> blobs = null; List<byte[]> blobs = null;
int index = 0; int index = 0;
@@ -1084,14 +1084,14 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
String column = info.getPrimary().field(); String column = info.getPrimary().field();
int c = 0; int c = 0;
for (Serializable id : ids) { for (Serializable id : ids) {
Sheet<T> sheet = querySheet(false, true, clazz, null, FLIPPER_ONE, FilterNode.create(column, id)).join(); Sheet<T> sheet = querySheetCompose(false, true, clazz, null, FLIPPER_ONE, FilterNode.create(column, id)).join();
T value = sheet.isEmpty() ? null : sheet.list().get(0); T value = sheet.isEmpty() ? null : sheet.list().get(0);
if (value != null) c += cache.update(value); if (value != null) c += cache.update(value);
} }
return c; return c;
} }
//------------------------- getNumberMap ------------------------- //------------------------- getNumberMapCompose -------------------------
@Override @Override
public <N extends Number> Map<String, N> getNumberMap(final Class entityClass, final FilterFuncColumn... columns) { public <N extends Number> Map<String, N> getNumberMap(final Class entityClass, final FilterFuncColumn... columns) {
return getNumberMap(entityClass, (FilterNode) null, columns); return getNumberMap(entityClass, (FilterNode) null, columns);
@@ -1127,7 +1127,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
return map; return map;
} }
} }
return (Map) getNumberMap(info, node, columns).join(); return (Map) getNumberMapCompose(info, node, columns).join();
} }
@Override @Override
@@ -1145,11 +1145,11 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
return CompletableFuture.completedFuture(map); return CompletableFuture.completedFuture(map);
} }
} }
if (isAsync()) return getNumberMap(info, node, columns); if (isAsync()) return getNumberMapCompose(info, node, columns);
return CompletableFuture.supplyAsync(() -> (Map) getNumberMap(info, node, columns).join(), getExecutor()); return CompletableFuture.supplyAsync(() -> (Map) getNumberMapCompose(info, node, columns).join(), getExecutor());
} }
protected <N extends Number> CompletableFuture<Map<String, N>> getNumberMap(final EntityInfo info, final FilterNode node, final FilterFuncColumn... columns) { protected <N extends Number> CompletableFuture<Map<String, N>> getNumberMapCompose(final EntityInfo info, final FilterNode node, final FilterFuncColumn... columns) {
final Map<Class, String> joinTabalis = node == null ? null : node.getJoinTabalis(); final Map<Class, String> joinTabalis = node == null ? null : node.getJoinTabalis();
final Set<String> haset = new HashSet<>(); final Set<String> haset = new HashSet<>();
final CharSequence join = node == null ? null : node.createSQLJoin(this, false, joinTabalis, haset, info); final CharSequence join = node == null ? null : node.createSQLJoin(this, false, joinTabalis, haset, info);
@@ -1167,7 +1167,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
return getNumberMapDB(info, sql, columns); return getNumberMapDB(info, sql, columns);
} }
//------------------------ getNumberResult ----------------------- //------------------------ getNumberResultCompose -----------------------
@Override @Override
public Number getNumberResult(final Class entityClass, final FilterFunc func, final String column) { public Number getNumberResult(final Class entityClass, final FilterFunc func, final String column) {
return getNumberResult(entityClass, func, null, column, (FilterNode) null); return getNumberResult(entityClass, func, null, column, (FilterNode) null);
@@ -1227,7 +1227,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
return cache.getNumberResult(func, defVal, column, node); return cache.getNumberResult(func, defVal, column, node);
} }
} }
return getNumberResult(info, entityClass, func, defVal, column, node).join(); return getNumberResultCompose(info, entityClass, func, defVal, column, node).join();
} }
@Override @Override
@@ -1239,11 +1239,11 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
return CompletableFuture.completedFuture(cache.getNumberResult(func, defVal, column, node)); return CompletableFuture.completedFuture(cache.getNumberResult(func, defVal, column, node));
} }
} }
if (isAsync()) return getNumberResult(info, entityClass, func, defVal, column, node); if (isAsync()) return getNumberResultCompose(info, entityClass, func, defVal, column, node);
return CompletableFuture.supplyAsync(() -> getNumberResult(info, entityClass, func, defVal, column, node).join(), getExecutor()); return CompletableFuture.supplyAsync(() -> getNumberResultCompose(info, entityClass, func, defVal, column, node).join(), getExecutor());
} }
protected CompletableFuture<Number> getNumberResult(final EntityInfo info, final Class entityClass, final FilterFunc func, final Number defVal, final String column, final FilterNode node) { protected CompletableFuture<Number> getNumberResultCompose(final EntityInfo info, final Class entityClass, final FilterFunc func, final Number defVal, final String column, final FilterNode node) {
final Map<Class, String> joinTabalis = node == null ? null : node.getJoinTabalis(); final Map<Class, String> joinTabalis = node == null ? null : node.getJoinTabalis();
final Set<String> haset = new HashSet<>(); final Set<String> haset = new HashSet<>();
final CharSequence join = node == null ? null : node.createSQLJoin(this, false, joinTabalis, haset, info); final CharSequence join = node == null ? null : node.createSQLJoin(this, false, joinTabalis, haset, info);
@@ -1254,7 +1254,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
return getNumberResultDB(info, sql, defVal, column); return getNumberResultDB(info, sql, defVal, column);
} }
//------------------------ queryColumnMap ------------------------ //------------------------ queryColumnMapCompose ------------------------
@Override @Override
public <T, K extends Serializable, N extends Number> Map<K, N> queryColumnMap(final Class<T> entityClass, final String keyColumn, final FilterFunc func, final String funcColumn) { public <T, K extends Serializable, N extends Number> Map<K, N> queryColumnMap(final Class<T> entityClass, final String keyColumn, final FilterFunc func, final String funcColumn) {
return queryColumnMap(entityClass, keyColumn, func, funcColumn, (FilterNode) null); return queryColumnMap(entityClass, keyColumn, func, funcColumn, (FilterNode) null);
@@ -1284,7 +1284,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
return cache.queryColumnMap(keyColumn, func, funcColumn, node); return cache.queryColumnMap(keyColumn, func, funcColumn, node);
} }
} }
return (Map) queryColumnMap(info, keyColumn, func, funcColumn, node).join(); return (Map) queryColumnMapCompose(info, keyColumn, func, funcColumn, node).join();
} }
@Override @Override
@@ -1296,11 +1296,11 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
return CompletableFuture.completedFuture(cache.queryColumnMap(keyColumn, func, funcColumn, node)); return CompletableFuture.completedFuture(cache.queryColumnMap(keyColumn, func, funcColumn, node));
} }
} }
if (isAsync()) return queryColumnMap(info, keyColumn, func, funcColumn, node); if (isAsync()) return queryColumnMapCompose(info, keyColumn, func, funcColumn, node);
return CompletableFuture.supplyAsync(() -> (Map) queryColumnMap(info, keyColumn, func, funcColumn, node).join(), getExecutor()); return CompletableFuture.supplyAsync(() -> (Map) queryColumnMapCompose(info, keyColumn, func, funcColumn, node).join(), getExecutor());
} }
protected <T, K extends Serializable, N extends Number> CompletableFuture<Map<K, N>> queryColumnMap(final EntityInfo<T> info, final String keyColumn, final FilterFunc func, final String funcColumn, FilterNode node) { protected <T, K extends Serializable, N extends Number> CompletableFuture<Map<K, N>> queryColumnMapCompose(final EntityInfo<T> info, final String keyColumn, final FilterFunc func, final String funcColumn, FilterNode node) {
final String sqlkey = info.getSQLColumn(null, keyColumn); final String sqlkey = info.getSQLColumn(null, keyColumn);
final Map<Class, String> joinTabalis = node == null ? null : node.getJoinTabalis(); final Map<Class, String> joinTabalis = node == null ? null : node.getJoinTabalis();
final Set<String> haset = new HashSet<>(); final Set<String> haset = new HashSet<>();
@@ -1312,7 +1312,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
return queryColumnMapDB(info, sql, keyColumn); return queryColumnMapDB(info, sql, keyColumn);
} }
//----------------------------- find ----------------------------- //----------------------------- findCompose -----------------------------
/** /**
* 根据主键获取对象 * 根据主键获取对象
* *
@@ -1340,7 +1340,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
T rs = cache.find(selects, pk); T rs = cache.find(selects, pk);
if (cache.isFullLoaded() || rs != null) return rs; if (cache.isFullLoaded() || rs != null) return rs;
} }
return find(info, selects, pk).join(); return findCompose(info, selects, pk).join();
} }
@Override @Override
@@ -1351,11 +1351,11 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
T rs = cache.find(selects, pk); T rs = cache.find(selects, pk);
if (cache.isFullLoaded() || rs != null) return CompletableFuture.completedFuture(rs); if (cache.isFullLoaded() || rs != null) return CompletableFuture.completedFuture(rs);
} }
if (isAsync()) return find(info, selects, pk); if (isAsync()) return findCompose(info, selects, pk);
return CompletableFuture.supplyAsync(() -> find(info, selects, pk).join(), getExecutor()); return CompletableFuture.supplyAsync(() -> findCompose(info, selects, pk).join(), getExecutor());
} }
protected <T> CompletableFuture<T> find(final EntityInfo<T> info, final SelectColumn selects, Serializable pk) { protected <T> CompletableFuture<T> findCompose(final EntityInfo<T> info, final SelectColumn selects, Serializable pk) {
final String sql = "SELECT " + info.getQueryColumns(null, selects) + " FROM " + info.getTable(pk) + " WHERE " + info.getPrimarySQLColumn() + " = " + FilterNode.formatToString(pk); final String sql = "SELECT " + info.getQueryColumns(null, selects) + " FROM " + info.getTable(pk) + " WHERE " + info.getPrimarySQLColumn() + " = " + FilterNode.formatToString(pk);
if (info.isLoggable(logger, Level.FINEST)) logger.finest(info.getType().getSimpleName() + " find sql=" + sql); if (info.isLoggable(logger, Level.FINEST)) logger.finest(info.getType().getSimpleName() + " find sql=" + sql);
return findDB(info, sql, true, selects); return findDB(info, sql, true, selects);
@@ -1406,7 +1406,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
final EntityInfo<T> info = loadEntityInfo(clazz); final EntityInfo<T> info = loadEntityInfo(clazz);
final EntityCache<T> cache = info.getCache(); final EntityCache<T> cache = info.getCache();
if (cache != null && cache.isFullLoaded() && (node == null || node.isCacheUseable(this))) return cache.find(selects, node); if (cache != null && cache.isFullLoaded() && (node == null || node.isCacheUseable(this))) return cache.find(selects, node);
return find(info, selects, node).join(); return DataSqlSource.this.findCompose(info, selects, node).join();
} }
@Override @Override
@@ -1416,11 +1416,11 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (cache != null && cache.isFullLoaded() && (node == null || node.isCacheUseable(this))) { if (cache != null && cache.isFullLoaded() && (node == null || node.isCacheUseable(this))) {
return CompletableFuture.completedFuture(cache.find(selects, node)); return CompletableFuture.completedFuture(cache.find(selects, node));
} }
if (isAsync()) return find(info, selects, node); if (isAsync()) return DataSqlSource.this.findCompose(info, selects, node);
return CompletableFuture.supplyAsync(() -> find(info, selects, node).join(), getExecutor()); return CompletableFuture.supplyAsync(() -> DataSqlSource.this.findCompose(info, selects, node).join(), getExecutor());
} }
protected <T> CompletableFuture<T> find(final EntityInfo<T> info, final SelectColumn selects, final FilterNode node) { protected <T> CompletableFuture<T> findCompose(final EntityInfo<T> info, final SelectColumn selects, final FilterNode node) {
final Map<Class, String> joinTabalis = node == null ? null : node.getJoinTabalis(); final Map<Class, String> joinTabalis = node == null ? null : node.getJoinTabalis();
final CharSequence join = node == null ? null : node.createSQLJoin(this, false, joinTabalis, new HashSet<>(), info); final CharSequence join = node == null ? null : node.createSQLJoin(this, false, joinTabalis, new HashSet<>(), info);
final CharSequence where = node == null ? null : node.createSQLExpress(info, joinTabalis); final CharSequence where = node == null ? null : node.createSQLExpress(info, joinTabalis);
@@ -1477,7 +1477,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
Serializable val = cache.findColumn(column, defValue, pk); Serializable val = cache.findColumn(column, defValue, pk);
if (cache.isFullLoaded() || val != null) return val; if (cache.isFullLoaded() || val != null) return val;
} }
return findColumn(info, column, defValue, pk).join(); return findColumnCompose(info, column, defValue, pk).join();
} }
@Override @Override
@@ -1488,11 +1488,11 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
Serializable val = cache.findColumn(column, defValue, pk); Serializable val = cache.findColumn(column, defValue, pk);
if (cache.isFullLoaded() || val != null) return CompletableFuture.completedFuture(val); if (cache.isFullLoaded() || val != null) return CompletableFuture.completedFuture(val);
} }
if (isAsync()) return findColumn(info, column, defValue, pk); if (isAsync()) return findColumnCompose(info, column, defValue, pk);
return CompletableFuture.supplyAsync(() -> findColumn(info, column, defValue, pk).join(), getExecutor()); return CompletableFuture.supplyAsync(() -> findColumnCompose(info, column, defValue, pk).join(), getExecutor());
} }
protected <T> CompletableFuture<Serializable> findColumn(final EntityInfo<T> info, String column, final Serializable defValue, final Serializable pk) { protected <T> CompletableFuture<Serializable> findColumnCompose(final EntityInfo<T> info, String column, final Serializable defValue, final Serializable pk) {
final String sql = "SELECT " + info.getSQLColumn(null, column) + " FROM " + info.getTable(pk) + " WHERE " + info.getPrimarySQLColumn() + " = " + FilterNode.formatToString(pk); final String sql = "SELECT " + info.getSQLColumn(null, column) + " FROM " + info.getTable(pk) + " WHERE " + info.getPrimarySQLColumn() + " = " + FilterNode.formatToString(pk);
if (info.isLoggable(logger, Level.FINEST)) logger.finest(info.getType().getSimpleName() + " find sql=" + sql); if (info.isLoggable(logger, Level.FINEST)) logger.finest(info.getType().getSimpleName() + " find sql=" + sql);
return findColumnDB(info, sql, true, column, defValue); return findColumnDB(info, sql, true, column, defValue);
@@ -1506,7 +1506,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
Serializable val = cache.findColumn(column, defValue, node); Serializable val = cache.findColumn(column, defValue, node);
if (cache.isFullLoaded() || val != null) return val; if (cache.isFullLoaded() || val != null) return val;
} }
return findColumn(info, column, defValue, node).join(); return DataSqlSource.this.findColumnCompose(info, column, defValue, node).join();
} }
@Override @Override
@@ -1517,11 +1517,11 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
Serializable val = cache.findColumn(column, defValue, node); Serializable val = cache.findColumn(column, defValue, node);
if (cache.isFullLoaded() || val != null) return CompletableFuture.completedFuture(val); if (cache.isFullLoaded() || val != null) return CompletableFuture.completedFuture(val);
} }
if (isAsync()) return findColumn(info, column, defValue, node); if (isAsync()) return DataSqlSource.this.findColumnCompose(info, column, defValue, node);
return CompletableFuture.supplyAsync(() -> findColumn(info, column, defValue, node).join(), getExecutor()); return CompletableFuture.supplyAsync(() -> DataSqlSource.this.findColumnCompose(info, column, defValue, node).join(), getExecutor());
} }
protected <T> CompletableFuture<Serializable> findColumn(final EntityInfo<T> info, String column, final Serializable defValue, final FilterNode node) { protected <T> CompletableFuture<Serializable> findColumnCompose(final EntityInfo<T> info, String column, final Serializable defValue, final FilterNode node) {
final Map<Class, String> joinTabalis = node == null ? null : node.getJoinTabalis(); final Map<Class, String> joinTabalis = node == null ? null : node.getJoinTabalis();
final CharSequence join = node == null ? null : node.createSQLJoin(this, false, joinTabalis, new HashSet<>(), info); final CharSequence join = node == null ? null : node.createSQLJoin(this, false, joinTabalis, new HashSet<>(), info);
final CharSequence where = node == null ? null : node.createSQLExpress(info, joinTabalis); final CharSequence where = node == null ? null : node.createSQLExpress(info, joinTabalis);
@@ -1530,7 +1530,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
return findColumnDB(info, sql, false, column, defValue); return findColumnDB(info, sql, false, column, defValue);
} }
//---------------------------- exists ---------------------------- //---------------------------- existsCompose ----------------------------
@Override @Override
public <T> boolean exists(Class<T> clazz, Serializable pk) { public <T> boolean exists(Class<T> clazz, Serializable pk) {
final EntityInfo<T> info = loadEntityInfo(clazz); final EntityInfo<T> info = loadEntityInfo(clazz);
@@ -1539,7 +1539,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
boolean rs = cache.exists(pk); boolean rs = cache.exists(pk);
if (rs || cache.isFullLoaded()) return rs; if (rs || cache.isFullLoaded()) return rs;
} }
return exists(info, pk).join(); return existsCompose(info, pk).join();
} }
@Override @Override
@@ -1550,11 +1550,11 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
boolean rs = cache.exists(pk); boolean rs = cache.exists(pk);
if (rs || cache.isFullLoaded()) return CompletableFuture.completedFuture(rs); if (rs || cache.isFullLoaded()) return CompletableFuture.completedFuture(rs);
} }
if (isAsync()) return exists(info, pk); if (isAsync()) return existsCompose(info, pk);
return CompletableFuture.supplyAsync(() -> exists(info, pk).join(), getExecutor()); return CompletableFuture.supplyAsync(() -> existsCompose(info, pk).join(), getExecutor());
} }
protected <T> CompletableFuture<Boolean> exists(final EntityInfo<T> info, Serializable pk) { protected <T> CompletableFuture<Boolean> existsCompose(final EntityInfo<T> info, Serializable pk) {
final String sql = "SELECT COUNT(*) FROM " + info.getTable(pk) + " WHERE " + info.getPrimarySQLColumn() + " = " + FilterNode.formatToString(pk); final String sql = "SELECT COUNT(*) FROM " + info.getTable(pk) + " WHERE " + info.getPrimarySQLColumn() + " = " + FilterNode.formatToString(pk);
if (info.isLoggable(logger, Level.FINEST)) logger.finest(info.getType().getSimpleName() + " exists sql=" + sql); if (info.isLoggable(logger, Level.FINEST)) logger.finest(info.getType().getSimpleName() + " exists sql=" + sql);
return existsDB(info, sql, true); return existsDB(info, sql, true);
@@ -1578,7 +1578,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
boolean rs = cache.exists(node); boolean rs = cache.exists(node);
if (rs || cache.isFullLoaded()) return rs; if (rs || cache.isFullLoaded()) return rs;
} }
return exists(info, node).join(); return DataSqlSource.this.existsCompose(info, node).join();
} }
@Override @Override
@@ -1589,11 +1589,11 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
boolean rs = cache.exists(node); boolean rs = cache.exists(node);
if (rs || cache.isFullLoaded()) return CompletableFuture.completedFuture(rs); if (rs || cache.isFullLoaded()) return CompletableFuture.completedFuture(rs);
} }
if (isAsync()) return exists(info, node); if (isAsync()) return DataSqlSource.this.existsCompose(info, node);
return CompletableFuture.supplyAsync(() -> exists(info, node).join(), getExecutor()); return CompletableFuture.supplyAsync(() -> DataSqlSource.this.existsCompose(info, node).join(), getExecutor());
} }
protected <T> CompletableFuture<Boolean> exists(final EntityInfo<T> info, FilterNode node) { protected <T> CompletableFuture<Boolean> existsCompose(final EntityInfo<T> info, FilterNode node) {
final Map<Class, String> joinTabalis = node == null ? null : node.getJoinTabalis(); final Map<Class, String> joinTabalis = node == null ? null : node.getJoinTabalis();
final CharSequence join = node == null ? null : node.createSQLJoin(this, false, joinTabalis, new HashSet<>(), info); final CharSequence join = node == null ? null : node.createSQLJoin(this, false, joinTabalis, new HashSet<>(), info);
final CharSequence where = node == null ? null : node.createSQLExpress(info, joinTabalis); final CharSequence where = node == null ? null : node.createSQLExpress(info, joinTabalis);
@@ -2055,12 +2055,12 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
@Override @Override
public <T> List<T> queryList(final Class<T> clazz, final SelectColumn selects, final Flipper flipper, final FilterNode node) { public <T> List<T> queryList(final Class<T> clazz, final SelectColumn selects, final Flipper flipper, final FilterNode node) {
return querySheet(true, false, clazz, selects, flipper, node).join().list(true); return querySheetCompose(true, false, clazz, selects, flipper, node).join().list(true);
} }
@Override @Override
public <T> CompletableFuture<List<T>> queryListAsync(final Class<T> clazz, final SelectColumn selects, final Flipper flipper, final FilterNode node) { public <T> CompletableFuture<List<T>> queryListAsync(final Class<T> clazz, final SelectColumn selects, final Flipper flipper, final FilterNode node) {
return querySheet(true, false, clazz, selects, flipper, node).thenApply((rs) -> rs.list(true)); return querySheetCompose(true, false, clazz, selects, flipper, node).thenApply((rs) -> rs.list(true));
} }
//-----------------------sheet---------------------------- //-----------------------sheet----------------------------
@@ -2117,16 +2117,16 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
@Override @Override
public <T> Sheet<T> querySheet(final Class<T> clazz, final SelectColumn selects, final Flipper flipper, final FilterNode node) { public <T> Sheet<T> querySheet(final Class<T> clazz, final SelectColumn selects, final Flipper flipper, final FilterNode node) {
return querySheet(true, true, clazz, selects, flipper, node).join(); return querySheetCompose(true, true, clazz, selects, flipper, node).join();
} }
@Override @Override
public <T> CompletableFuture<Sheet<T>> querySheetAsync(final Class<T> clazz, final SelectColumn selects, final Flipper flipper, final FilterNode node) { public <T> CompletableFuture<Sheet<T>> querySheetAsync(final Class<T> clazz, final SelectColumn selects, final Flipper flipper, final FilterNode node) {
if (isAsync()) return querySheet(true, true, clazz, selects, flipper, node); if (isAsync()) return querySheetCompose(true, true, clazz, selects, flipper, node);
return CompletableFuture.supplyAsync(() -> querySheet(true, true, clazz, selects, flipper, node).join(), getExecutor()); return CompletableFuture.supplyAsync(() -> querySheetCompose(true, true, clazz, selects, flipper, node).join(), getExecutor());
} }
protected <T> CompletableFuture<Sheet<T>> querySheet(final boolean readcache, final boolean needtotal, final Class<T> clazz, final SelectColumn selects, final Flipper flipper, final FilterNode node) { protected <T> CompletableFuture<Sheet<T>> querySheetCompose(final boolean readcache, final boolean needtotal, final Class<T> clazz, final SelectColumn selects, final Flipper flipper, final FilterNode node) {
final EntityInfo<T> info = loadEntityInfo(clazz); final EntityInfo<T> info = loadEntityInfo(clazz);
final EntityCache<T> cache = info.getCache(); final EntityCache<T> cache = info.getCache();
if (readcache && cache != null && cache.isFullLoaded()) { if (readcache && cache != null && cache.isFullLoaded()) {