diff --git a/src/main/java/org/redkale/source/AbstractDataSource.java b/src/main/java/org/redkale/source/AbstractDataSource.java index bffe3b07e..7b35b047a 100644 --- a/src/main/java/org/redkale/source/AbstractDataSource.java +++ b/src/main/java/org/redkale/source/AbstractDataSource.java @@ -432,124 +432,6 @@ public abstract class AbstractDataSource extends AbstractService implements Data return CompletableFuture.supplyAsync(supplier, getExecutor()); } - protected DefaultBatchInfo parseBatchInfo(DefaultDataBatch batch, Function func) { - final List actions = ((DefaultDataBatch) batch).actions; - final Map infos = new HashMap<>(); - final Map notInsertDisTables = new HashMap<>(); - final Map notUpDelDisTables = new HashMap<>(); - for (BatchAction action : actions) { - if (action instanceof InsertBatchAction1) { - InsertBatchAction1 act = (InsertBatchAction1) action; - Object entity = act.entity; - Class clazz = entity.getClass(); - if (!infos.containsKey(clazz)) { - EntityInfo info = func.apply(clazz); - infos.put(clazz, info); - if (info.getTableStrategy() != null) { - String tableKey = info.getTable(entity); - if (!info.containsDisTable(tableKey)) { - notInsertDisTables.put(tableKey, info); - } - } - } - } else if (action instanceof DeleteBatchAction1) { - DeleteBatchAction1 act = (DeleteBatchAction1) action; - Object entity = act.entity; - Class clazz = entity.getClass(); - if (!infos.containsKey(clazz)) { - EntityInfo info = func.apply(clazz); - infos.put(clazz, info); - if (info.getTableStrategy() != null) { - String tableKey = info.getTable(entity); - if (!info.containsDisTable(tableKey)) { - notUpDelDisTables.put(tableKey, info); - } - } - } - } else if (action instanceof DeleteBatchAction2) { - DeleteBatchAction2 act = (DeleteBatchAction2) action; - Class clazz = act.clazz; - if (!infos.containsKey(clazz)) { - EntityInfo info = func.apply(clazz); - infos.put(clazz, info); - if (info.getTableStrategy() != null) { - String tableKey = info.getTable(act.pk); - if (!info.containsDisTable(tableKey)) { - notUpDelDisTables.put(tableKey, info); - } - } - } - } else if (action instanceof DeleteBatchAction3) { - DeleteBatchAction3 act = (DeleteBatchAction3) action; - Class clazz = act.clazz; - if (!infos.containsKey(clazz)) { - EntityInfo info = func.apply(clazz); - infos.put(clazz, info); - if (info.getTableStrategy() != null) { - String tableKey = info.getTable(act.node); - if (!info.containsDisTable(tableKey)) { - notUpDelDisTables.put(tableKey, info); - } - } - } - } else if (action instanceof UpdateBatchAction1) { - UpdateBatchAction1 act = (UpdateBatchAction1) action; - Object entity = act.entity; - Class clazz = entity.getClass(); - if (!infos.containsKey(clazz)) { - EntityInfo info = func.apply(clazz); - infos.put(clazz, info); - if (info.getTableStrategy() != null) { - String tableKey = info.getTable(entity); - if (!info.containsDisTable(tableKey)) { - notUpDelDisTables.put(tableKey, info); - } - } - } - } else if (action instanceof UpdateBatchAction2) { - UpdateBatchAction2 act = (UpdateBatchAction2) action; - Class clazz = act.clazz; - if (!infos.containsKey(clazz)) { - EntityInfo info = func.apply(clazz); - infos.put(clazz, info); - if (info.getTableStrategy() != null) { - String tableKey = info.getTable(act.pk); - if (!info.containsDisTable(tableKey)) { - notUpDelDisTables.put(tableKey, info); - } - } - } - } else if (action instanceof UpdateBatchAction3) { - UpdateBatchAction3 act = (UpdateBatchAction3) action; - Class clazz = act.clazz; - if (!infos.containsKey(clazz)) { - EntityInfo info = func.apply(clazz); - infos.put(clazz, info); - if (info.getTableStrategy() != null) { - String tableKey = info.getTable(act.node); - if (!info.containsDisTable(tableKey)) { - notUpDelDisTables.put(tableKey, info); - } - } - } - } else if (action instanceof UpdateBatchAction4) { - UpdateBatchAction4 act = (UpdateBatchAction4) action; - Class clazz = act.entity.getClass(); - if (!infos.containsKey(clazz)) { - EntityInfo info = func.apply(clazz); - infos.put(clazz, info); - if (info.getTableStrategy() != null) { - String tableKey = act.node == null ? info.getTable(act.entity) : info.getTable(act.node); - if (!info.containsDisTable(tableKey)) { - notUpDelDisTables.put(tableKey, info); - } - } - } - } - } - return new DefaultBatchInfo(infos, notInsertDisTables, notUpDelDisTables); - } - @Override public int batch(final DataBatch batch) { return batchAsync(batch).join(); @@ -557,7 +439,7 @@ public abstract class AbstractDataSource extends AbstractService implements Data @Override public CompletableFuture batchAsync(final DataBatch batch) { - throw new UnsupportedOperationException("Not supported yet."); + return CompletableFuture.failedFuture(new UnsupportedOperationException("Not supported yet.")); } @Override @@ -1398,7 +1280,7 @@ public abstract class AbstractDataSource extends AbstractService implements Data public DataBatch insert(T... entitys) { for (T t : entitys) { Objects.requireNonNull(t); - if (t.getClass().getAnnotation(Entity.class) == null && t.getClass().getAnnotation(javax.persistence.Entity.class) == null) { + if (t.getClass().getAnnotation(Entity.class) == null) { throw new SourceException("Entity Class " + t.getClass() + " must be on Annotation @Entity"); } this.actions.add(new InsertBatchAction1(t)); @@ -1410,7 +1292,7 @@ public abstract class AbstractDataSource extends AbstractService implements Data public DataBatch delete(T... entitys) { for (T t : entitys) { Objects.requireNonNull(t); - if (t.getClass().getAnnotation(Entity.class) == null && t.getClass().getAnnotation(javax.persistence.Entity.class) == null) { + if (t.getClass().getAnnotation(Entity.class) == null) { throw new SourceException("Entity Class " + t.getClass() + " must be on Annotation @Entity"); } this.actions.add(new DeleteBatchAction1(t)); @@ -1421,7 +1303,7 @@ public abstract class AbstractDataSource extends AbstractService implements Data @Override public DataBatch delete(Class clazz, Serializable... pks) { Objects.requireNonNull(clazz); - if (clazz.getAnnotation(Entity.class) == null && clazz.getAnnotation(javax.persistence.Entity.class) == null) { + if (clazz.getAnnotation(Entity.class) == null) { throw new SourceException("Entity Class " + clazz + " must be on Annotation @Entity"); } if (pks.length < 1) { @@ -1442,7 +1324,7 @@ public abstract class AbstractDataSource extends AbstractService implements Data @Override public DataBatch delete(Class clazz, FilterNode node, Flipper flipper) { Objects.requireNonNull(clazz); - if (clazz.getAnnotation(Entity.class) == null && clazz.getAnnotation(javax.persistence.Entity.class) == null) { + if (clazz.getAnnotation(Entity.class) == null) { throw new SourceException("Entity Class " + clazz + " must be on Annotation @Entity"); } this.actions.add(new DeleteBatchAction3(clazz, node, flipper)); @@ -1453,7 +1335,7 @@ public abstract class AbstractDataSource extends AbstractService implements Data public DataBatch update(T... entitys) { for (T t : entitys) { Objects.requireNonNull(t); - if (t.getClass().getAnnotation(Entity.class) == null && t.getClass().getAnnotation(javax.persistence.Entity.class) == null) { + if (t.getClass().getAnnotation(Entity.class) == null) { throw new SourceException("Entity Class " + t.getClass() + " must be on Annotation @Entity"); } this.actions.add(new UpdateBatchAction1(t)); @@ -1469,7 +1351,7 @@ public abstract class AbstractDataSource extends AbstractService implements Data @Override public DataBatch update(Class clazz, Serializable pk, ColumnValue... values) { Objects.requireNonNull(clazz); - if (clazz.getAnnotation(Entity.class) == null && clazz.getAnnotation(javax.persistence.Entity.class) == null) { + if (clazz.getAnnotation(Entity.class) == null) { throw new SourceException("Entity Class " + clazz + " must be on Annotation @Entity"); } Objects.requireNonNull(pk); @@ -1496,7 +1378,7 @@ public abstract class AbstractDataSource extends AbstractService implements Data @Override public DataBatch update(Class clazz, FilterNode node, Flipper flipper, ColumnValue... values) { Objects.requireNonNull(clazz); - if (clazz.getAnnotation(Entity.class) == null && clazz.getAnnotation(javax.persistence.Entity.class) == null) { + if (clazz.getAnnotation(Entity.class) == null) { throw new SourceException("Entity Class " + clazz + " must be on Annotation @Entity"); } if (values.length < 1) { @@ -1533,7 +1415,7 @@ public abstract class AbstractDataSource extends AbstractService implements Data @Override public DataBatch updateColumn(T entity, final FilterNode node, SelectColumn selects) { Objects.requireNonNull(entity); - if (entity.getClass().getAnnotation(Entity.class) == null && entity.getClass().getAnnotation(javax.persistence.Entity.class) == null) { + if (entity.getClass().getAnnotation(Entity.class) == null) { throw new SourceException("Entity Class " + entity.getClass() + " must be on Annotation @Entity"); } Objects.requireNonNull(selects); @@ -1543,25 +1425,6 @@ public abstract class AbstractDataSource extends AbstractService implements Data } - protected static class DefaultBatchInfo { - - //EntityInfo对象 - public Map entityInfos; - - //新增操作可能不存在的分表 - public Map notInsertDisTables; - - //删除修改操作可能不存在的分表 - public Map notUpDelDisTables; - - public DefaultBatchInfo(Map entityInfos, Map notInsertDisTables, Map notUpDelDisTables) { - this.entityInfos = entityInfos; - this.notInsertDisTables = notInsertDisTables; - this.notUpDelDisTables = notUpDelDisTables; - } - - } - protected abstract static class BatchAction { } diff --git a/src/main/java/org/redkale/source/AbstractDataSqlSource.java b/src/main/java/org/redkale/source/AbstractDataSqlSource.java index 2db45c6df..f6b040b60 100644 --- a/src/main/java/org/redkale/source/AbstractDataSqlSource.java +++ b/src/main/java/org/redkale/source/AbstractDataSqlSource.java @@ -661,12 +661,15 @@ public abstract class AbstractDataSqlSource extends AbstractDataSource implement } @Local + @Override public abstract int directExecute(String sql); @Local + @Override public abstract int[] directExecute(String... sqls); @Local + @Override public abstract V directQuery(String sql, Function handler); //是否异步 diff --git a/src/main/java/org/redkale/source/DataJdbcSource.java b/src/main/java/org/redkale/source/DataJdbcSource.java index 8bb439619..860eb801a 100644 --- a/src/main/java/org/redkale/source/DataJdbcSource.java +++ b/src/main/java/org/redkale/source/DataJdbcSource.java @@ -121,11 +121,6 @@ public class DataJdbcSource extends AbstractDataSqlSource { return false; } - @Override - public CompletableFuture batchAsync(final DataBatch batch) { - return CompletableFuture.supplyAsync(() -> batch(batch), getExecutor()); - } - protected List createInsertPreparedStatements(final Connection conn, EntityInfo info, Map> prepareInfos, T... entitys) throws SQLException { Attribute[] attrs = info.insertAttributes; final List prestmts = new ArrayList<>(); @@ -204,10 +199,98 @@ public class DataJdbcSource extends AbstractDataSqlSource { @Override public int batch(final DataBatch batch) { Objects.requireNonNull(batch); - final List actions = ((DefaultDataBatch) batch).actions; - final DefaultBatchInfo batchInfo = parseBatchInfo((DefaultDataBatch) batch, this); + final DefaultDataBatch dataBatch = (DefaultDataBatch) batch; + if (dataBatch.actions.isEmpty()) { + return 0; + } + int c = 0; + Connection conn = null; + try { + conn = writePool.pollConnection(); + conn.setReadOnly(false); + conn.setAutoCommit(false); + for (BatchAction action : dataBatch.actions) { + if (action instanceof InsertBatchAction1) { + InsertBatchAction1 act = (InsertBatchAction1) action; + EntityInfo info = apply(act.entity.getClass()); + c += insertDB(true, conn, info, act.entity); - throw new UnsupportedOperationException("Not supported yet."); + } else if (action instanceof DeleteBatchAction1) { + DeleteBatchAction1 act = (DeleteBatchAction1) action; + EntityInfo info = apply(act.entity.getClass()); + Serializable pk = info.getPrimaryValue(act.entity); + Map> pkmap = info.getTableMap(pk); + String[] tables = pkmap.keySet().toArray(new String[pkmap.size()]); + String[] sqls = deleteSql(info, pkmap); + c += deleteDB(true, conn, info, tables, null, null, pkmap, sqls); + + } else if (action instanceof DeleteBatchAction2) { + DeleteBatchAction2 act = (DeleteBatchAction2) action; + EntityInfo info = apply(act.clazz); + Map> pkmap = info.getTableMap(act.pk); + String[] tables = pkmap.keySet().toArray(new String[pkmap.size()]); + String[] sqls = deleteSql(info, pkmap); + c += deleteDB(true, conn, info, tables, null, null, pkmap, sqls); + + } else if (action instanceof DeleteBatchAction3) { + DeleteBatchAction3 act = (DeleteBatchAction3) action; + EntityInfo info = apply(act.clazz); + String[] tables = info.getTables(act.node); + String[] sqls = deleteSql(info, tables, act.flipper, act.node); + c += deleteDB(true, conn, info, tables, act.flipper, act.node, null, sqls); + + } else if (action instanceof UpdateBatchAction1) { + UpdateBatchAction1 act = (UpdateBatchAction1) action; + EntityInfo info = apply(act.entity.getClass()); + c += updateEntityDB(true, conn, info, act.entity); + + } else if (action instanceof UpdateBatchAction2) { + UpdateBatchAction2 act = (UpdateBatchAction2) action; + EntityInfo info = apply(act.clazz); + UpdateSqlInfo sql = updateColumnSql(info, act.pk, act.values); + c += updateColumnDB(true, conn, info, null, sql); + + } else if (action instanceof UpdateBatchAction3) { + UpdateBatchAction3 act = (UpdateBatchAction3) action; + EntityInfo info = apply(act.clazz); + UpdateSqlInfo sql = updateColumnSql(info, act.node, act.flipper, act.values); + c += updateColumnDB(true, conn, info, act.flipper, sql); + + } else if (action instanceof UpdateBatchAction4) { + UpdateBatchAction4 act = (UpdateBatchAction4) action; + EntityInfo info = apply(act.entity.getClass()); + UpdateSqlInfo sql = updateColumnSql(info, false, act.entity, act.node, act.selects); + c += updateColumnDB(true, conn, info, null, sql); + } + } + conn.commit(); + return c; + } catch (SourceException se) { + if (conn != null) { + try { + conn.rollback(); + } catch (SQLException sqe) { + } + } + throw se; + } catch (SQLException e) { + if (conn != null) { + try { + conn.rollback(); + } catch (SQLException se) { + } + } + throw new SourceException(e); + } finally { + if (conn != null) { + writePool.offerConnection(conn); + } + } + } + + @Override + public CompletableFuture batchAsync(final DataBatch batch) { + return CompletableFuture.supplyAsync(() -> batch(batch), getExecutor()); } @Override @@ -218,202 +301,245 @@ public class DataJdbcSource extends AbstractDataSqlSource { @Override protected int insertDB(EntityInfo info, T... entitys) { Connection conn = null; - final long s = System.currentTimeMillis(); try { - int c = 0; conn = writePool.pollConnection(); - Attribute[] attrs = info.insertAttributes; conn.setReadOnly(false); conn.setAutoCommit(false); - - String presql = null; - PreparedStatement prestmt = null; - - List prestmts = null; - Map> prepareInfos = null; - - if (info.getTableStrategy() == null) { - presql = info.getInsertQuestionPrepareSQL(entitys[0]); - prestmt = createInsertPreparedStatement(conn, presql, info, entitys); - } else { - prepareInfos = getInsertQuestionPrepareInfo(info, entitys); - prestmts = createInsertPreparedStatements(conn, info, prepareInfos, entitys); + int c = insertDB(false, conn, info, entitys); + conn.commit(); + return c; + } catch (SQLException e) { + if (conn != null) { + try { + conn.rollback(); + } catch (SQLException se) { + } } - try { - if (info.getTableStrategy() == null) { - int c1 = 0; - int[] cs = prestmt.executeBatch(); + throw new SourceException(e); + } finally { + if (conn != null) { + writePool.offerConnection(conn); + } + } + } + + private int insertDB(final boolean batch, final Connection conn, final EntityInfo info, T... entitys) throws SQLException { + final long s = System.currentTimeMillis(); + int c = 0; + String presql = null; + PreparedStatement prestmt = null; + List prestmts = null; + Map> prepareInfos = null; + Attribute[] attrs = info.insertAttributes; + if (info.getTableStrategy() == null) { //单库单表 + presql = info.getInsertQuestionPrepareSQL(entitys[0]); + prestmt = createInsertPreparedStatement(conn, presql, info, entitys); + } else { //分库分表 + prepareInfos = getInsertQuestionPrepareInfo(info, entitys); + prestmts = createInsertPreparedStatements(conn, info, prepareInfos, entitys); + } + try { + if (info.getTableStrategy() == null) { //单库单表 + int c1 = 0; + int[] cs = prestmt.executeBatch(); + for (int cc : cs) { + c1 += cc; + } + c = c1; + prestmt.close(); + } else { //分库分表 + int c1 = 0; + for (PreparedStatement stmt : prestmts) { + int[] cs = stmt.executeBatch(); for (int cc : cs) { c1 += cc; } - c = c1; - prestmt.close(); - } else { - int c1 = 0; - for (PreparedStatement stmt : prestmts) { - int[] cs = stmt.executeBatch(); - for (int cc : cs) { - c1 += cc; - } - } - c = c1; - for (PreparedStatement stmt : prestmts) { - stmt.close(); - } } + c = c1; + for (PreparedStatement stmt : prestmts) { + stmt.close(); + } + } + if (!batch) { conn.commit(); - } catch (SQLException se) { + } + } catch (SQLException se) { + if (!batch) { conn.rollback(); - if (!isTableNotExist(info, se.getSQLState())) { + } + if (!isTableNotExist(info, se.getSQLState())) { + throw se; + } + if (info.getTableStrategy() == null) { //单库单表 + String[] tableSqls = createTableSqls(info); + if (tableSqls == null) { throw se; } - if (info.getTableStrategy() == null) { //单表 - String[] tableSqls = createTableSqls(info); - if (tableSqls == null) { - throw se; + //创建单表结构 + Statement st = conn.createStatement(); + if (tableSqls.length == 1) { + st.execute(tableSqls[0]); + } else { + for (String tableSql : tableSqls) { + st.addBatch(tableSql); } - //创建单表结构 - Statement st = conn.createStatement(); - if (tableSqls.length == 1) { - st.execute(tableSqls[0]); - } else { - for (String tableSql : tableSqls) { - st.addBatch(tableSql); + st.executeBatch(); + } + st.close(); + } else { //分库分表 + info.disTableLock().lock(); + try { + final Set newCatalogs = new LinkedHashSet<>(); + final List tableCopys = new ArrayList<>(); + prepareInfos.forEach((t, p) -> { + int pos = t.indexOf('.'); + if (pos > 0) { + newCatalogs.add(t.substring(0, pos)); + } + tableCopys.add(getTableCopySQL(info, t)); + }); + try { + //执行一遍创建分表操作 + Statement st = conn.createStatement(); + for (String copySql : tableCopys) { + st.addBatch(copySql); } st.executeBatch(); - } - st.close(); - } else { //分库分表 - info.disTableLock().lock(); - try { - final Set newCatalogs = new LinkedHashSet<>(); - final List tableCopys = new ArrayList<>(); - prepareInfos.forEach((t, p) -> { - int pos = t.indexOf('.'); - if (pos > 0) { - newCatalogs.add(t.substring(0, pos)); - } - tableCopys.add(getTableCopySQL(info, t)); - }); - try { - //执行一遍创建分表操作 - Statement st = conn.createStatement(); - for (String copySql : tableCopys) { - st.addBatch(copySql); - } - st.executeBatch(); - st.close(); - } catch (SQLException sqle) { //多进程并发时可能会出现重复建表 - if (isTableNotExist(info, sqle.getSQLState())) { - if (newCatalogs.isEmpty()) { //分表的原始表不存在 - String[] tableSqls = createTableSqls(info); - if (tableSqls != null) { - //创建原始表 - Statement st = conn.createStatement(); - if (tableSqls.length == 1) { - st.execute(tableSqls[0]); - } else { - for (String tableSql : tableSqls) { - st.addBatch(tableSql); - } - st.executeBatch(); - } - st.close(); - //再执行一遍创建分表操作 - st = conn.createStatement(); - for (String copySql : tableCopys) { - st.addBatch(copySql); + st.close(); + } catch (SQLException sqle) { //多进程并发时可能会出现重复建表 + if (isTableNotExist(info, sqle.getSQLState())) { + if (newCatalogs.isEmpty()) { //分表的原始表不存在 + String[] tableSqls = createTableSqls(info); + if (tableSqls != null) { + //创建原始表 + Statement st = conn.createStatement(); + if (tableSqls.length == 1) { + st.execute(tableSqls[0]); + } else { + for (String tableSql : tableSqls) { + st.addBatch(tableSql); } st.executeBatch(); - st.close(); } - } else { //需要先建库 - Statement st; - try { - st = conn.createStatement(); - for (String newCatalog : newCatalogs) { - st.addBatch(("postgresql".equals(dbtype()) ? "CREATE SCHEMA IF NOT EXISTS " : "CREATE DATABASE IF NOT EXISTS ") + newCatalog); - } - st.executeBatch(); - st.close(); - } catch (SQLException sqle1) { - logger.log(Level.SEVERE, "create database " + tableCopys + " error", sqle1); + st.close(); + //再执行一遍创建分表操作 + st = conn.createStatement(); + for (String copySql : tableCopys) { + st.addBatch(copySql); } - try { - //再执行一遍创建分表操作 - st = conn.createStatement(); - for (String copySql : tableCopys) { - st.addBatch(copySql); - } - st.executeBatch(); - st.close(); - } catch (SQLException sqle2) { - if (isTableNotExist(info, sqle2.getSQLState())) { - String[] tableSqls = createTableSqls(info); - if (tableSqls != null) { //创建原始表 - st = conn.createStatement(); - if (tableSqls.length == 1) { - st.execute(tableSqls[0]); - } else { - for (String tableSql : tableSqls) { - st.addBatch(tableSql); - } - st.executeBatch(); - } - st.close(); - //再执行一遍创建分表操作 - st = conn.createStatement(); - for (String copySql : tableCopys) { - st.addBatch(copySql); + st.executeBatch(); + st.close(); + } + } else { //需要先建库 + Statement st; + try { + st = conn.createStatement(); + for (String newCatalog : newCatalogs) { + st.addBatch(("postgresql".equals(dbtype()) ? "CREATE SCHEMA IF NOT EXISTS " : "CREATE DATABASE IF NOT EXISTS ") + newCatalog); + } + st.executeBatch(); + st.close(); + } catch (SQLException sqle1) { + logger.log(Level.SEVERE, "create database " + tableCopys + " error", sqle1); + } + try { + //再执行一遍创建分表操作 + st = conn.createStatement(); + for (String copySql : tableCopys) { + st.addBatch(copySql); + } + st.executeBatch(); + st.close(); + } catch (SQLException sqle2) { + if (isTableNotExist(info, sqle2.getSQLState())) { + String[] tableSqls = createTableSqls(info); + if (tableSqls != null) { //创建原始表 + st = conn.createStatement(); + if (tableSqls.length == 1) { + st.execute(tableSqls[0]); + } else { + for (String tableSql : tableSqls) { + st.addBatch(tableSql); } st.executeBatch(); - st.close(); } - } else { - logger.log(Level.SEVERE, "create table2 " + tableCopys + " error", sqle2); + st.close(); + //再执行一遍创建分表操作 + st = conn.createStatement(); + for (String copySql : tableCopys) { + st.addBatch(copySql); + } + st.executeBatch(); + st.close(); } + } else { + logger.log(Level.SEVERE, "create table2 " + tableCopys + " error", sqle2); } } } } - } finally { - info.disTableLock().unlock(); } + } finally { + info.disTableLock().unlock(); } - if (info.getTableStrategy() == null) { - prestmt.close(); - prestmt = createInsertPreparedStatement(conn, presql, info, entitys); - int c1 = 0; - int[] cs = prestmt.executeBatch(); + } + if (info.getTableStrategy() == null) { //单库单表 + prestmt.close(); + prestmt = createInsertPreparedStatement(conn, presql, info, entitys); + int c1 = 0; + int[] cs = prestmt.executeBatch(); + for (int cc : cs) { + c1 += cc; + } + c = c1; + prestmt.close(); + } else { //分库分表 + for (PreparedStatement stmt : prestmts) { + stmt.close(); + } + prestmts = createInsertPreparedStatements(conn, info, prepareInfos, entitys); + int c1 = 0; + for (PreparedStatement stmt : prestmts) { + int[] cs = stmt.executeBatch(); for (int cc : cs) { c1 += cc; } - c = c1; - prestmt.close(); - } else { - for (PreparedStatement stmt : prestmts) { - stmt.close(); - } - prestmts = createInsertPreparedStatements(conn, info, prepareInfos, entitys); - int c1 = 0; - for (PreparedStatement stmt : prestmts) { - int[] cs = stmt.executeBatch(); - for (int cc : cs) { - c1 += cc; + } + c = c1; + for (PreparedStatement stmt : prestmts) { + stmt.close(); + } + } + } + //------------------------------------------------------------ + if (info.isLoggable(logger, Level.FINEST)) { //打印调试信息 + if (info.getTableStrategy() == null) { + char[] sqlchars = presql.toCharArray(); + for (final T value : entitys) { + //----------------------------- + StringBuilder sb = new StringBuilder(128); + int i = 0; + for (char ch : sqlchars) { + if (ch == '?') { + Object obj = info.getSQLValue(attrs[i++], value); + if (obj != null && obj.getClass().isArray()) { + sb.append("'[length=").append(java.lang.reflect.Array.getLength(obj)).append("]'"); + } else { + sb.append(info.formatSQLValue(obj, sqlFormatter)); + } + } else { + sb.append(ch); } } - c = c1; - for (PreparedStatement stmt : prestmts) { - stmt.close(); + String debugsql = sb.toString(); + if (info.isLoggable(logger, Level.FINEST, debugsql)) { + logger.finest(info.getType().getSimpleName() + " insert sql=" + debugsql.replaceAll("(\r|\n)", "\\n")); } } - conn.commit(); - } - //------------------------------------------------------------ - if (info.isLoggable(logger, Level.FINEST)) { //打印调试信息 - if (info.getTableStrategy() == null) { - char[] sqlchars = presql.toCharArray(); - for (final T value : entitys) { + } else { + prepareInfos.forEach((t, p) -> { + char[] sqlchars = p.prepareSql.toCharArray(); + for (final T value : p.entitys) { //----------------------------- StringBuilder sb = new StringBuilder(128); int i = 0; @@ -434,42 +560,35 @@ public class DataJdbcSource extends AbstractDataSqlSource { logger.finest(info.getType().getSimpleName() + " insert sql=" + debugsql.replaceAll("(\r|\n)", "\\n")); } } - } else { - prepareInfos.forEach((t, p) -> { - char[] sqlchars = p.prepareSql.toCharArray(); - for (final T value : p.entitys) { - //----------------------------- - StringBuilder sb = new StringBuilder(128); - int i = 0; - for (char ch : sqlchars) { - if (ch == '?') { - Object obj = info.getSQLValue(attrs[i++], value); - if (obj != null && obj.getClass().isArray()) { - sb.append("'[length=").append(java.lang.reflect.Array.getLength(obj)).append("]'"); - } else { - sb.append(info.formatSQLValue(obj, sqlFormatter)); - } - } else { - sb.append(ch); - } - } - String debugsql = sb.toString(); - if (info.isLoggable(logger, Level.FINEST, debugsql)) { - logger.finest(info.getType().getSimpleName() + " insert sql=" + debugsql.replaceAll("(\r|\n)", "\\n")); - } - } - }); - } - } //打印结束 - if (info.getTableStrategy() == null) { - slowLog(s, presql); - } else { - List presqls = new ArrayList<>(); - prepareInfos.forEach((t, p) -> { - presqls.add(p.prepareSql); }); - slowLog(s, presqls.toArray(new String[presqls.size()])); } + } //打印结束 + if (info.getTableStrategy() == null) { + slowLog(s, presql); + } else { + List presqls = new ArrayList<>(); + prepareInfos.forEach((t, p) -> { + presqls.add(p.prepareSql); + }); + slowLog(s, presqls.toArray(new String[presqls.size()])); + } + return c; + } + + @Override + protected CompletableFuture deleteDBAsync(final EntityInfo info, String[] tables, Flipper flipper, FilterNode node, Map> pkmap, final String... sqls) { + return supplyAsync(() -> deleteDB(info, tables, flipper, node, pkmap, sqls)); + } + + @Override + protected int deleteDB(EntityInfo info, String[] tables, Flipper flipper, FilterNode node, Map> pkmap, String... sqls) { + Connection conn = null; + try { + conn = writePool.pollConnection(); + conn.setReadOnly(false); + conn.setAutoCommit(false); + int c = deleteDB(false, conn, info, tables, flipper, node, pkmap, sqls); + conn.commit(); return c; } catch (SQLException e) { if (conn != null) { @@ -486,19 +605,9 @@ public class DataJdbcSource extends AbstractDataSqlSource { } } - @Override - protected CompletableFuture deleteDBAsync(final EntityInfo info, String[] tables, Flipper flipper, FilterNode node, Map> pkmap, final String... sqls) { - return supplyAsync(() -> deleteDB(info, tables, flipper, node, pkmap, sqls)); - } - - @Override - protected int deleteDB(EntityInfo info, String[] tables, Flipper flipper, FilterNode node, Map> pkmap, String... sqls) { - Connection conn = null; + private int deleteDB(final boolean batch, final Connection conn, final EntityInfo info, String[] tables, Flipper flipper, FilterNode node, Map> pkmap, String... sqls) throws SQLException { final long s = System.currentTimeMillis(); try { - conn = writePool.pollConnection(); - conn.setReadOnly(false); - conn.setAutoCommit(false); int c; if (sqls.length == 1) { final Statement stmt = conn.createStatement(); @@ -518,33 +627,33 @@ public class DataJdbcSource extends AbstractDataSqlSource { } c = c1; } - conn.commit(); + if (!batch) { + conn.commit(); + } slowLog(s, sqls); return c; } catch (SQLException e) { - try { - conn.rollback(); - } catch (SQLException se) { + if (!batch) { + try { + conn.rollback(); + } catch (SQLException se) { + } } if (isTableNotExist(info, e.getSQLState())) { if (info.getTableStrategy() == null) { String[] tableSqls = createTableSqls(info); if (tableSqls != null) { - try { - Statement st = conn.createStatement(); - if (tableSqls.length == 1) { - st.execute(tableSqls[0]); - } else { - for (String tableSql : tableSqls) { - st.addBatch(tableSql); - } - st.executeBatch(); + Statement st = conn.createStatement(); + if (tableSqls.length == 1) { + st.execute(tableSqls[0]); + } else { + for (String tableSql : tableSqls) { + st.addBatch(tableSql); } - st.close(); - return 0; - } catch (SQLException e2) { - throw new SourceException(e2); + st.executeBatch(); } + st.close(); + return 0; } //单表结构不存在 return 0; @@ -555,12 +664,12 @@ public class DataJdbcSource extends AbstractDataSqlSource { //多分表查询中一个或多个分表不存在 // String tableName = parseNotExistTableName(e); // if (tableName == null) { -// throw new SourceException(e); +// throw e; // } String[] oldTables = tables; List notExistTables = checkNotExistTablesNoThrows(conn, tables); if (notExistTables.isEmpty()) { - throw new SourceException(e); + throw e; } for (String t : notExistTables) { if (pkmap != null) { @@ -594,17 +703,13 @@ public class DataJdbcSource extends AbstractDataSqlSource { slowLog(s, sqls); return c; } catch (SQLException se) { - throw new SourceException(se); + throw se; } } else { - throw new SourceException(e); + throw e; } } - throw new SourceException(e); - } finally { - if (conn != null) { - writePool.offerConnection(conn); - } + throw e; } } @@ -951,33 +1056,114 @@ public class DataJdbcSource extends AbstractDataSqlSource { @Override protected int updateEntityDB(EntityInfo info, T... entitys) { Connection conn = null; - final long s = System.currentTimeMillis(); - String presql = null; - PreparedStatement prestmt = null; - - List prestmts = null; - Map> prepareInfos = null; try { conn = writePool.pollConnection(); conn.setReadOnly(false); conn.setAutoCommit(false); - int c = -1; - final Attribute[] attrs = info.updateAttributes; - try { + int c = updateEntityDB(false, conn, info, entitys); + conn.commit(); + return c; + } catch (SQLException e) { + if (conn != null) { + try { + conn.rollback(); + } catch (SQLException se) { + } + } + throw new SourceException(e); + } finally { + if (conn != null) { + writePool.offerConnection(conn); + } + } + } + + private int updateEntityDB(final boolean batch, final Connection conn, final EntityInfo info, T... entitys) throws SQLException { + final long s = System.currentTimeMillis(); + String presql = null; + PreparedStatement prestmt = null; + List prestmts = null; + Map> prepareInfos = null; + int c = -1; + final Attribute[] attrs = info.updateAttributes; + try { + if (info.getTableStrategy() == null) { + presql = info.getUpdateQuestionPrepareSQL(entitys[0]); + prestmt = createUpdatePreparedStatement(conn, presql, info, entitys); + int c1 = 0; + int[] pc = prestmt.executeBatch(); + for (int p : pc) { + if (p >= 0) { + c1 += p; + } + } + c = c1; + prestmt.close(); + } else { + prepareInfos = getUpdateQuestionPrepareInfo(info, entitys); + prestmts = createUpdatePreparedStatements(conn, info, prepareInfos, entitys); + int c1 = 0; + for (PreparedStatement stmt : prestmts) { + int[] cs = stmt.executeBatch(); + for (int cc : cs) { + c1 += cc; + } + } + c = c1; + for (PreparedStatement stmt : prestmts) { + stmt.close(); + } + } + if (!batch) { + conn.commit(); + } + } catch (SQLException se) { + if (!batch) { + conn.rollback(); + } + if (isTableNotExist(info, se.getSQLState())) { if (info.getTableStrategy() == null) { - presql = info.getUpdateQuestionPrepareSQL(entitys[0]); - prestmt = createUpdatePreparedStatement(conn, presql, info, entitys); - int c1 = 0; - int[] pc = prestmt.executeBatch(); - for (int p : pc) { - if (p >= 0) { - c1 += p; + String[] tableSqls = createTableSqls(info); + if (tableSqls != null) { + try { + Statement st = conn.createStatement(); + if (tableSqls.length == 1) { + st.execute(tableSqls[0]); + } else { + for (String tableSql : tableSqls) { + st.addBatch(tableSql); + } + st.executeBatch(); + } + st.close(); + } catch (SQLException e2) { } } - c = c1; - prestmt.close(); + //表不存在,更新条数为0 + return 0; } else { - prepareInfos = getUpdateQuestionPrepareInfo(info, entitys); + //String tableName = parseNotExistTableName(se); + if (prepareInfos == null) { + throw se; + } + for (PreparedStatement stmt : prestmts) { + stmt.close(); + } + + String[] oldTables = prepareInfos.keySet().toArray(new String[prepareInfos.size()]); + List notExistTables = checkNotExistTables(conn, oldTables); + if (notExistTables.isEmpty()) { + throw se; + } + for (String t : notExistTables) { + prepareInfos.remove(t); + } + if (logger.isLoggable(Level.FINE)) { + logger.log(Level.FINE, "update entitys, old-tables: " + Arrays.toString(oldTables) + ", new-tables: " + prepareInfos.keySet()); + } + if (prepareInfos.isEmpty()) { //分表全部不存在 + return 0; + } prestmts = createUpdatePreparedStatements(conn, info, prepareInfos, entitys); int c1 = 0; for (PreparedStatement stmt : prestmts) { @@ -990,77 +1176,42 @@ public class DataJdbcSource extends AbstractDataSqlSource { for (PreparedStatement stmt : prestmts) { stmt.close(); } + conn.commit(); } - conn.commit(); - } catch (SQLException se) { - conn.rollback(); - if (isTableNotExist(info, se.getSQLState())) { - if (info.getTableStrategy() == null) { - String[] tableSqls = createTableSqls(info); - if (tableSqls != null) { - try { - Statement st = conn.createStatement(); - if (tableSqls.length == 1) { - st.execute(tableSqls[0]); - } else { - for (String tableSql : tableSqls) { - st.addBatch(tableSql); - } - st.executeBatch(); - } - st.close(); - } catch (SQLException e2) { - } - } - //表不存在,更新条数为0 - return 0; - } else { - //String tableName = parseNotExistTableName(se); - if (prepareInfos == null) { - throw new SourceException(se); - } - for (PreparedStatement stmt : prestmts) { - stmt.close(); - } - - String[] oldTables = prepareInfos.keySet().toArray(new String[prepareInfos.size()]); - List notExistTables = checkNotExistTables(conn, oldTables); - if (notExistTables.isEmpty()) { - throw new SourceException(se); - } - for (String t : notExistTables) { - prepareInfos.remove(t); - } - if (logger.isLoggable(Level.FINE)) { - logger.log(Level.FINE, "update entitys, old-tables: " + Arrays.toString(oldTables) + ", new-tables: " + prepareInfos.keySet()); - } - if (prepareInfos.isEmpty()) { //分表全部不存在 - return 0; - } - prestmts = createUpdatePreparedStatements(conn, info, prepareInfos, entitys); - int c1 = 0; - for (PreparedStatement stmt : prestmts) { - int[] cs = stmt.executeBatch(); - for (int cc : cs) { - c1 += cc; - } - } - c = c1; - for (PreparedStatement stmt : prestmts) { - stmt.close(); - } - conn.commit(); - } - } else { - throw se; - } + } else { + throw se; } + } - if (info.isLoggable(logger, Level.FINEST)) { //打印调试信息 - Attribute primary = info.getPrimary(); - if (info.getTableStrategy() == null) { - char[] sqlchars = presql.toCharArray(); - for (final T value : entitys) { + if (info.isLoggable(logger, Level.FINEST)) { //打印调试信息 + Attribute primary = info.getPrimary(); + if (info.getTableStrategy() == null) { + char[] sqlchars = presql.toCharArray(); + for (final T value : entitys) { + //----------------------------- + StringBuilder sb = new StringBuilder(128); + int i = 0; + for (char ch : sqlchars) { + if (ch == '?') { + Object obj = i == attrs.length ? info.getSQLValue(primary, value) : info.getSQLValue(attrs[i++], value); + if (obj != null && obj.getClass().isArray()) { + sb.append("'[length=").append(java.lang.reflect.Array.getLength(obj)).append("]'"); + } else { + sb.append(info.formatSQLValue(obj, sqlFormatter)); + } + } else { + sb.append(ch); + } + } + String debugsql = sb.toString(); + if (info.isLoggable(logger, Level.FINEST, debugsql)) { + logger.finest(info.getType().getSimpleName() + " update sql=" + debugsql.replaceAll("(\r|\n)", "\\n")); + } + } + } else { + prepareInfos.forEach((t, p) -> { + char[] sqlchars = p.prepareSql.toCharArray(); + for (final T value : p.entitys) { //----------------------------- StringBuilder sb = new StringBuilder(128); int i = 0; @@ -1081,45 +1232,36 @@ public class DataJdbcSource extends AbstractDataSqlSource { logger.finest(info.getType().getSimpleName() + " update sql=" + debugsql.replaceAll("(\r|\n)", "\\n")); } } - } else { - prepareInfos.forEach((t, p) -> { - char[] sqlchars = p.prepareSql.toCharArray(); - for (final T value : p.entitys) { - //----------------------------- - StringBuilder sb = new StringBuilder(128); - int i = 0; - for (char ch : sqlchars) { - if (ch == '?') { - Object obj = i == attrs.length ? info.getSQLValue(primary, value) : info.getSQLValue(attrs[i++], value); - if (obj != null && obj.getClass().isArray()) { - sb.append("'[length=").append(java.lang.reflect.Array.getLength(obj)).append("]'"); - } else { - sb.append(info.formatSQLValue(obj, sqlFormatter)); - } - } else { - sb.append(ch); - } - } - String debugsql = sb.toString(); - if (info.isLoggable(logger, Level.FINEST, debugsql)) { - logger.finest(info.getType().getSimpleName() + " update sql=" + debugsql.replaceAll("(\r|\n)", "\\n")); - } - } - }); - } - } //打印结束 - if (info.getTableStrategy() == null) { - slowLog(s, presql); - } else { - List presqls = new ArrayList<>(); - prepareInfos.forEach((t, p) -> { - presqls.add(p.prepareSql); }); - slowLog(s, presqls.toArray(new String[presqls.size()])); } + } //打印结束 + if (info.getTableStrategy() == null) { + slowLog(s, presql); + } else { + List presqls = new ArrayList<>(); + prepareInfos.forEach((t, p) -> { + presqls.add(p.prepareSql); + }); + slowLog(s, presqls.toArray(new String[presqls.size()])); + } + return c; + } + + @Override + protected CompletableFuture updateColumnDBAsync(EntityInfo info, Flipper flipper, UpdateSqlInfo sql) { + return supplyAsync(() -> updateColumnDB(info, flipper, sql)); + } + + @Override + protected int updateColumnDB(EntityInfo info, Flipper flipper, UpdateSqlInfo sql) { + Connection conn = null; + try { + conn = writePool.pollConnection(); + conn.setReadOnly(false); + conn.setAutoCommit(false); + int c = updateColumnDB(false, conn, info, flipper, sql); + conn.commit(); return c; - } catch (SourceException se) { - throw se; } catch (SQLException e) { if (conn != null) { try { @@ -1135,177 +1277,161 @@ public class DataJdbcSource extends AbstractDataSqlSource { } } - @Override - protected CompletableFuture updateColumnDBAsync(EntityInfo info, Flipper flipper, UpdateSqlInfo sql) { - return supplyAsync(() -> updateColumnDB(info, flipper, sql)); - } - - @Override - protected int updateColumnDB(EntityInfo info, Flipper flipper, UpdateSqlInfo sql) { //String sql, boolean prepared, Object... blobs) { - Connection conn = null; + private int updateColumnDB(final boolean batch, final Connection conn, final EntityInfo info, Flipper flipper, UpdateSqlInfo sql) throws SQLException { //String sql, boolean prepared, Object... blobs) { final long s = System.currentTimeMillis(); + int c = -1; + String firstTable = null; try { - conn = writePool.pollConnection(); - conn.setReadOnly(false); - conn.setAutoCommit(false); - int c = -1; - String firstTable = null; - try { - if (sql.blobs != null || sql.tables != null) { - if (sql.tables == null) { - final PreparedStatement prestmt = conn.prepareStatement(sql.sql); - int index = 0; - for (byte[] param : sql.blobs) { - Blob blob = conn.createBlob(); - blob.setBytes(1, param); - prestmt.setBlob(++index, blob); - } - if (info.isLoggable(logger, Level.FINEST, sql.sql)) { - logger.finest(info.getType().getSimpleName() + " updateColumn sql=" + sql); - } - c = prestmt.executeUpdate(); - prestmt.close(); - conn.commit(); - slowLog(s, sql.sql); - return c; - } else { - firstTable = sql.tables[0]; - List prestmts = new ArrayList<>(); - String[] sqls = new String[sql.tables.length]; - for (int i = 0; i < sql.tables.length; i++) { - sqls[i] = i == 0 ? sql.sql : sql.sql.replaceFirst(firstTable, sql.tables[i]); - PreparedStatement prestmt = conn.prepareStatement(sqls[i]); - int index = 0; - if (sql.blobs != null) { - for (byte[] param : sql.blobs) { - Blob blob = conn.createBlob(); - blob.setBytes(1, param); - prestmt.setBlob(++index, blob); - } - } - prestmt.addBatch(); - prestmts.add(prestmt); - } - if (info.isLoggable(logger, Level.FINEST, sql.sql)) { - logger.finest(info.getType().getSimpleName() + " updateColumn sql=" + Arrays.toString(sqls)); - } - int c1 = 0; - for (PreparedStatement prestmt : prestmts) { - int[] cs = prestmt.executeBatch(); - for (int cc : cs) { - c1 += cc; - } - prestmt.close(); - } - c = c1; - conn.commit(); - slowLog(s, sqls); + if (sql.blobs != null || sql.tables != null) { + if (sql.tables == null) { + final PreparedStatement prestmt = conn.prepareStatement(sql.sql); + int index = 0; + for (byte[] param : sql.blobs) { + Blob blob = conn.createBlob(); + blob.setBytes(1, param); + prestmt.setBlob(++index, blob); } - return c; - } else { if (info.isLoggable(logger, Level.FINEST, sql.sql)) { logger.finest(info.getType().getSimpleName() + " updateColumn sql=" + sql); } - final Statement stmt = conn.createStatement(); - c = stmt.executeUpdate(sql.sql); - stmt.close(); - conn.commit(); + c = prestmt.executeUpdate(); + prestmt.close(); + if (!batch) { + conn.commit(); + } slowLog(s, sql.sql); return c; - } - } catch (SQLException se) { - conn.rollback(); - if (isTableNotExist(info, se.getSQLState())) { - if (info.getTableStrategy() == null) { - String[] tableSqls = createTableSqls(info); - if (tableSqls != null) { - try { - Statement st = conn.createStatement(); - if (tableSqls.length == 1) { - st.execute(tableSqls[0]); - } else { - for (String tableSql : tableSqls) { - st.addBatch(tableSql); - } - st.executeBatch(); - } - st.close(); - } catch (SQLException e2) { + } else { + firstTable = sql.tables[0]; + List prestmts = new ArrayList<>(); + String[] sqls = new String[sql.tables.length]; + for (int i = 0; i < sql.tables.length; i++) { + sqls[i] = i == 0 ? sql.sql : sql.sql.replaceFirst(firstTable, sql.tables[i]); + PreparedStatement prestmt = conn.prepareStatement(sqls[i]); + int index = 0; + if (sql.blobs != null) { + for (byte[] param : sql.blobs) { + Blob blob = conn.createBlob(); + blob.setBytes(1, param); + prestmt.setBlob(++index, blob); } } - //表不存在,更新条数为0 - return 0; - } else if (sql.tables == null) { - //单一分表不存在 - return 0; - } else { + prestmt.addBatch(); + prestmts.add(prestmt); + } + if (info.isLoggable(logger, Level.FINEST, sql.sql)) { + logger.finest(info.getType().getSimpleName() + " updateColumn sql=" + Arrays.toString(sqls)); + } + int c1 = 0; + for (PreparedStatement prestmt : prestmts) { + int[] cs = prestmt.executeBatch(); + for (int cc : cs) { + c1 += cc; + } + prestmt.close(); + } + c = c1; + if (!batch) { + conn.commit(); + } + slowLog(s, sqls); + } + return c; + } else { + if (info.isLoggable(logger, Level.FINEST, sql.sql)) { + logger.finest(info.getType().getSimpleName() + " updateColumn sql=" + sql); + } + final Statement stmt = conn.createStatement(); + c = stmt.executeUpdate(sql.sql); + stmt.close(); + if (!batch) { + conn.commit(); + } + slowLog(s, sql.sql); + return c; + } + } catch (SQLException se) { + if (!batch) { + conn.rollback(); + } + if (isTableNotExist(info, se.getSQLState())) { + if (info.getTableStrategy() == null) { + String[] tableSqls = createTableSqls(info); + if (tableSqls != null) { + try { + Statement st = conn.createStatement(); + if (tableSqls.length == 1) { + st.execute(tableSqls[0]); + } else { + for (String tableSql : tableSqls) { + st.addBatch(tableSql); + } + st.executeBatch(); + } + st.close(); + } catch (SQLException e2) { + } + } + //表不存在,更新条数为0 + return 0; + } else if (sql.tables == null) { + //单一分表不存在 + return 0; + } else { // String tableName = parseNotExistTableName(se); // if (tableName == null) { -// throw new SourceException(se); +// throw se; // } - String[] oldTables = sql.tables; - List notExistTables = checkNotExistTables(conn, oldTables); - if (notExistTables.isEmpty()) { - throw new SourceException(se); - } - for (String t : notExistTables) { - sql.tables = Utility.remove(sql.tables, t); - } - if (logger.isLoggable(Level.FINE)) { - logger.log(Level.FINE, "updateColumn, old-tables: " + Arrays.toString(oldTables) + ", new-tables: " + Arrays.toString(sql.tables)); - } - if (sql.tables.length == 0) { //分表全部不存在 - return 0; - } - List prestmts = new ArrayList<>(); - String[] sqls = new String[sql.tables.length]; - for (int i = 0; i < sql.tables.length; i++) { - sqls[i] = sql.sql.replaceFirst(firstTable, sql.tables[i]); - PreparedStatement prestmt = conn.prepareStatement(sqls[i]); - int index = 0; - if (sql.blobs != null) { - for (byte[] param : sql.blobs) { - Blob blob = conn.createBlob(); - blob.setBytes(1, param); - prestmt.setBlob(++index, blob); - } - } - prestmt.addBatch(); - prestmts.add(prestmt); - } - if (info.isLoggable(logger, Level.FINEST, sql.sql)) { - logger.finest(info.getType().getSimpleName() + " updateColumn sql=" + Arrays.toString(sqls)); - } - int c1 = 0; - for (PreparedStatement prestmt : prestmts) { - int[] cs = prestmt.executeBatch(); - for (int cc : cs) { - c1 += cc; - } - prestmt.close(); - } - c = c1; - conn.commit(); - slowLog(s, sqls); - return c; + String[] oldTables = sql.tables; + List notExistTables = checkNotExistTables(conn, oldTables); + if (notExistTables.isEmpty()) { + throw se; } - } else { - throw se; + for (String t : notExistTables) { + sql.tables = Utility.remove(sql.tables, t); + } + if (logger.isLoggable(Level.FINE)) { + logger.log(Level.FINE, "updateColumn, old-tables: " + Arrays.toString(oldTables) + ", new-tables: " + Arrays.toString(sql.tables)); + } + if (sql.tables.length == 0) { //分表全部不存在 + return 0; + } + List prestmts = new ArrayList<>(); + String[] sqls = new String[sql.tables.length]; + for (int i = 0; i < sql.tables.length; i++) { + sqls[i] = sql.sql.replaceFirst(firstTable, sql.tables[i]); + PreparedStatement prestmt = conn.prepareStatement(sqls[i]); + int index = 0; + if (sql.blobs != null) { + for (byte[] param : sql.blobs) { + Blob blob = conn.createBlob(); + blob.setBytes(1, param); + prestmt.setBlob(++index, blob); + } + } + prestmt.addBatch(); + prestmts.add(prestmt); + } + if (info.isLoggable(logger, Level.FINEST, sql.sql)) { + logger.finest(info.getType().getSimpleName() + " updateColumn sql=" + Arrays.toString(sqls)); + } + int c1 = 0; + for (PreparedStatement prestmt : prestmts) { + int[] cs = prestmt.executeBatch(); + for (int cc : cs) { + c1 += cc; + } + prestmt.close(); + } + c = c1; + if (!batch) { + conn.commit(); + } + slowLog(s, sqls); + return c; } - } - } catch (SourceException sex) { - throw sex; - } catch (SQLException e) { - if (conn != null) { - try { - conn.rollback(); - } catch (SQLException se) { - } - } - throw new SourceException(e); - } finally { - if (conn != null) { - writePool.offerConnection(conn); + } else { + throw se; } } } @@ -2421,7 +2547,8 @@ public class DataJdbcSource extends AbstractDataSqlSource { public ConnectionPool(Properties prop) { this.connectTimeoutSeconds = Integer.decode(prop.getProperty(DATA_SOURCE_CONNECTTIMEOUT_SECONDS, "6")); - this.maxConns = Math.max(1, Integer.decode(prop.getProperty(DATA_SOURCE_MAXCONNS, "" + Utility.cpus() * 4))); + int defMaxConns = Utility.cpus() * 4; + this.maxConns = Math.max(1, Integer.decode(prop.getProperty(DATA_SOURCE_MAXCONNS, "" + defMaxConns))); this.queue = new ArrayBlockingQueue<>(maxConns); this.url = prop.getProperty(DATA_SOURCE_URL); String username = prop.getProperty(DATA_SOURCE_USER, ""); diff --git a/src/main/java/org/redkale/source/EntityInfo.java b/src/main/java/org/redkale/source/EntityInfo.java index fb21b397f..7f4a5d0fe 100644 --- a/src/main/java/org/redkale/source/EntityInfo.java +++ b/src/main/java/org/redkale/source/EntityInfo.java @@ -1265,6 +1265,17 @@ public final class EntityInfo { return this.primary; } + /** + * 获取主键字段的值 + * + * @param entity 实体对象 + * + * @return 主键值 + */ + public Serializable getPrimaryValue(T entity) { + return this.primary.get(entity); + } + /** * 获取主键字段的Attribute单一元素数组 *