实现DataBatch
This commit is contained in:
@@ -432,124 +432,6 @@ public abstract class AbstractDataSource extends AbstractService implements Data
|
||||
return CompletableFuture.supplyAsync(supplier, getExecutor());
|
||||
}
|
||||
|
||||
protected DefaultBatchInfo parseBatchInfo(DefaultDataBatch batch, Function<Class, EntityInfo> func) {
|
||||
final List<BatchAction> actions = ((DefaultDataBatch) batch).actions;
|
||||
final Map<Class, EntityInfo> infos = new HashMap<>();
|
||||
final Map<String, EntityInfo> notInsertDisTables = new HashMap<>();
|
||||
final Map<String, EntityInfo> notUpDelDisTables = new HashMap<>();
|
||||
for (BatchAction action : actions) {
|
||||
if (action instanceof InsertBatchAction1) {
|
||||
InsertBatchAction1 act = (InsertBatchAction1) action;
|
||||
Object entity = act.entity;
|
||||
Class clazz = entity.getClass();
|
||||
if (!infos.containsKey(clazz)) {
|
||||
EntityInfo info = func.apply(clazz);
|
||||
infos.put(clazz, info);
|
||||
if (info.getTableStrategy() != null) {
|
||||
String tableKey = info.getTable(entity);
|
||||
if (!info.containsDisTable(tableKey)) {
|
||||
notInsertDisTables.put(tableKey, info);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (action instanceof DeleteBatchAction1) {
|
||||
DeleteBatchAction1 act = (DeleteBatchAction1) action;
|
||||
Object entity = act.entity;
|
||||
Class clazz = entity.getClass();
|
||||
if (!infos.containsKey(clazz)) {
|
||||
EntityInfo info = func.apply(clazz);
|
||||
infos.put(clazz, info);
|
||||
if (info.getTableStrategy() != null) {
|
||||
String tableKey = info.getTable(entity);
|
||||
if (!info.containsDisTable(tableKey)) {
|
||||
notUpDelDisTables.put(tableKey, info);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (action instanceof DeleteBatchAction2) {
|
||||
DeleteBatchAction2 act = (DeleteBatchAction2) action;
|
||||
Class clazz = act.clazz;
|
||||
if (!infos.containsKey(clazz)) {
|
||||
EntityInfo info = func.apply(clazz);
|
||||
infos.put(clazz, info);
|
||||
if (info.getTableStrategy() != null) {
|
||||
String tableKey = info.getTable(act.pk);
|
||||
if (!info.containsDisTable(tableKey)) {
|
||||
notUpDelDisTables.put(tableKey, info);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (action instanceof DeleteBatchAction3) {
|
||||
DeleteBatchAction3 act = (DeleteBatchAction3) action;
|
||||
Class clazz = act.clazz;
|
||||
if (!infos.containsKey(clazz)) {
|
||||
EntityInfo info = func.apply(clazz);
|
||||
infos.put(clazz, info);
|
||||
if (info.getTableStrategy() != null) {
|
||||
String tableKey = info.getTable(act.node);
|
||||
if (!info.containsDisTable(tableKey)) {
|
||||
notUpDelDisTables.put(tableKey, info);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (action instanceof UpdateBatchAction1) {
|
||||
UpdateBatchAction1 act = (UpdateBatchAction1) action;
|
||||
Object entity = act.entity;
|
||||
Class clazz = entity.getClass();
|
||||
if (!infos.containsKey(clazz)) {
|
||||
EntityInfo info = func.apply(clazz);
|
||||
infos.put(clazz, info);
|
||||
if (info.getTableStrategy() != null) {
|
||||
String tableKey = info.getTable(entity);
|
||||
if (!info.containsDisTable(tableKey)) {
|
||||
notUpDelDisTables.put(tableKey, info);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (action instanceof UpdateBatchAction2) {
|
||||
UpdateBatchAction2 act = (UpdateBatchAction2) action;
|
||||
Class clazz = act.clazz;
|
||||
if (!infos.containsKey(clazz)) {
|
||||
EntityInfo info = func.apply(clazz);
|
||||
infos.put(clazz, info);
|
||||
if (info.getTableStrategy() != null) {
|
||||
String tableKey = info.getTable(act.pk);
|
||||
if (!info.containsDisTable(tableKey)) {
|
||||
notUpDelDisTables.put(tableKey, info);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (action instanceof UpdateBatchAction3) {
|
||||
UpdateBatchAction3 act = (UpdateBatchAction3) action;
|
||||
Class clazz = act.clazz;
|
||||
if (!infos.containsKey(clazz)) {
|
||||
EntityInfo info = func.apply(clazz);
|
||||
infos.put(clazz, info);
|
||||
if (info.getTableStrategy() != null) {
|
||||
String tableKey = info.getTable(act.node);
|
||||
if (!info.containsDisTable(tableKey)) {
|
||||
notUpDelDisTables.put(tableKey, info);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (action instanceof UpdateBatchAction4) {
|
||||
UpdateBatchAction4 act = (UpdateBatchAction4) action;
|
||||
Class clazz = act.entity.getClass();
|
||||
if (!infos.containsKey(clazz)) {
|
||||
EntityInfo info = func.apply(clazz);
|
||||
infos.put(clazz, info);
|
||||
if (info.getTableStrategy() != null) {
|
||||
String tableKey = act.node == null ? info.getTable(act.entity) : info.getTable(act.node);
|
||||
if (!info.containsDisTable(tableKey)) {
|
||||
notUpDelDisTables.put(tableKey, info);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return new DefaultBatchInfo(infos, notInsertDisTables, notUpDelDisTables);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int batch(final DataBatch batch) {
|
||||
return batchAsync(batch).join();
|
||||
@@ -557,7 +439,7 @@ public abstract class AbstractDataSource extends AbstractService implements Data
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Integer> batchAsync(final DataBatch batch) {
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
return CompletableFuture.failedFuture(new UnsupportedOperationException("Not supported yet."));
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -1398,7 +1280,7 @@ public abstract class AbstractDataSource extends AbstractService implements Data
|
||||
public <T> DataBatch insert(T... entitys) {
|
||||
for (T t : entitys) {
|
||||
Objects.requireNonNull(t);
|
||||
if (t.getClass().getAnnotation(Entity.class) == null && t.getClass().getAnnotation(javax.persistence.Entity.class) == null) {
|
||||
if (t.getClass().getAnnotation(Entity.class) == null) {
|
||||
throw new SourceException("Entity Class " + t.getClass() + " must be on Annotation @Entity");
|
||||
}
|
||||
this.actions.add(new InsertBatchAction1(t));
|
||||
@@ -1410,7 +1292,7 @@ public abstract class AbstractDataSource extends AbstractService implements Data
|
||||
public <T> DataBatch delete(T... entitys) {
|
||||
for (T t : entitys) {
|
||||
Objects.requireNonNull(t);
|
||||
if (t.getClass().getAnnotation(Entity.class) == null && t.getClass().getAnnotation(javax.persistence.Entity.class) == null) {
|
||||
if (t.getClass().getAnnotation(Entity.class) == null) {
|
||||
throw new SourceException("Entity Class " + t.getClass() + " must be on Annotation @Entity");
|
||||
}
|
||||
this.actions.add(new DeleteBatchAction1(t));
|
||||
@@ -1421,7 +1303,7 @@ public abstract class AbstractDataSource extends AbstractService implements Data
|
||||
@Override
|
||||
public <T> DataBatch delete(Class<T> clazz, Serializable... pks) {
|
||||
Objects.requireNonNull(clazz);
|
||||
if (clazz.getAnnotation(Entity.class) == null && clazz.getAnnotation(javax.persistence.Entity.class) == null) {
|
||||
if (clazz.getAnnotation(Entity.class) == null) {
|
||||
throw new SourceException("Entity Class " + clazz + " must be on Annotation @Entity");
|
||||
}
|
||||
if (pks.length < 1) {
|
||||
@@ -1442,7 +1324,7 @@ public abstract class AbstractDataSource extends AbstractService implements Data
|
||||
@Override
|
||||
public <T> DataBatch delete(Class<T> clazz, FilterNode node, Flipper flipper) {
|
||||
Objects.requireNonNull(clazz);
|
||||
if (clazz.getAnnotation(Entity.class) == null && clazz.getAnnotation(javax.persistence.Entity.class) == null) {
|
||||
if (clazz.getAnnotation(Entity.class) == null) {
|
||||
throw new SourceException("Entity Class " + clazz + " must be on Annotation @Entity");
|
||||
}
|
||||
this.actions.add(new DeleteBatchAction3(clazz, node, flipper));
|
||||
@@ -1453,7 +1335,7 @@ public abstract class AbstractDataSource extends AbstractService implements Data
|
||||
public <T> DataBatch update(T... entitys) {
|
||||
for (T t : entitys) {
|
||||
Objects.requireNonNull(t);
|
||||
if (t.getClass().getAnnotation(Entity.class) == null && t.getClass().getAnnotation(javax.persistence.Entity.class) == null) {
|
||||
if (t.getClass().getAnnotation(Entity.class) == null) {
|
||||
throw new SourceException("Entity Class " + t.getClass() + " must be on Annotation @Entity");
|
||||
}
|
||||
this.actions.add(new UpdateBatchAction1(t));
|
||||
@@ -1469,7 +1351,7 @@ public abstract class AbstractDataSource extends AbstractService implements Data
|
||||
@Override
|
||||
public <T> DataBatch update(Class<T> clazz, Serializable pk, ColumnValue... values) {
|
||||
Objects.requireNonNull(clazz);
|
||||
if (clazz.getAnnotation(Entity.class) == null && clazz.getAnnotation(javax.persistence.Entity.class) == null) {
|
||||
if (clazz.getAnnotation(Entity.class) == null) {
|
||||
throw new SourceException("Entity Class " + clazz + " must be on Annotation @Entity");
|
||||
}
|
||||
Objects.requireNonNull(pk);
|
||||
@@ -1496,7 +1378,7 @@ public abstract class AbstractDataSource extends AbstractService implements Data
|
||||
@Override
|
||||
public <T> DataBatch update(Class<T> clazz, FilterNode node, Flipper flipper, ColumnValue... values) {
|
||||
Objects.requireNonNull(clazz);
|
||||
if (clazz.getAnnotation(Entity.class) == null && clazz.getAnnotation(javax.persistence.Entity.class) == null) {
|
||||
if (clazz.getAnnotation(Entity.class) == null) {
|
||||
throw new SourceException("Entity Class " + clazz + " must be on Annotation @Entity");
|
||||
}
|
||||
if (values.length < 1) {
|
||||
@@ -1533,7 +1415,7 @@ public abstract class AbstractDataSource extends AbstractService implements Data
|
||||
@Override
|
||||
public <T> DataBatch updateColumn(T entity, final FilterNode node, SelectColumn selects) {
|
||||
Objects.requireNonNull(entity);
|
||||
if (entity.getClass().getAnnotation(Entity.class) == null && entity.getClass().getAnnotation(javax.persistence.Entity.class) == null) {
|
||||
if (entity.getClass().getAnnotation(Entity.class) == null) {
|
||||
throw new SourceException("Entity Class " + entity.getClass() + " must be on Annotation @Entity");
|
||||
}
|
||||
Objects.requireNonNull(selects);
|
||||
@@ -1543,25 +1425,6 @@ public abstract class AbstractDataSource extends AbstractService implements Data
|
||||
|
||||
}
|
||||
|
||||
protected static class DefaultBatchInfo {
|
||||
|
||||
//EntityInfo对象
|
||||
public Map<Class, EntityInfo> entityInfos;
|
||||
|
||||
//新增操作可能不存在的分表
|
||||
public Map<String, EntityInfo> notInsertDisTables;
|
||||
|
||||
//删除修改操作可能不存在的分表
|
||||
public Map<String, EntityInfo> notUpDelDisTables;
|
||||
|
||||
public DefaultBatchInfo(Map<Class, EntityInfo> entityInfos, Map<String, EntityInfo> notInsertDisTables, Map<String, EntityInfo> notUpDelDisTables) {
|
||||
this.entityInfos = entityInfos;
|
||||
this.notInsertDisTables = notInsertDisTables;
|
||||
this.notUpDelDisTables = notUpDelDisTables;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
protected abstract static class BatchAction {
|
||||
|
||||
}
|
||||
|
||||
@@ -661,12 +661,15 @@ public abstract class AbstractDataSqlSource extends AbstractDataSource implement
|
||||
}
|
||||
|
||||
@Local
|
||||
@Override
|
||||
public abstract int directExecute(String sql);
|
||||
|
||||
@Local
|
||||
@Override
|
||||
public abstract int[] directExecute(String... sqls);
|
||||
|
||||
@Local
|
||||
@Override
|
||||
public abstract <V> V directQuery(String sql, Function<DataResultSet, V> handler);
|
||||
|
||||
//是否异步
|
||||
|
||||
@@ -121,11 +121,6 @@ public class DataJdbcSource extends AbstractDataSqlSource {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Integer> batchAsync(final DataBatch batch) {
|
||||
return CompletableFuture.supplyAsync(() -> batch(batch), getExecutor());
|
||||
}
|
||||
|
||||
protected <T> List<PreparedStatement> createInsertPreparedStatements(final Connection conn, EntityInfo<T> info, Map<String, PrepareInfo<T>> prepareInfos, T... entitys) throws SQLException {
|
||||
Attribute<T, Serializable>[] attrs = info.insertAttributes;
|
||||
final List<PreparedStatement> prestmts = new ArrayList<>();
|
||||
@@ -204,10 +199,98 @@ public class DataJdbcSource extends AbstractDataSqlSource {
|
||||
@Override
|
||||
public int batch(final DataBatch batch) {
|
||||
Objects.requireNonNull(batch);
|
||||
final List<BatchAction> actions = ((DefaultDataBatch) batch).actions;
|
||||
final DefaultBatchInfo batchInfo = parseBatchInfo((DefaultDataBatch) batch, this);
|
||||
final DefaultDataBatch dataBatch = (DefaultDataBatch) batch;
|
||||
if (dataBatch.actions.isEmpty()) {
|
||||
return 0;
|
||||
}
|
||||
int c = 0;
|
||||
Connection conn = null;
|
||||
try {
|
||||
conn = writePool.pollConnection();
|
||||
conn.setReadOnly(false);
|
||||
conn.setAutoCommit(false);
|
||||
for (BatchAction action : dataBatch.actions) {
|
||||
if (action instanceof InsertBatchAction1) {
|
||||
InsertBatchAction1 act = (InsertBatchAction1) action;
|
||||
EntityInfo info = apply(act.entity.getClass());
|
||||
c += insertDB(true, conn, info, act.entity);
|
||||
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
} else if (action instanceof DeleteBatchAction1) {
|
||||
DeleteBatchAction1 act = (DeleteBatchAction1) action;
|
||||
EntityInfo info = apply(act.entity.getClass());
|
||||
Serializable pk = info.getPrimaryValue(act.entity);
|
||||
Map<String, List<Serializable>> pkmap = info.getTableMap(pk);
|
||||
String[] tables = pkmap.keySet().toArray(new String[pkmap.size()]);
|
||||
String[] sqls = deleteSql(info, pkmap);
|
||||
c += deleteDB(true, conn, info, tables, null, null, pkmap, sqls);
|
||||
|
||||
} else if (action instanceof DeleteBatchAction2) {
|
||||
DeleteBatchAction2 act = (DeleteBatchAction2) action;
|
||||
EntityInfo info = apply(act.clazz);
|
||||
Map<String, List<Serializable>> pkmap = info.getTableMap(act.pk);
|
||||
String[] tables = pkmap.keySet().toArray(new String[pkmap.size()]);
|
||||
String[] sqls = deleteSql(info, pkmap);
|
||||
c += deleteDB(true, conn, info, tables, null, null, pkmap, sqls);
|
||||
|
||||
} else if (action instanceof DeleteBatchAction3) {
|
||||
DeleteBatchAction3 act = (DeleteBatchAction3) action;
|
||||
EntityInfo info = apply(act.clazz);
|
||||
String[] tables = info.getTables(act.node);
|
||||
String[] sqls = deleteSql(info, tables, act.flipper, act.node);
|
||||
c += deleteDB(true, conn, info, tables, act.flipper, act.node, null, sqls);
|
||||
|
||||
} else if (action instanceof UpdateBatchAction1) {
|
||||
UpdateBatchAction1 act = (UpdateBatchAction1) action;
|
||||
EntityInfo info = apply(act.entity.getClass());
|
||||
c += updateEntityDB(true, conn, info, act.entity);
|
||||
|
||||
} else if (action instanceof UpdateBatchAction2) {
|
||||
UpdateBatchAction2 act = (UpdateBatchAction2) action;
|
||||
EntityInfo info = apply(act.clazz);
|
||||
UpdateSqlInfo sql = updateColumnSql(info, act.pk, act.values);
|
||||
c += updateColumnDB(true, conn, info, null, sql);
|
||||
|
||||
} else if (action instanceof UpdateBatchAction3) {
|
||||
UpdateBatchAction3 act = (UpdateBatchAction3) action;
|
||||
EntityInfo info = apply(act.clazz);
|
||||
UpdateSqlInfo sql = updateColumnSql(info, act.node, act.flipper, act.values);
|
||||
c += updateColumnDB(true, conn, info, act.flipper, sql);
|
||||
|
||||
} else if (action instanceof UpdateBatchAction4) {
|
||||
UpdateBatchAction4 act = (UpdateBatchAction4) action;
|
||||
EntityInfo info = apply(act.entity.getClass());
|
||||
UpdateSqlInfo sql = updateColumnSql(info, false, act.entity, act.node, act.selects);
|
||||
c += updateColumnDB(true, conn, info, null, sql);
|
||||
}
|
||||
}
|
||||
conn.commit();
|
||||
return c;
|
||||
} catch (SourceException se) {
|
||||
if (conn != null) {
|
||||
try {
|
||||
conn.rollback();
|
||||
} catch (SQLException sqe) {
|
||||
}
|
||||
}
|
||||
throw se;
|
||||
} catch (SQLException e) {
|
||||
if (conn != null) {
|
||||
try {
|
||||
conn.rollback();
|
||||
} catch (SQLException se) {
|
||||
}
|
||||
}
|
||||
throw new SourceException(e);
|
||||
} finally {
|
||||
if (conn != null) {
|
||||
writePool.offerConnection(conn);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Integer> batchAsync(final DataBatch batch) {
|
||||
return CompletableFuture.supplyAsync(() -> batch(batch), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -218,29 +301,45 @@ public class DataJdbcSource extends AbstractDataSqlSource {
|
||||
@Override
|
||||
protected <T> int insertDB(EntityInfo<T> info, T... entitys) {
|
||||
Connection conn = null;
|
||||
final long s = System.currentTimeMillis();
|
||||
try {
|
||||
int c = 0;
|
||||
conn = writePool.pollConnection();
|
||||
Attribute<T, Serializable>[] attrs = info.insertAttributes;
|
||||
conn.setReadOnly(false);
|
||||
conn.setAutoCommit(false);
|
||||
int c = insertDB(false, conn, info, entitys);
|
||||
conn.commit();
|
||||
return c;
|
||||
} catch (SQLException e) {
|
||||
if (conn != null) {
|
||||
try {
|
||||
conn.rollback();
|
||||
} catch (SQLException se) {
|
||||
}
|
||||
}
|
||||
throw new SourceException(e);
|
||||
} finally {
|
||||
if (conn != null) {
|
||||
writePool.offerConnection(conn);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private <T> int insertDB(final boolean batch, final Connection conn, final EntityInfo<T> info, T... entitys) throws SQLException {
|
||||
final long s = System.currentTimeMillis();
|
||||
int c = 0;
|
||||
String presql = null;
|
||||
PreparedStatement prestmt = null;
|
||||
|
||||
List<PreparedStatement> prestmts = null;
|
||||
Map<String, PrepareInfo<T>> prepareInfos = null;
|
||||
|
||||
if (info.getTableStrategy() == null) {
|
||||
Attribute<T, Serializable>[] attrs = info.insertAttributes;
|
||||
if (info.getTableStrategy() == null) { //单库单表
|
||||
presql = info.getInsertQuestionPrepareSQL(entitys[0]);
|
||||
prestmt = createInsertPreparedStatement(conn, presql, info, entitys);
|
||||
} else {
|
||||
} else { //分库分表
|
||||
prepareInfos = getInsertQuestionPrepareInfo(info, entitys);
|
||||
prestmts = createInsertPreparedStatements(conn, info, prepareInfos, entitys);
|
||||
}
|
||||
try {
|
||||
if (info.getTableStrategy() == null) {
|
||||
if (info.getTableStrategy() == null) { //单库单表
|
||||
int c1 = 0;
|
||||
int[] cs = prestmt.executeBatch();
|
||||
for (int cc : cs) {
|
||||
@@ -248,7 +347,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
|
||||
}
|
||||
c = c1;
|
||||
prestmt.close();
|
||||
} else {
|
||||
} else { //分库分表
|
||||
int c1 = 0;
|
||||
for (PreparedStatement stmt : prestmts) {
|
||||
int[] cs = stmt.executeBatch();
|
||||
@@ -261,13 +360,17 @@ public class DataJdbcSource extends AbstractDataSqlSource {
|
||||
stmt.close();
|
||||
}
|
||||
}
|
||||
if (!batch) {
|
||||
conn.commit();
|
||||
}
|
||||
} catch (SQLException se) {
|
||||
if (!batch) {
|
||||
conn.rollback();
|
||||
}
|
||||
if (!isTableNotExist(info, se.getSQLState())) {
|
||||
throw se;
|
||||
}
|
||||
if (info.getTableStrategy() == null) { //单表
|
||||
if (info.getTableStrategy() == null) { //单库单表
|
||||
String[] tableSqls = createTableSqls(info);
|
||||
if (tableSqls == null) {
|
||||
throw se;
|
||||
@@ -380,7 +483,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
|
||||
info.disTableLock().unlock();
|
||||
}
|
||||
}
|
||||
if (info.getTableStrategy() == null) {
|
||||
if (info.getTableStrategy() == null) { //单库单表
|
||||
prestmt.close();
|
||||
prestmt = createInsertPreparedStatement(conn, presql, info, entitys);
|
||||
int c1 = 0;
|
||||
@@ -390,7 +493,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
|
||||
}
|
||||
c = c1;
|
||||
prestmt.close();
|
||||
} else {
|
||||
} else { //分库分表
|
||||
for (PreparedStatement stmt : prestmts) {
|
||||
stmt.close();
|
||||
}
|
||||
@@ -407,7 +510,6 @@ public class DataJdbcSource extends AbstractDataSqlSource {
|
||||
stmt.close();
|
||||
}
|
||||
}
|
||||
conn.commit();
|
||||
}
|
||||
//------------------------------------------------------------
|
||||
if (info.isLoggable(logger, Level.FINEST)) { //打印调试信息
|
||||
@@ -471,6 +573,23 @@ public class DataJdbcSource extends AbstractDataSqlSource {
|
||||
slowLog(s, presqls.toArray(new String[presqls.size()]));
|
||||
}
|
||||
return c;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T> CompletableFuture<Integer> deleteDBAsync(final EntityInfo<T> info, String[] tables, Flipper flipper, FilterNode node, Map<String, List<Serializable>> pkmap, final String... sqls) {
|
||||
return supplyAsync(() -> deleteDB(info, tables, flipper, node, pkmap, sqls));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T> int deleteDB(EntityInfo<T> info, String[] tables, Flipper flipper, FilterNode node, Map<String, List<Serializable>> pkmap, String... sqls) {
|
||||
Connection conn = null;
|
||||
try {
|
||||
conn = writePool.pollConnection();
|
||||
conn.setReadOnly(false);
|
||||
conn.setAutoCommit(false);
|
||||
int c = deleteDB(false, conn, info, tables, flipper, node, pkmap, sqls);
|
||||
conn.commit();
|
||||
return c;
|
||||
} catch (SQLException e) {
|
||||
if (conn != null) {
|
||||
try {
|
||||
@@ -486,19 +605,9 @@ public class DataJdbcSource extends AbstractDataSqlSource {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T> CompletableFuture<Integer> deleteDBAsync(final EntityInfo<T> info, String[] tables, Flipper flipper, FilterNode node, Map<String, List<Serializable>> pkmap, final String... sqls) {
|
||||
return supplyAsync(() -> deleteDB(info, tables, flipper, node, pkmap, sqls));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T> int deleteDB(EntityInfo<T> info, String[] tables, Flipper flipper, FilterNode node, Map<String, List<Serializable>> pkmap, String... sqls) {
|
||||
Connection conn = null;
|
||||
private <T> int deleteDB(final boolean batch, final Connection conn, final EntityInfo<T> info, String[] tables, Flipper flipper, FilterNode node, Map<String, List<Serializable>> pkmap, String... sqls) throws SQLException {
|
||||
final long s = System.currentTimeMillis();
|
||||
try {
|
||||
conn = writePool.pollConnection();
|
||||
conn.setReadOnly(false);
|
||||
conn.setAutoCommit(false);
|
||||
int c;
|
||||
if (sqls.length == 1) {
|
||||
final Statement stmt = conn.createStatement();
|
||||
@@ -518,19 +627,22 @@ public class DataJdbcSource extends AbstractDataSqlSource {
|
||||
}
|
||||
c = c1;
|
||||
}
|
||||
if (!batch) {
|
||||
conn.commit();
|
||||
}
|
||||
slowLog(s, sqls);
|
||||
return c;
|
||||
} catch (SQLException e) {
|
||||
if (!batch) {
|
||||
try {
|
||||
conn.rollback();
|
||||
} catch (SQLException se) {
|
||||
}
|
||||
}
|
||||
if (isTableNotExist(info, e.getSQLState())) {
|
||||
if (info.getTableStrategy() == null) {
|
||||
String[] tableSqls = createTableSqls(info);
|
||||
if (tableSqls != null) {
|
||||
try {
|
||||
Statement st = conn.createStatement();
|
||||
if (tableSqls.length == 1) {
|
||||
st.execute(tableSqls[0]);
|
||||
@@ -542,9 +654,6 @@ public class DataJdbcSource extends AbstractDataSqlSource {
|
||||
}
|
||||
st.close();
|
||||
return 0;
|
||||
} catch (SQLException e2) {
|
||||
throw new SourceException(e2);
|
||||
}
|
||||
}
|
||||
//单表结构不存在
|
||||
return 0;
|
||||
@@ -555,12 +664,12 @@ public class DataJdbcSource extends AbstractDataSqlSource {
|
||||
//多分表查询中一个或多个分表不存在
|
||||
// String tableName = parseNotExistTableName(e);
|
||||
// if (tableName == null) {
|
||||
// throw new SourceException(e);
|
||||
// throw e;
|
||||
// }
|
||||
String[] oldTables = tables;
|
||||
List<String> notExistTables = checkNotExistTablesNoThrows(conn, tables);
|
||||
if (notExistTables.isEmpty()) {
|
||||
throw new SourceException(e);
|
||||
throw e;
|
||||
}
|
||||
for (String t : notExistTables) {
|
||||
if (pkmap != null) {
|
||||
@@ -594,17 +703,13 @@ public class DataJdbcSource extends AbstractDataSqlSource {
|
||||
slowLog(s, sqls);
|
||||
return c;
|
||||
} catch (SQLException se) {
|
||||
throw new SourceException(se);
|
||||
throw se;
|
||||
}
|
||||
} else {
|
||||
throw new SourceException(e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
throw new SourceException(e);
|
||||
} finally {
|
||||
if (conn != null) {
|
||||
writePool.offerConnection(conn);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -951,16 +1056,34 @@ public class DataJdbcSource extends AbstractDataSqlSource {
|
||||
@Override
|
||||
protected <T> int updateEntityDB(EntityInfo<T> info, T... entitys) {
|
||||
Connection conn = null;
|
||||
final long s = System.currentTimeMillis();
|
||||
String presql = null;
|
||||
PreparedStatement prestmt = null;
|
||||
|
||||
List<PreparedStatement> prestmts = null;
|
||||
Map<String, PrepareInfo<T>> prepareInfos = null;
|
||||
try {
|
||||
conn = writePool.pollConnection();
|
||||
conn.setReadOnly(false);
|
||||
conn.setAutoCommit(false);
|
||||
int c = updateEntityDB(false, conn, info, entitys);
|
||||
conn.commit();
|
||||
return c;
|
||||
} catch (SQLException e) {
|
||||
if (conn != null) {
|
||||
try {
|
||||
conn.rollback();
|
||||
} catch (SQLException se) {
|
||||
}
|
||||
}
|
||||
throw new SourceException(e);
|
||||
} finally {
|
||||
if (conn != null) {
|
||||
writePool.offerConnection(conn);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private <T> int updateEntityDB(final boolean batch, final Connection conn, final EntityInfo<T> info, T... entitys) throws SQLException {
|
||||
final long s = System.currentTimeMillis();
|
||||
String presql = null;
|
||||
PreparedStatement prestmt = null;
|
||||
List<PreparedStatement> prestmts = null;
|
||||
Map<String, PrepareInfo<T>> prepareInfos = null;
|
||||
int c = -1;
|
||||
final Attribute<T, Serializable>[] attrs = info.updateAttributes;
|
||||
try {
|
||||
@@ -991,9 +1114,13 @@ public class DataJdbcSource extends AbstractDataSqlSource {
|
||||
stmt.close();
|
||||
}
|
||||
}
|
||||
if (!batch) {
|
||||
conn.commit();
|
||||
}
|
||||
} catch (SQLException se) {
|
||||
if (!batch) {
|
||||
conn.rollback();
|
||||
}
|
||||
if (isTableNotExist(info, se.getSQLState())) {
|
||||
if (info.getTableStrategy() == null) {
|
||||
String[] tableSqls = createTableSqls(info);
|
||||
@@ -1017,7 +1144,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
|
||||
} else {
|
||||
//String tableName = parseNotExistTableName(se);
|
||||
if (prepareInfos == null) {
|
||||
throw new SourceException(se);
|
||||
throw se;
|
||||
}
|
||||
for (PreparedStatement stmt : prestmts) {
|
||||
stmt.close();
|
||||
@@ -1026,7 +1153,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
|
||||
String[] oldTables = prepareInfos.keySet().toArray(new String[prepareInfos.size()]);
|
||||
List<String> notExistTables = checkNotExistTables(conn, oldTables);
|
||||
if (notExistTables.isEmpty()) {
|
||||
throw new SourceException(se);
|
||||
throw se;
|
||||
}
|
||||
for (String t : notExistTables) {
|
||||
prepareInfos.remove(t);
|
||||
@@ -1118,8 +1245,23 @@ public class DataJdbcSource extends AbstractDataSqlSource {
|
||||
slowLog(s, presqls.toArray(new String[presqls.size()]));
|
||||
}
|
||||
return c;
|
||||
} catch (SourceException se) {
|
||||
throw se;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T> CompletableFuture<Integer> updateColumnDBAsync(EntityInfo<T> info, Flipper flipper, UpdateSqlInfo sql) {
|
||||
return supplyAsync(() -> updateColumnDB(info, flipper, sql));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T> int updateColumnDB(EntityInfo<T> info, Flipper flipper, UpdateSqlInfo sql) {
|
||||
Connection conn = null;
|
||||
try {
|
||||
conn = writePool.pollConnection();
|
||||
conn.setReadOnly(false);
|
||||
conn.setAutoCommit(false);
|
||||
int c = updateColumnDB(false, conn, info, flipper, sql);
|
||||
conn.commit();
|
||||
return c;
|
||||
} catch (SQLException e) {
|
||||
if (conn != null) {
|
||||
try {
|
||||
@@ -1135,19 +1277,8 @@ public class DataJdbcSource extends AbstractDataSqlSource {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T> CompletableFuture<Integer> updateColumnDBAsync(EntityInfo<T> info, Flipper flipper, UpdateSqlInfo sql) {
|
||||
return supplyAsync(() -> updateColumnDB(info, flipper, sql));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T> int updateColumnDB(EntityInfo<T> info, Flipper flipper, UpdateSqlInfo sql) { //String sql, boolean prepared, Object... blobs) {
|
||||
Connection conn = null;
|
||||
private <T> int updateColumnDB(final boolean batch, final Connection conn, final EntityInfo<T> info, Flipper flipper, UpdateSqlInfo sql) throws SQLException { //String sql, boolean prepared, Object... blobs) {
|
||||
final long s = System.currentTimeMillis();
|
||||
try {
|
||||
conn = writePool.pollConnection();
|
||||
conn.setReadOnly(false);
|
||||
conn.setAutoCommit(false);
|
||||
int c = -1;
|
||||
String firstTable = null;
|
||||
try {
|
||||
@@ -1165,7 +1296,9 @@ public class DataJdbcSource extends AbstractDataSqlSource {
|
||||
}
|
||||
c = prestmt.executeUpdate();
|
||||
prestmt.close();
|
||||
if (!batch) {
|
||||
conn.commit();
|
||||
}
|
||||
slowLog(s, sql.sql);
|
||||
return c;
|
||||
} else {
|
||||
@@ -1198,7 +1331,9 @@ public class DataJdbcSource extends AbstractDataSqlSource {
|
||||
prestmt.close();
|
||||
}
|
||||
c = c1;
|
||||
if (!batch) {
|
||||
conn.commit();
|
||||
}
|
||||
slowLog(s, sqls);
|
||||
}
|
||||
return c;
|
||||
@@ -1209,12 +1344,16 @@ public class DataJdbcSource extends AbstractDataSqlSource {
|
||||
final Statement stmt = conn.createStatement();
|
||||
c = stmt.executeUpdate(sql.sql);
|
||||
stmt.close();
|
||||
if (!batch) {
|
||||
conn.commit();
|
||||
}
|
||||
slowLog(s, sql.sql);
|
||||
return c;
|
||||
}
|
||||
} catch (SQLException se) {
|
||||
if (!batch) {
|
||||
conn.rollback();
|
||||
}
|
||||
if (isTableNotExist(info, se.getSQLState())) {
|
||||
if (info.getTableStrategy() == null) {
|
||||
String[] tableSqls = createTableSqls(info);
|
||||
@@ -1241,12 +1380,12 @@ public class DataJdbcSource extends AbstractDataSqlSource {
|
||||
} else {
|
||||
// String tableName = parseNotExistTableName(se);
|
||||
// if (tableName == null) {
|
||||
// throw new SourceException(se);
|
||||
// throw se;
|
||||
// }
|
||||
String[] oldTables = sql.tables;
|
||||
List<String> notExistTables = checkNotExistTables(conn, oldTables);
|
||||
if (notExistTables.isEmpty()) {
|
||||
throw new SourceException(se);
|
||||
throw se;
|
||||
}
|
||||
for (String t : notExistTables) {
|
||||
sql.tables = Utility.remove(sql.tables, t);
|
||||
@@ -1285,7 +1424,9 @@ public class DataJdbcSource extends AbstractDataSqlSource {
|
||||
prestmt.close();
|
||||
}
|
||||
c = c1;
|
||||
if (!batch) {
|
||||
conn.commit();
|
||||
}
|
||||
slowLog(s, sqls);
|
||||
return c;
|
||||
}
|
||||
@@ -1293,21 +1434,6 @@ public class DataJdbcSource extends AbstractDataSqlSource {
|
||||
throw se;
|
||||
}
|
||||
}
|
||||
} catch (SourceException sex) {
|
||||
throw sex;
|
||||
} catch (SQLException e) {
|
||||
if (conn != null) {
|
||||
try {
|
||||
conn.rollback();
|
||||
} catch (SQLException se) {
|
||||
}
|
||||
}
|
||||
throw new SourceException(e);
|
||||
} finally {
|
||||
if (conn != null) {
|
||||
writePool.offerConnection(conn);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -2421,7 +2547,8 @@ public class DataJdbcSource extends AbstractDataSqlSource {
|
||||
|
||||
public ConnectionPool(Properties prop) {
|
||||
this.connectTimeoutSeconds = Integer.decode(prop.getProperty(DATA_SOURCE_CONNECTTIMEOUT_SECONDS, "6"));
|
||||
this.maxConns = Math.max(1, Integer.decode(prop.getProperty(DATA_SOURCE_MAXCONNS, "" + Utility.cpus() * 4)));
|
||||
int defMaxConns = Utility.cpus() * 4;
|
||||
this.maxConns = Math.max(1, Integer.decode(prop.getProperty(DATA_SOURCE_MAXCONNS, "" + defMaxConns)));
|
||||
this.queue = new ArrayBlockingQueue<>(maxConns);
|
||||
this.url = prop.getProperty(DATA_SOURCE_URL);
|
||||
String username = prop.getProperty(DATA_SOURCE_USER, "");
|
||||
|
||||
@@ -1265,6 +1265,17 @@ public final class EntityInfo<T> {
|
||||
return this.primary;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取主键字段的值
|
||||
*
|
||||
* @param entity 实体对象
|
||||
*
|
||||
* @return 主键值
|
||||
*/
|
||||
public Serializable getPrimaryValue(T entity) {
|
||||
return this.primary.get(entity);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取主键字段的Attribute单一元素数组
|
||||
*
|
||||
|
||||
Reference in New Issue
Block a user