This commit is contained in:
@@ -5,9 +5,11 @@
|
||||
*/
|
||||
package org.redkale.source;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.net.URL;
|
||||
import java.sql.Connection;
|
||||
import java.util.Properties;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import org.redkale.service.Local;
|
||||
import org.redkale.util.*;
|
||||
|
||||
@@ -23,7 +25,7 @@ import org.redkale.util.*;
|
||||
@AutoLoad(false)
|
||||
@SuppressWarnings("unchecked")
|
||||
@ResourceType(DataSource.class)
|
||||
public abstract class DataSqlJdbcSource extends DataSqlSource<Connection> {
|
||||
public class DataSqlJdbcSource extends DataSqlSource<Connection> {
|
||||
|
||||
public DataSqlJdbcSource(String unitName, URL persistxml, Properties readprop, Properties writeprop) {
|
||||
super(unitName, persistxml, readprop, writeprop);
|
||||
@@ -35,7 +37,67 @@ public abstract class DataSqlJdbcSource extends DataSqlSource<Connection> {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected final boolean isAysnc() {
|
||||
protected final boolean isAsync() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PoolSource<Connection> createPoolSource(DataSource source, String rwtype, Properties prop) {
|
||||
return new PoolJdbcSource(this.name, this.persistxml, rwtype, prop, this.logger);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T> CompletableFuture<Void> insertDB(EntityInfo<T> info, T... values) {
|
||||
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T> CompletableFuture<Integer> updateDB(EntityInfo<T> info, T... values) {
|
||||
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T> CompletableFuture<Integer> updateDB(EntityInfo<T> info, Flipper flipper, String sql, boolean prepared, Object... params) {
|
||||
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T> CompletableFuture<Integer> deleteDB(EntityInfo<T> info, Flipper flipper, String sql) {
|
||||
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T> CompletableFuture<T> findDB(EntityInfo<T> info, String sql, boolean onlypk, SelectColumn selects) {
|
||||
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T> CompletableFuture<Serializable> findColumnDB(EntityInfo<T> info, String sql, boolean onlypk, String column, Serializable defValue) {
|
||||
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T> CompletableFuture<Number> getNumberResultDB(EntityInfo<T> info, String sql, Number defVal, String column) {
|
||||
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T, N extends Number> CompletableFuture<Map<String, N>> getNumberMapDB(EntityInfo<T> info, String sql, FilterFuncColumn... columns) {
|
||||
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T, K extends Serializable, N extends Number> CompletableFuture<Map<K, N>> queryColumnMapDB(EntityInfo<T> info, String sql, String keyColumn) {
|
||||
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T> CompletableFuture<Boolean> existsDB(EntityInfo<T> info, String sql, boolean onlypk) {
|
||||
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T> CompletableFuture<Sheet<T>> querySheetDB(EntityInfo<T> info, boolean needtotal, SelectColumn selects, Flipper flipper, FilterNode node) {
|
||||
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
|
||||
}
|
||||
}
|
||||
|
||||
@@ -112,9 +112,15 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
//创建连接池
|
||||
protected abstract PoolSource<DBChannel> createPoolSource(DataSource source, String rwtype, Properties prop);
|
||||
|
||||
//插入新纪录
|
||||
//插入纪录
|
||||
protected abstract <T> CompletableFuture<Void> insertDB(final EntityInfo<T> info, T... values);
|
||||
|
||||
//更新纪录
|
||||
protected abstract <T> CompletableFuture<Integer> updateDB(final EntityInfo<T> info, T... values);
|
||||
|
||||
//更新纪录
|
||||
protected abstract <T> CompletableFuture<Integer> updateDB(final EntityInfo<T> info, Flipper flipper, final String sql, final boolean prepared, Object... params);
|
||||
|
||||
//删除记录
|
||||
protected abstract <T> CompletableFuture<Integer> deleteDB(final EntityInfo<T> info, Flipper flipper, final String sql);
|
||||
|
||||
@@ -502,6 +508,562 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
}
|
||||
|
||||
//---------------------------- update ----------------------------
|
||||
/**
|
||||
* 更新对象, 必须是Entity对象
|
||||
*
|
||||
* @param <T> Entity类泛型
|
||||
* @param values Entity对象
|
||||
*
|
||||
* @return 更新的数据条数
|
||||
*/
|
||||
@Override
|
||||
public <T> int update(T... values) {
|
||||
if (values.length == 0) return -1;
|
||||
checkEntity("update", false, values);
|
||||
final Class<T> clazz = (Class<T>) values[0].getClass();
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
if (info.isVirtualEntity()) return updateCache(info, -1, values);
|
||||
return updateDB(info, values).whenComplete((rs, t) -> {
|
||||
if (t != null) {
|
||||
futureCompleteConsumer.accept(rs, t);
|
||||
} else {
|
||||
updateCache(info, rs, values);
|
||||
}
|
||||
}).join();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> CompletableFuture<Integer> updateAsync(final T... values) {
|
||||
if (values.length == 0) return CompletableFuture.completedFuture(-1);
|
||||
CompletableFuture future = checkEntity("update", true, values);
|
||||
if (future != null) return future;
|
||||
final Class<T> clazz = (Class<T>) values[0].getClass();
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
if (info.isVirtualEntity()) return CompletableFuture.supplyAsync(() -> updateCache(info, -1, values), getExecutor());
|
||||
if (isAsync()) return updateDB(info, values).whenComplete((rs, t) -> {
|
||||
if (t != null) {
|
||||
futureCompleteConsumer.accept(rs, t);
|
||||
} else {
|
||||
updateCache(info, rs, values);
|
||||
}
|
||||
});
|
||||
return CompletableFuture.supplyAsync(() -> updateDB(info, values).join(), getExecutor()).whenComplete((rs, t) -> {
|
||||
if (t != null) {
|
||||
futureCompleteConsumer.accept(rs, t);
|
||||
} else {
|
||||
updateCache(info, rs, values);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据主键值更新对象的column对应的值, 必须是Entity Class
|
||||
*
|
||||
* @param <T> Entity类的泛型
|
||||
* @param clazz Entity类
|
||||
* @param id 主键值
|
||||
* @param column 过滤字段名
|
||||
* @param value 过滤字段值
|
||||
*
|
||||
* @return 更新的数据条数
|
||||
*/
|
||||
@Override
|
||||
public <T> int updateColumn(Class<T> clazz, Serializable id, String column, Serializable value) {
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
if (info.isVirtualEntity()) return updateCache(info, -1, id, column, value);
|
||||
return updateColumn(info, id, column, value).whenComplete((rs, t) -> {
|
||||
if (t != null) {
|
||||
futureCompleteConsumer.accept(rs, t);
|
||||
} else {
|
||||
updateCache(info, rs, id, column, value);
|
||||
}
|
||||
}).join();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> CompletableFuture<Integer> updateColumnAsync(final Class<T> clazz, final Serializable id, final String column, final Serializable value) {
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
if (info.isVirtualEntity()) {
|
||||
return CompletableFuture.supplyAsync(() -> updateCache(info, -1, id, column, value), getExecutor());
|
||||
}
|
||||
if (isAsync()) return updateColumn(info, id, column, value).whenComplete((rs, t) -> {
|
||||
if (t != null) {
|
||||
futureCompleteConsumer.accept(rs, t);
|
||||
} else {
|
||||
updateCache(info, rs, id, column, value);
|
||||
}
|
||||
});
|
||||
return CompletableFuture.supplyAsync(() -> updateColumn(info, id, column, value).join(), getExecutor()).whenComplete((rs, t) -> {
|
||||
if (t != null) {
|
||||
futureCompleteConsumer.accept(rs, t);
|
||||
} else {
|
||||
updateCache(info, rs, id, column, value);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected <T> CompletableFuture<Integer> updateColumn(final EntityInfo<T> info, Serializable id, String column, final Serializable value) {
|
||||
if (value instanceof byte[]) {
|
||||
String sql = "UPDATE " + info.getTable(id) + " SET " + info.getSQLColumn(null, column) + " = " + getPrepareParamSign(1) + " WHERE " + info.getPrimarySQLColumn() + " = " + FilterNode.formatToString(id);
|
||||
return updateDB(info, null, sql, true, value);
|
||||
} else {
|
||||
String sql = "UPDATE " + info.getTable(id) + " SET " + info.getSQLColumn(null, column) + " = "
|
||||
+ info.formatToString(value) + " WHERE " + info.getPrimarySQLColumn() + " = " + FilterNode.formatToString(id);
|
||||
return updateDB(info, null, sql, false);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据主键值更新对象的column对应的值, 必须是Entity Class
|
||||
*
|
||||
* @param <T> Entity类的泛型
|
||||
* @param clazz Entity类
|
||||
* @param column 过滤字段名
|
||||
* @param value 过滤字段值
|
||||
* @param node 过滤node 不能为null
|
||||
*
|
||||
* @return 更新的数据条数
|
||||
*/
|
||||
@Override
|
||||
public <T> int updateColumn(Class<T> clazz, String column, Serializable value, FilterNode node) {
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
if (info.isVirtualEntity()) return updateCache(info, -1, column, value, node);
|
||||
return updateColumn(info, column, value, node).whenComplete((rs, t) -> {
|
||||
if (t != null) {
|
||||
futureCompleteConsumer.accept(rs, t);
|
||||
} else {
|
||||
updateCache(info, rs, column, value, node);
|
||||
}
|
||||
}).join();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> CompletableFuture<Integer> updateColumnAsync(final Class<T> clazz, final String column, final Serializable value, final FilterNode node) {
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
if (info.isVirtualEntity()) {
|
||||
return CompletableFuture.supplyAsync(() -> updateCache(info, -1, column, value, node), getExecutor());
|
||||
}
|
||||
if (isAsync()) return updateColumn(info, column, value, node).whenComplete((rs, t) -> {
|
||||
if (t != null) {
|
||||
futureCompleteConsumer.accept(rs, t);
|
||||
} else {
|
||||
updateCache(info, rs, column, value, node);
|
||||
}
|
||||
});
|
||||
return CompletableFuture.supplyAsync(() -> updateColumn(info, column, value, node).join(), getExecutor()).whenComplete((rs, t) -> {
|
||||
if (t != null) {
|
||||
futureCompleteConsumer.accept(rs, t);
|
||||
} else {
|
||||
updateCache(info, rs, column, value, node);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected <T> CompletableFuture<Integer> updateColumn(final EntityInfo<T> info, final String column, final Serializable value, final FilterNode node) {
|
||||
Map<Class, String> joinTabalis = node.getJoinTabalis();
|
||||
CharSequence join = node.createSQLJoin(this, true, joinTabalis, new HashSet<>(), info);
|
||||
CharSequence where = node.createSQLExpress(info, joinTabalis);
|
||||
|
||||
StringBuilder join1 = null;
|
||||
StringBuilder join2 = null;
|
||||
if (join != null) {
|
||||
String joinstr = join.toString();
|
||||
join1 = multisplit('[', ']', ",", new StringBuilder(), joinstr, 0);
|
||||
join2 = multisplit('{', '}', " AND ", new StringBuilder(), joinstr, 0);
|
||||
}
|
||||
if (value instanceof byte[]) {
|
||||
String sql = "UPDATE " + info.getTable(node) + " a " + (join1 == null ? "" : (", " + join1))
|
||||
+ " SET " + info.getSQLColumn("a", column) + " = " + getPrepareParamSign(1)
|
||||
+ ((where == null || where.length() == 0) ? (join2 == null ? "" : (" WHERE " + join2))
|
||||
: (" WHERE " + where + (join2 == null ? "" : (" AND " + join2))));
|
||||
return updateDB(info, null, sql, true, value);
|
||||
} else {
|
||||
String sql = "UPDATE " + info.getTable(node) + " a " + (join1 == null ? "" : (", " + join1))
|
||||
+ " SET " + info.getSQLColumn("a", column) + " = " + info.formatToString(value)
|
||||
+ ((where == null || where.length() == 0) ? (join2 == null ? "" : (" WHERE " + join2))
|
||||
: (" WHERE " + where + (join2 == null ? "" : (" AND " + join2))));
|
||||
return updateDB(info, null, sql, false);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据主键值更新对象的多个column对应的值, 必须是Entity Class
|
||||
*
|
||||
* @param <T> Entity类的泛型
|
||||
* @param clazz Entity类
|
||||
* @param id 主键值
|
||||
* @param values 字段值
|
||||
*
|
||||
* @return 更新的数据条数
|
||||
*/
|
||||
@Override
|
||||
public <T> int updateColumn(final Class<T> clazz, final Serializable id, final ColumnValue... values) {
|
||||
if (values == null || values.length < 1) return -1;
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
if (info.isVirtualEntity()) return updateCache(info, -1, id, values);
|
||||
return updateColumn(info, id, values).whenComplete((rs, t) -> {
|
||||
if (t != null) {
|
||||
futureCompleteConsumer.accept(rs, t);
|
||||
} else {
|
||||
updateCache(info, rs, id, values);
|
||||
}
|
||||
}).join();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> CompletableFuture<Integer> updateColumnAsync(final Class<T> clazz, final Serializable id, final ColumnValue... values) {
|
||||
if (values == null || values.length < 1) return CompletableFuture.completedFuture(-1);
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
if (info.isVirtualEntity()) {
|
||||
return CompletableFuture.supplyAsync(() -> updateCache(info, -1, id, values), getExecutor());
|
||||
}
|
||||
if (isAsync()) return updateColumn(info, id, values).whenComplete((rs, t) -> {
|
||||
if (t != null) {
|
||||
futureCompleteConsumer.accept(rs, t);
|
||||
} else {
|
||||
updateCache(info, rs, id, values);
|
||||
}
|
||||
});
|
||||
return CompletableFuture.supplyAsync(() -> updateColumn(info, id, values).join(), getExecutor()).whenComplete((rs, t) -> {
|
||||
if (t != null) {
|
||||
futureCompleteConsumer.accept(rs, t);
|
||||
} else {
|
||||
updateCache(info, rs, id, values);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected <T> CompletableFuture<Integer> updateColumn(final EntityInfo<T> info, final Serializable id, final ColumnValue... values) {
|
||||
StringBuilder setsql = new StringBuilder();
|
||||
List<byte[]> blobs = null;
|
||||
int index = 0;
|
||||
for (ColumnValue col : values) {
|
||||
Attribute<T, Serializable> attr = info.getUpdateAttribute(col.getColumn());
|
||||
if (attr == null) throw new RuntimeException(info.getType() + " cannot found column " + col.getColumn());
|
||||
if (setsql.length() > 0) setsql.append(", ");
|
||||
String c = info.getSQLColumn(null, col.getColumn());
|
||||
if (col.getValue() instanceof byte[]) {
|
||||
if (blobs == null) blobs = new ArrayList<>();
|
||||
blobs.add((byte[]) col.getValue());
|
||||
setsql.append(c).append(" = ").append(getPrepareParamSign(++index));
|
||||
} else {
|
||||
setsql.append(c).append(" = ").append(info.formatSQLValue(c, col));
|
||||
}
|
||||
}
|
||||
String sql = "UPDATE " + info.getTable(id) + " SET " + setsql + " WHERE " + info.getPrimarySQLColumn() + " = " + FilterNode.formatToString(id);
|
||||
if (blobs == null) return updateDB(info, null, sql, false);
|
||||
return updateDB(info, null, sql, true, blobs.toArray());
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据主键值更新对象的多个column对应的值, 必须是Entity Class
|
||||
*
|
||||
* @param <T> Entity类的泛型
|
||||
* @param clazz Entity类
|
||||
* @param node 过滤条件
|
||||
* @param values 字段值
|
||||
*
|
||||
* @return 更新的数据条数
|
||||
*/
|
||||
@Override
|
||||
public <T> int updateColumn(final Class<T> clazz, final FilterNode node, final ColumnValue... values) {
|
||||
return updateColumn(clazz, node, null, values);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> CompletableFuture<Integer> updateColumnAsync(final Class<T> clazz, final FilterNode node, final ColumnValue... values) {
|
||||
return updateColumnAsync(clazz, node, null, values);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> int updateColumn(final Class<T> clazz, final FilterNode node, final Flipper flipper, final ColumnValue... values) {
|
||||
if (values == null || values.length < 1) return -1;
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
if (info.isVirtualEntity()) return updateCache(info, -1, node, flipper, values);
|
||||
return updateColumn(info, node, flipper, values).whenComplete((rs, t) -> {
|
||||
if (t != null) {
|
||||
futureCompleteConsumer.accept(rs, t);
|
||||
} else {
|
||||
updateCache(info, rs, node, flipper, values);
|
||||
}
|
||||
}).join();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> CompletableFuture<Integer> updateColumnAsync(final Class<T> clazz, final FilterNode node, final Flipper flipper, final ColumnValue... values) {
|
||||
if (values == null || values.length < 1) return CompletableFuture.completedFuture(-1);
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
if (info.isVirtualEntity()) {
|
||||
return CompletableFuture.supplyAsync(() -> updateCache(info, -1, node, flipper, values), getExecutor());
|
||||
}
|
||||
if (isAsync()) return updateColumn(info, node, flipper, values).whenComplete((rs, t) -> {
|
||||
if (t != null) {
|
||||
futureCompleteConsumer.accept(rs, t);
|
||||
} else {
|
||||
updateCache(info, rs, node, flipper, values);
|
||||
}
|
||||
});
|
||||
return CompletableFuture.supplyAsync(() -> updateColumn(info, node, flipper, values).join(), getExecutor()).whenComplete((rs, t) -> {
|
||||
if (t != null) {
|
||||
futureCompleteConsumer.accept(rs, t);
|
||||
} else {
|
||||
updateCache(info, rs, node, flipper, values);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected <T> CompletableFuture<Integer> updateColumn(final EntityInfo<T> info, final FilterNode node, final Flipper flipper, final ColumnValue... values) {
|
||||
StringBuilder setsql = new StringBuilder();
|
||||
List<byte[]> blobs = null;
|
||||
int index = 0;
|
||||
for (ColumnValue col : values) {
|
||||
Attribute<T, Serializable> attr = info.getUpdateAttribute(col.getColumn());
|
||||
if (attr == null) continue;
|
||||
if (setsql.length() > 0) setsql.append(", ");
|
||||
String c = info.getSQLColumn("a", col.getColumn());
|
||||
if (col.getValue() instanceof byte[]) {
|
||||
if (blobs == null) blobs = new ArrayList<>();
|
||||
blobs.add((byte[]) col.getValue());
|
||||
setsql.append(c).append(" = ").append(getPrepareParamSign(++index));
|
||||
} else {
|
||||
setsql.append(c).append(" = ").append(info.formatSQLValue(c, col));
|
||||
}
|
||||
}
|
||||
Map<Class, String> joinTabalis = node.getJoinTabalis();
|
||||
CharSequence join = node.createSQLJoin(this, true, joinTabalis, new HashSet<>(), info);
|
||||
CharSequence where = node.createSQLExpress(info, joinTabalis);
|
||||
StringBuilder join1 = null;
|
||||
StringBuilder join2 = null;
|
||||
if (join != null) {
|
||||
String joinstr = join.toString();
|
||||
join1 = multisplit('[', ']', ",", new StringBuilder(), joinstr, 0);
|
||||
join2 = multisplit('{', '}', " AND ", new StringBuilder(), joinstr, 0);
|
||||
}
|
||||
String sql = "UPDATE " + info.getTable(node) + " a " + (join1 == null ? "" : (", " + join1)) + " SET " + setsql
|
||||
+ ((where == null || where.length() == 0) ? (join2 == null ? "" : (" WHERE " + join2))
|
||||
: (" WHERE " + where + (join2 == null ? "" : (" AND " + join2))))
|
||||
+ info.createSQLOrderby(flipper);
|
||||
if (blobs == null) return updateDB(info, null, sql, false);
|
||||
return updateDB(info, flipper, sql, true, blobs.toArray());
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> int updateColumn(final T bean, final String... columns) {
|
||||
return updateColumn(bean, SelectColumn.createIncludes(columns));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> CompletableFuture<Integer> updateColumnAsync(final T bean, final String... columns) {
|
||||
return updateColumnAsync(bean, SelectColumn.createIncludes(columns));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> int updateColumn(final T bean, final FilterNode node, final String... columns) {
|
||||
return updateColumn(bean, node, SelectColumn.createIncludes(columns));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> CompletableFuture<Integer> updateColumnAsync(final T bean, final FilterNode node, final String... columns) {
|
||||
return updateColumnAsync(bean, node, SelectColumn.createIncludes(columns));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> int updateColumn(final T bean, final SelectColumn selects) {
|
||||
if (bean == null || selects == null) return -1;
|
||||
Class<T> clazz = (Class) bean.getClass();
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
if (info.isVirtualEntity()) return updateCache(info, -1, false, bean, null, selects);
|
||||
return updateColumns(info, false, bean, null, selects).whenComplete((rs, t) -> {
|
||||
if (t != null) {
|
||||
futureCompleteConsumer.accept(rs, t);
|
||||
} else {
|
||||
updateCache(info, rs, false, bean, null, selects);
|
||||
}
|
||||
}).join();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> CompletableFuture<Integer> updateColumnAsync(final T bean, final SelectColumn selects) {
|
||||
if (bean == null || selects == null) return CompletableFuture.completedFuture(-1);
|
||||
Class<T> clazz = (Class) bean.getClass();
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
if (info.isVirtualEntity()) {
|
||||
return CompletableFuture.supplyAsync(() -> updateCache(info, -1, false, bean, null, selects), getExecutor());
|
||||
}
|
||||
if (isAsync()) return updateColumns(info, false, bean, null, selects).whenComplete((rs, t) -> {
|
||||
if (t != null) {
|
||||
futureCompleteConsumer.accept(rs, t);
|
||||
} else {
|
||||
updateCache(info, rs, false, bean, null, selects);
|
||||
}
|
||||
});
|
||||
return CompletableFuture.supplyAsync(() -> updateColumns(info, false, bean, null, selects).join(), getExecutor()).whenComplete((rs, t) -> {
|
||||
if (t != null) {
|
||||
futureCompleteConsumer.accept(rs, t);
|
||||
} else {
|
||||
updateCache(info, rs, false, bean, null, selects);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> int updateColumn(final T bean, final FilterNode node, final SelectColumn selects) {
|
||||
if (bean == null || node == null || selects == null) return -1;
|
||||
Class<T> clazz = (Class) bean.getClass();
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
if (info.isVirtualEntity()) return updateCache(info, -1, true, bean, node, selects);
|
||||
return updateColumns(info, true, bean, node, selects).whenComplete((rs, t) -> {
|
||||
if (t != null) {
|
||||
futureCompleteConsumer.accept(rs, t);
|
||||
} else {
|
||||
updateCache(info, rs, true, bean, node, selects);
|
||||
}
|
||||
}).join();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> CompletableFuture<Integer> updateColumnAsync(final T bean, final FilterNode node, final SelectColumn selects) {
|
||||
if (bean == null || node == null || selects == null) return CompletableFuture.completedFuture(-1);
|
||||
Class<T> clazz = (Class) bean.getClass();
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
if (info.isVirtualEntity()) {
|
||||
return CompletableFuture.supplyAsync(() -> updateCache(info, -1, true, bean, node, selects), getExecutor());
|
||||
}
|
||||
if (isAsync()) return updateColumns(info, true, bean, node, selects).whenComplete((rs, t) -> {
|
||||
if (t != null) {
|
||||
futureCompleteConsumer.accept(rs, t);
|
||||
} else {
|
||||
updateCache(info, rs, true, bean, node, selects);
|
||||
}
|
||||
});
|
||||
return CompletableFuture.supplyAsync(() -> updateColumns(info, true, bean, node, selects).join(), getExecutor()).whenComplete((rs, t) -> {
|
||||
if (t != null) {
|
||||
futureCompleteConsumer.accept(rs, t);
|
||||
} else {
|
||||
updateCache(info, rs, true, bean, node, selects);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected <T> CompletableFuture<Integer> updateColumns(final EntityInfo<T> info, final boolean neednode, final T bean, final FilterNode node, final SelectColumn selects) {
|
||||
StringBuilder setsql = new StringBuilder();
|
||||
List<byte[]> blobs = null;
|
||||
int index = 0;
|
||||
for (Attribute<T, Serializable> attr : info.updateAttributes) {
|
||||
if (!selects.test(attr.field())) continue;
|
||||
if (setsql.length() > 0) setsql.append(", ");
|
||||
setsql.append(info.getSQLColumn("a", attr.field()));
|
||||
Serializable val = attr.get(bean);
|
||||
if (val instanceof byte[]) {
|
||||
if (blobs == null) blobs = new ArrayList<>();
|
||||
blobs.add((byte[]) val);
|
||||
setsql.append(" = ").append(getPrepareParamSign(++index));
|
||||
} else {
|
||||
setsql.append(" = ").append(info.formatToString(val));
|
||||
}
|
||||
}
|
||||
if (neednode) {
|
||||
Map<Class, String> joinTabalis = node.getJoinTabalis();
|
||||
CharSequence join = node.createSQLJoin(this, true, joinTabalis, new HashSet<>(), info);
|
||||
CharSequence where = node.createSQLExpress(info, joinTabalis);
|
||||
StringBuilder join1 = null;
|
||||
StringBuilder join2 = null;
|
||||
if (join != null) {
|
||||
String joinstr = join.toString();
|
||||
join1 = multisplit('[', ']', ",", new StringBuilder(), joinstr, 0);
|
||||
join2 = multisplit('{', '}', " AND ", new StringBuilder(), joinstr, 0);
|
||||
}
|
||||
String sql = "UPDATE " + info.getTable(node) + " a " + (join1 == null ? "" : (", " + join1)) + " SET " + setsql
|
||||
+ ((where == null || where.length() == 0) ? (join2 == null ? "" : (" WHERE " + join2))
|
||||
: (" WHERE " + where + (join2 == null ? "" : (" AND " + join2))));
|
||||
if (blobs == null) return updateDB(info, null, sql, false);
|
||||
return updateDB(info, null, sql, true, blobs.toArray());
|
||||
} else {
|
||||
final Serializable id = info.getPrimary().get(bean);
|
||||
String sql = "UPDATE " + info.getTable(id) + " SET " + setsql + " WHERE " + info.getPrimarySQLColumn() + " = " + FilterNode.formatToString(id);
|
||||
if (blobs == null) return updateDB(info, null, sql, false);
|
||||
return updateDB(info, null, sql, true, blobs.toArray());
|
||||
}
|
||||
}
|
||||
|
||||
protected <T> int updateCache(final EntityInfo<T> info, int count, final boolean neednode, final T bean, final FilterNode node, final SelectColumn selects) {
|
||||
final EntityCache<T> cache = info.getCache();
|
||||
if (cache == null) return count;
|
||||
final List<Attribute<T, Serializable>> attrs = new ArrayList<>();
|
||||
for (Attribute<T, Serializable> attr : info.updateAttributes) {
|
||||
if (!selects.test(attr.field())) continue;
|
||||
attrs.add(attr);
|
||||
}
|
||||
if (neednode) {
|
||||
T[] rs = cache.update(bean, attrs, node);
|
||||
if (cacheListener != null) cacheListener.updateCache(info.getType(), rs);
|
||||
return count >= 0 ? count : (rs == null ? 0 : rs.length);
|
||||
} else {
|
||||
T rs = cache.update(bean, attrs);
|
||||
if (cacheListener != null) cacheListener.updateCache(info.getType(), rs);
|
||||
return count >= 0 ? count : (rs == null ? 0 : 1);
|
||||
}
|
||||
}
|
||||
|
||||
protected <T> int updateCache(final EntityInfo<T> info, int count, final FilterNode node, final Flipper flipper, final ColumnValue... values) {
|
||||
final EntityCache<T> cache = info.getCache();
|
||||
if (cache == null) return count;
|
||||
final List<Attribute<T, Serializable>> attrs = new ArrayList<>();
|
||||
final List<ColumnValue> cols = new ArrayList<>();
|
||||
for (ColumnValue col : values) {
|
||||
Attribute<T, Serializable> attr = info.getUpdateAttribute(col.getColumn());
|
||||
if (attr == null) continue;
|
||||
attrs.add(attr);
|
||||
cols.add(col);
|
||||
}
|
||||
T[] rs = cache.updateColumn(node, flipper, attrs, cols);
|
||||
if (cacheListener != null) cacheListener.updateCache(info.getType(), rs);
|
||||
return count >= 0 ? count : (rs == null ? 0 : 1);
|
||||
}
|
||||
|
||||
protected <T> int updateCache(final EntityInfo<T> info, int count, final Serializable id, final ColumnValue... values) {
|
||||
final EntityCache<T> cache = info.getCache();
|
||||
if (cache == null) return count;
|
||||
final List<Attribute<T, Serializable>> attrs = new ArrayList<>();
|
||||
final List<ColumnValue> cols = new ArrayList<>();
|
||||
for (ColumnValue col : values) {
|
||||
Attribute<T, Serializable> attr = info.getUpdateAttribute(col.getColumn());
|
||||
if (attr == null) continue;
|
||||
attrs.add(attr);
|
||||
cols.add(col);
|
||||
}
|
||||
T rs = cache.updateColumn(id, attrs, cols);
|
||||
if (cacheListener != null) cacheListener.updateCache(info.getType(), rs);
|
||||
return count >= 0 ? count : (rs == null ? 0 : 1);
|
||||
}
|
||||
|
||||
protected <T> int updateCache(final EntityInfo<T> info, int count, String column, final Serializable value, FilterNode node) {
|
||||
final EntityCache<T> cache = info.getCache();
|
||||
if (cache == null) return count;
|
||||
T[] rs = cache.update(info.getAttribute(column), value, node);
|
||||
if (cacheListener != null) cacheListener.updateCache(info.getType(), rs);
|
||||
return count >= 0 ? count : (rs == null ? 0 : 1);
|
||||
}
|
||||
|
||||
protected <T> int updateCache(final EntityInfo<T> info, int count, final Serializable id, final String column, final Serializable value) {
|
||||
final EntityCache<T> cache = info.getCache();
|
||||
if (cache == null) return count;
|
||||
T rs = cache.update(id, info.getAttribute(column), value);
|
||||
if (cacheListener != null) cacheListener.updateCache(info.getType(), rs);
|
||||
return count >= 0 ? count : (rs == null ? 0 : 1);
|
||||
}
|
||||
|
||||
protected <T> int updateCache(final EntityInfo<T> info, int count, T... values) {
|
||||
final EntityCache<T> cache = info.getCache();
|
||||
if (cache == null) return -1;
|
||||
int c2 = 0;
|
||||
for (final T value : values) {
|
||||
c2 += cache.update(value);
|
||||
}
|
||||
if (cacheListener != null) cacheListener.updateCache(info.getType(), values);
|
||||
return count >= 0 ? count : c2;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> int updateCache(Class<T> clazz, T... values) {
|
||||
if (values.length == 0) return 0;
|
||||
|
||||
Reference in New Issue
Block a user