From 89b0a06e511efe81f70cb2b81bcbd022171d7fde Mon Sep 17 00:00:00 2001 From: redkale Date: Mon, 16 Oct 2023 10:10:43 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8DDataJdbcSource=E4=BA=8B?= =?UTF-8?q?=E5=8A=A1=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/redkale/source/DataJdbcSource.java | 436 ++++++++++-------- 1 file changed, 239 insertions(+), 197 deletions(-) diff --git a/src/main/java/org/redkale/source/DataJdbcSource.java b/src/main/java/org/redkale/source/DataJdbcSource.java index 083d5b83c..b5b9f03c4 100644 --- a/src/main/java/org/redkale/source/DataJdbcSource.java +++ b/src/main/java/org/redkale/source/DataJdbcSource.java @@ -219,10 +219,10 @@ public class DataJdbcSource extends AbstractDataSqlSource { } int c = 0; SourceConnection conn = null; + List stmtsRef = new ArrayList<>(); try { conn = writePool.pollTransConnection(); conn.setAutoCommit(false); - BoolRef commitRef = new BoolRef(); for (BatchAction action : dataBatch.actions) { if (action instanceof RunnableBatchAction) { RunnableBatchAction act = (RunnableBatchAction) action; @@ -231,7 +231,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } else if (action instanceof InsertBatchAction1) { InsertBatchAction1 act = (InsertBatchAction1) action; EntityInfo info = apply(act.entity.getClass()); - c += insertDBStatement(true, commitRef, conn, info, act.entity); + c += insertDBStatement(true, stmtsRef, conn, info, act.entity); } else if (action instanceof DeleteBatchAction1) { DeleteBatchAction1 act = (DeleteBatchAction1) action; @@ -240,7 +240,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { Map> pkmap = info.getTableMap(pk); String[] tables = pkmap.keySet().toArray(new String[pkmap.size()]); String[] sqls = deleteSql(info, pkmap); - c += deleteDBStatement(true, commitRef, conn, info, tables, null, null, pkmap, sqls); + c += deleteDBStatement(true, stmtsRef, conn, info, tables, null, null, pkmap, sqls); } else if (action instanceof DeleteBatchAction2) { DeleteBatchAction2 act = (DeleteBatchAction2) action; @@ -248,56 +248,48 @@ public class DataJdbcSource extends AbstractDataSqlSource { Map> pkmap = info.getTableMap(act.pk); String[] tables = pkmap.keySet().toArray(new String[pkmap.size()]); String[] sqls = deleteSql(info, pkmap); - c += deleteDBStatement(true, commitRef, conn, info, tables, null, null, pkmap, sqls); + c += deleteDBStatement(true, stmtsRef, conn, info, tables, null, null, pkmap, sqls); } else if (action instanceof DeleteBatchAction3) { DeleteBatchAction3 act = (DeleteBatchAction3) action; EntityInfo info = apply(act.clazz); String[] tables = info.getTables(act.node); String[] sqls = deleteSql(info, tables, act.flipper, act.node); - c += deleteDBStatement(true, commitRef, conn, info, tables, act.flipper, act.node, null, sqls); + c += deleteDBStatement(true, stmtsRef, conn, info, tables, act.flipper, act.node, null, sqls); } else if (action instanceof UpdateBatchAction1) { UpdateBatchAction1 act = (UpdateBatchAction1) action; EntityInfo info = apply(act.entity.getClass()); - c += updateEntityDBStatement(true, commitRef, conn, info, act.entity); + c += updateEntityDBStatement(true, stmtsRef, conn, info, act.entity); } else if (action instanceof UpdateBatchAction2) { UpdateBatchAction2 act = (UpdateBatchAction2) action; EntityInfo info = apply(act.clazz); UpdateSqlInfo sql = updateColumnSql(info, act.pk, act.values); - c += updateColumnDBStatement(true, commitRef, conn, info, null, sql); + c += updateColumnDBStatement(true, stmtsRef, conn, info, null, sql); } else if (action instanceof UpdateBatchAction3) { UpdateBatchAction3 act = (UpdateBatchAction3) action; EntityInfo info = apply(act.clazz); UpdateSqlInfo sql = updateColumnSql(info, act.node, act.flipper, act.values); - c += updateColumnDBStatement(true, commitRef, conn, info, act.flipper, sql); + c += updateColumnDBStatement(true, stmtsRef, conn, info, act.flipper, sql); } else if (action instanceof UpdateBatchAction4) { UpdateBatchAction4 act = (UpdateBatchAction4) action; EntityInfo info = apply(act.entity.getClass()); UpdateSqlInfo sql = updateColumnSql(info, false, act.entity, act.node, act.selects); - c += updateColumnDBStatement(true, commitRef, conn, info, null, sql); + c += updateColumnDBStatement(true, stmtsRef, conn, info, null, sql); } } conn.commit(); return c; } catch (SourceException se) { if (conn != null) { - try { - conn.rollback(); - } catch (SQLException sqe) { - } + conn.rollback(stmtsRef); } throw se; } catch (SQLException e) { - if (conn != null) { - try { - conn.rollback(); - } catch (SQLException se) { - } - } + conn.rollback(stmtsRef); throw new SourceException(e); } finally { if (conn != null) { @@ -319,22 +311,16 @@ public class DataJdbcSource extends AbstractDataSqlSource { @Override protected int insertDB(EntityInfo info, T... entitys) { SourceConnection conn = null; + List stmtsRef = new ArrayList<>(); try { conn = writePool.pollConnection(); conn.setAutoCommit(false); - BoolRef commitRef = new BoolRef(); - int c = insertDBStatement(false, commitRef, conn, info, entitys); - if (!commitRef.get()) { - conn.commit(); - } + int c = insertDBStatement(false, stmtsRef, conn, info, entitys); + conn.commit(); + conn.offerUpdateStatement(stmtsRef); return c; } catch (SQLException e) { - if (conn != null) { - try { - conn.rollback(); - } catch (SQLException se) { - } - } + conn.rollback(stmtsRef); throw new SourceException(e); } finally { if (conn != null) { @@ -343,7 +329,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } } - private int insertDBStatement(final boolean batch, BoolRef commitRef, final SourceConnection conn, final EntityInfo info, T... entitys) throws SQLException { + private int insertDBStatement(final boolean batch, List stmtsRef, final SourceConnection conn, final EntityInfo info, T... entitys) throws SQLException { final long s = System.currentTimeMillis(); int c = 0; String presql = null; @@ -378,7 +364,6 @@ public class DataJdbcSource extends AbstractDataSqlSource { primary.set(entity, set.getObject(1)); } } - set.close(); } } else { //分库分表 int c1 = 0; @@ -405,18 +390,21 @@ public class DataJdbcSource extends AbstractDataSqlSource { primary.set(entity, set.getObject(1)); } } - set.close(); } } } if (!batch) { conn.commit(); - commitRef.asTrue(); + conn.offerUpdateStatement(prestmt, prestmts); + } else { + if (prestmt != null) { + stmtsRef.add(prestmt); + } else { + stmtsRef.addAll(prestmts); + } } } catch (SQLException se) { - if (!batch) { - conn.rollback(); - } + conn.rollback(prestmt, prestmts); if (!isTableNotExist(info, se.getSQLState())) { throw se; } @@ -552,13 +540,9 @@ public class DataJdbcSource extends AbstractDataSqlSource { primary.set(entity, set.getObject(1)); } } - set.close(); } - conn.offerUpdateStatement(prestmt); } else { //分库分表 - for (PreparedStatement stmt : prestmts) { - conn.offerUpdateStatement(stmt); - } + conn.offerUpdateStatement(prestmts); prestmts = prepareInsertEntityStatements(conn, info, prepareInfos, entitys); int c1 = 0; for (PreparedStatement stmt : prestmts) { @@ -584,11 +568,17 @@ public class DataJdbcSource extends AbstractDataSqlSource { primary.set(entity, set.getObject(1)); } } - set.close(); } } - for (PreparedStatement stmt : prestmts) { - conn.offerUpdateStatement(stmt); + } + if (!batch) { + conn.commit(); + conn.offerUpdateStatement(prestmt, prestmts); + } else { + if (prestmt != null) { + stmtsRef.add(prestmt); + } else { + stmtsRef.addAll(prestmts); } } } @@ -664,22 +654,16 @@ public class DataJdbcSource extends AbstractDataSqlSource { @Override protected int deleteDB(EntityInfo info, String[] tables, Flipper flipper, FilterNode node, Map> pkmap, String... sqls) { SourceConnection conn = null; + List stmtsRef = new ArrayList<>(); try { conn = writePool.pollConnection(); conn.setAutoCommit(false); - BoolRef commitRef = new BoolRef(); - int c = deleteDBStatement(false, commitRef, conn, info, tables, flipper, node, pkmap, sqls); - if (!commitRef.get()) { - conn.commit(); - } + int c = deleteDBStatement(false, stmtsRef, conn, info, tables, flipper, node, pkmap, sqls); + conn.commit(); + conn.offerUpdateStatement(stmtsRef); return c; } catch (SQLException e) { - if (conn != null) { - try { - conn.rollback(); - } catch (SQLException se) { - } - } + conn.rollback(stmtsRef); throw new SourceException(e); } finally { if (conn != null) { @@ -688,49 +672,46 @@ public class DataJdbcSource extends AbstractDataSqlSource { } } - private int deleteDBStatement(final boolean batch, BoolRef commitRef, final SourceConnection conn, final EntityInfo info, String[] tables, Flipper flipper, FilterNode node, Map> pkmap, String... sqls) throws SQLException { + private int deleteDBStatement(final boolean batch, List stmtsRef, final SourceConnection conn, final EntityInfo info, String[] tables, Flipper flipper, FilterNode node, Map> pkmap, String... sqls) throws SQLException { final long s = System.currentTimeMillis(); + Statement stmt = null; try { int c; if (sqls.length == 1) { - final Statement stmt = conn.createUpdateStatement(); + stmt = conn.createUpdateStatement(); c = stmt.executeUpdate(sqls[0]); conn.offerUpdateStatement(stmt); } else { - final Statement stmt = conn.createUpdateStatement(); + stmt = conn.createUpdateStatement(); for (String sql : sqls) { stmt.addBatch(sql); } c = Utility.sum(stmt.executeBatch()); - conn.offerUpdateStatement(stmt); } if (!batch) { conn.commit(); - commitRef.asTrue(); + conn.offerUpdateStatement(stmt); + } else { + stmtsRef.add(stmt); } slowLog(s, sqls); return c; } catch (SQLException e) { - if (!batch) { - try { - conn.rollback(); - } catch (SQLException se) { - } - } + conn.rollback(stmt); if (isTableNotExist(info, e.getSQLState())) { if (info.getTableStrategy() == null) { String[] tableSqls = createTableSqls(info); if (tableSqls != null) { - Statement stmt = conn.createUpdateStatement(); + Statement stmt2 = conn.createUpdateStatement(); if (tableSqls.length == 1) { - stmt.execute(tableSqls[0]); + stmt2.execute(tableSqls[0]); } else { for (String tableSql : tableSqls) { - stmt.addBatch(tableSql); + stmt2.addBatch(tableSql); } - stmt.executeBatch(); + stmt2.executeBatch(); } - conn.offerUpdateStatement(stmt); + conn.offerUpdateStatement(stmt2); return 0; } //单表结构不存在 @@ -766,18 +747,20 @@ public class DataJdbcSource extends AbstractDataSqlSource { if (info.isLoggable(logger, Level.FINEST, sqls[0])) { logger.finest(info.getType().getSimpleName() + " delete sql=" + Arrays.toString(sqls)); } + Statement stmt2 = null; try { - final Statement stmt = conn.createUpdateStatement(); + stmt2 = conn.createUpdateStatement(); for (String sql : sqls) { - stmt.addBatch(sql); + stmt2.addBatch(sql); } - int c = Utility.sum(stmt.executeBatch()); - conn.offerUpdateStatement(stmt); + int c = Utility.sum(stmt2.executeBatch()); conn.commit(); slowLog(s, sqls); return c; } catch (SQLException se) { throw se; + } finally { + conn.offerUpdateStatement(stmt2); } } else { throw e; @@ -796,30 +779,27 @@ public class DataJdbcSource extends AbstractDataSqlSource { protected int clearTableDB(EntityInfo info, String[] tables, FilterNode node, String... sqls) { SourceConnection conn = null; final long s = System.currentTimeMillis(); + Statement stmt = null; try { conn = writePool.pollConnection(); conn.setAutoCommit(false); int c; if (sqls.length == 1) { - final Statement stmt = conn.createUpdateStatement(); + stmt = conn.createUpdateStatement(); c = stmt.executeUpdate(sqls[0]); - conn.offerUpdateStatement(stmt); } else { - final Statement stmt = conn.createUpdateStatement(); + stmt = conn.createUpdateStatement(); for (String sql : sqls) { stmt.addBatch(sql); } c = Utility.sum(stmt.executeBatch()); - conn.offerUpdateStatement(stmt); } conn.commit(); + conn.offerUpdateStatement(stmt); slowLog(s, sqls); return c; } catch (SQLException e) { - try { - conn.rollback(); - } catch (SQLException se) { - } + conn.rollback(stmt); if (isTableNotExist(info, e.getSQLState())) { if (info.getTableStrategy() == null) { //单表结构不存在 @@ -851,18 +831,21 @@ public class DataJdbcSource extends AbstractDataSqlSource { if (info.isLoggable(logger, Level.FINEST, sqls[0])) { logger.finest(info.getType().getSimpleName() + " clearTable sql=" + Arrays.toString(sqls)); } + Statement stmt2 = null; try { - final Statement stmt = conn.createUpdateStatement(); + stmt2 = conn.createUpdateStatement(); for (String sql : sqls) { - stmt.addBatch(sql); + stmt2.addBatch(sql); } - int c = Utility.sum(stmt.executeBatch()); - conn.offerUpdateStatement(stmt); + int c = Utility.sum(stmt2.executeBatch()); conn.commit(); + conn.offerUpdateStatement(stmt2); slowLog(s, sqls); return c; } catch (SQLException se) { throw new SourceException(se); + } finally { + conn.rollback(stmt2); } } else { throw new SourceException(e); @@ -889,7 +872,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { @Override protected int createTableDB(EntityInfo info, String copyTableSql, Serializable pk, String... sqls) { SourceConnection conn = null; - Statement stmt; + Statement stmt = null; final long s = System.currentTimeMillis(); try { conn = writePool.pollConnection(); @@ -899,14 +882,12 @@ public class DataJdbcSource extends AbstractDataSqlSource { if (sqls.length == 1) { stmt = conn.createUpdateStatement(); c = stmt.executeUpdate(sqls[0]); - conn.offerUpdateStatement(stmt); } else { stmt = conn.createUpdateStatement(); for (String sql : sqls) { stmt.addBatch(sql); } c = Utility.sum(stmt.executeBatch()); - conn.offerUpdateStatement(stmt); } } else { //建分表 try { @@ -914,6 +895,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { c = stmt.executeUpdate(copyTableSql); } catch (SQLException se) { if (isTableNotExist(info, se.getSQLState())) { //分表的原始表不存在 + conn.offerUpdateStatement(stmt); final String newTable = info.getTable(pk); if (newTable.indexOf('.') <= 0) { //分表的原始表不存在 if (info.isLoggable(logger, Level.FINEST, sqls[0])) { @@ -936,8 +918,6 @@ public class DataJdbcSource extends AbstractDataSqlSource { } stmt = conn.createUpdateStatement(); c = stmt.executeUpdate(copyTableSql); - conn.offerUpdateStatement(stmt); - } else { //需要先建库 String newCatalog = newTable.substring(0, newTable.indexOf('.')); String catalogSql = ("postgresql".equals(dbtype()) ? "CREATE SCHEMA IF NOT EXISTS " : "CREATE DATABASE IF NOT EXISTS ") + newCatalog; @@ -958,8 +938,8 @@ public class DataJdbcSource extends AbstractDataSqlSource { } stmt = conn.createUpdateStatement(); c = stmt.executeUpdate(copyTableSql); - conn.offerUpdateStatement(stmt); } catch (SQLException sqle2) { + conn.offerUpdateStatement(stmt); if (isTableNotExist(info, sqle2.getSQLState())) { if (info.isLoggable(logger, Level.FINEST, sqls[0])) { logger.finest(info.getType().getSimpleName() + " createTable sql=" + Arrays.toString(sqls)); @@ -981,26 +961,21 @@ public class DataJdbcSource extends AbstractDataSqlSource { } stmt = conn.createUpdateStatement(); c = stmt.executeUpdate(copyTableSql); - conn.offerUpdateStatement(stmt); } else { throw new SourceException(sqle2); } } } } - throw new SourceException(se); + throw se; } } conn.commit(); + conn.offerUpdateStatement(stmt); slowLog(s, sqls); return c; } catch (SQLException e) { - if (conn != null) { - try { - conn.rollback(); - } catch (SQLException se) { - } - } + conn.rollback(stmt); throw new SourceException(e); } finally { if (conn != null) { @@ -1013,30 +988,27 @@ public class DataJdbcSource extends AbstractDataSqlSource { protected int dropTableDB(EntityInfo info, String[] tables, FilterNode node, String... sqls) { SourceConnection conn = null; final long s = System.currentTimeMillis(); + Statement stmt = null; try { conn = writePool.pollConnection(); conn.setAutoCommit(false); int c; if (sqls.length == 1) { - final Statement stmt = conn.createUpdateStatement(); + stmt = conn.createUpdateStatement(); c = stmt.executeUpdate(sqls[0]); - conn.offerUpdateStatement(stmt); } else { - final Statement stmt = conn.createUpdateStatement(); + stmt = conn.createUpdateStatement(); for (String sql : sqls) { stmt.addBatch(sql); } c = Utility.sum(stmt.executeBatch()); - conn.offerUpdateStatement(stmt); } conn.commit(); + conn.offerUpdateStatement(stmt); slowLog(s, sqls); return c; } catch (SQLException e) { - try { - conn.rollback(); - } catch (SQLException se) { - } + conn.rollback(stmt); if (isTableNotExist(info, e.getSQLState())) { if (info.getTableStrategy() == null) { //单表结构不存在 @@ -1069,13 +1041,13 @@ public class DataJdbcSource extends AbstractDataSqlSource { logger.finest(info.getType().getSimpleName() + " dropTable sql=" + Arrays.toString(sqls)); } try { - final Statement stmt = conn.createUpdateStatement(); + final Statement stmt2 = conn.createUpdateStatement(); for (String sql : sqls) { - stmt.addBatch(sql); + stmt2.addBatch(sql); } - int c = Utility.sum(stmt.executeBatch()); - conn.offerUpdateStatement(stmt); + int c = Utility.sum(stmt2.executeBatch()); conn.commit(); + conn.offerUpdateStatement(stmt2); slowLog(s, sqls); return c; } catch (SQLException se) { @@ -1101,22 +1073,16 @@ public class DataJdbcSource extends AbstractDataSqlSource { @Override protected int updateEntityDB(EntityInfo info, T... entitys) { SourceConnection conn = null; + List stmtsRef = new ArrayList<>(); try { conn = writePool.pollConnection(); conn.setAutoCommit(false); - BoolRef commitRef = new BoolRef(); - int c = updateEntityDBStatement(false, commitRef, conn, info, entitys); - if (!commitRef.get()) { - conn.commit(); - } + int c = updateEntityDBStatement(false, stmtsRef, conn, info, entitys); + conn.commit(); + conn.offerUpdateStatement(stmtsRef); return c; } catch (SQLException e) { - if (conn != null) { - try { - conn.rollback(); - } catch (SQLException se) { - } - } + conn.rollback(stmtsRef); throw new SourceException(e); } finally { if (conn != null) { @@ -1125,7 +1091,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } } - private int updateEntityDBStatement(final boolean batch, BoolRef commitRef, final SourceConnection conn, final EntityInfo info, T... entitys) throws SQLException { + private int updateEntityDBStatement(final boolean batch, List stmtsRef, final SourceConnection conn, final EntityInfo info, T... entitys) throws SQLException { final long s = System.currentTimeMillis(); String presql = null; String caseSql = null; @@ -1163,7 +1129,6 @@ public class DataJdbcSource extends AbstractDataSqlSource { } } c = c1; - conn.offerUpdateStatement(prestmt); } else { prepareInfos = getUpdateQuestionPrepareInfo(info, entitys); prestmts = prepareUpdateEntityStatements(conn, info, prepareInfos, entitys); @@ -1175,18 +1140,19 @@ public class DataJdbcSource extends AbstractDataSqlSource { } } c = c1; - for (PreparedStatement stmt : prestmts) { - conn.offerUpdateStatement(stmt); - } } if (!batch) { conn.commit(); - commitRef.asTrue(); + conn.offerUpdateStatement(prestmt, prestmts); + } else { + if (prestmt != null) { + stmtsRef.add(prestmt); + } else { + stmtsRef.addAll(prestmts); + } } } catch (SQLException se) { - if (!batch) { - conn.rollback(); - } + conn.rollback(prestmt, prestmts); if (isTableNotExist(info, se.getSQLState())) { if (info.getTableStrategy() == null) { String[] tableSqls = createTableSqls(info); @@ -1201,6 +1167,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } stmt.executeBatch(); } + conn.commit(); conn.offerUpdateStatement(stmt); } catch (SQLException e2) { } @@ -1212,10 +1179,6 @@ public class DataJdbcSource extends AbstractDataSqlSource { if (prepareInfos == null) { throw se; } - for (PreparedStatement stmt : prestmts) { - conn.offerUpdateStatement(stmt); - } - String[] oldTables = prepareInfos.keySet().toArray(new String[prepareInfos.size()]); List notExistTables = checkNotExistTables(conn, oldTables); if (notExistTables.isEmpty()) { @@ -1236,10 +1199,12 @@ public class DataJdbcSource extends AbstractDataSqlSource { c1 += Utility.sum(stmt.executeBatch()); } c = c1; - for (PreparedStatement stmt : prestmts) { - conn.offerUpdateStatement(stmt); + if (!batch) { + conn.commit(); + conn.offerUpdateStatement(prestmts); + } else { + stmtsRef.addAll(prestmts); } - conn.commit(); } } else { throw se; @@ -1318,22 +1283,16 @@ public class DataJdbcSource extends AbstractDataSqlSource { @Override protected int updateColumnDB(EntityInfo info, Flipper flipper, UpdateSqlInfo sql) { SourceConnection conn = null; + List stmtsRef = new ArrayList<>(); try { conn = writePool.pollConnection(); conn.setAutoCommit(false); - BoolRef commitRef = new BoolRef(); - int c = updateColumnDBStatement(false, commitRef, conn, info, flipper, sql); - if (!commitRef.get()) { - conn.commit(); - } + int c = updateColumnDBStatement(false, stmtsRef, conn, info, flipper, sql); + conn.commit(); + conn.offerUpdateStatement(stmtsRef); return c; } catch (SQLException e) { - if (conn != null) { - try { - conn.rollback(); - } catch (SQLException se) { - } - } + conn.rollback(stmtsRef); throw new SourceException(e); } finally { if (conn != null) { @@ -1342,14 +1301,17 @@ public class DataJdbcSource extends AbstractDataSqlSource { } } - private int updateColumnDBStatement(final boolean batch, BoolRef commitRef, final SourceConnection conn, final EntityInfo info, Flipper flipper, UpdateSqlInfo sql) throws SQLException { //String sql, boolean prepared, Object... blobs) { + private int updateColumnDBStatement(final boolean batch, List stmtsRef, final SourceConnection conn, final EntityInfo info, Flipper flipper, UpdateSqlInfo sql) throws SQLException { //String sql, boolean prepared, Object... blobs) { final long s = System.currentTimeMillis(); int c = -1; String firstTable = null; + Statement onestmt = null; + List prestmts = null; try { if (sql.blobs != null || sql.tables != null) { if (sql.tables == null) { - final PreparedStatement prestmt = conn.prepareUpdateStatement(sql.sql); + PreparedStatement prestmt = conn.prepareUpdateStatement(sql.sql); + onestmt = prestmt; int index = 0; for (byte[] param : sql.blobs) { Blob blob = conn.createBlob(); @@ -1360,30 +1322,31 @@ public class DataJdbcSource extends AbstractDataSqlSource { logger.finest(info.getType().getSimpleName() + " updateColumn sql=" + sql.sql); } c = prestmt.executeUpdate(); - conn.offerUpdateStatement(prestmt); if (!batch) { conn.commit(); - commitRef.asTrue(); + conn.offerUpdateStatement(onestmt, prestmts); + } else if (onestmt != null) { + stmtsRef.add(onestmt); } slowLog(s, sql.sql); return c; } else { firstTable = sql.tables[0]; - List prestmts = new ArrayList<>(); + prestmts = new ArrayList<>(); String[] sqls = new String[sql.tables.length]; for (int i = 0; i < sql.tables.length; i++) { sqls[i] = i == 0 ? sql.sql : sql.sql.replaceFirst(firstTable, sql.tables[i]); - PreparedStatement prestmt = conn.prepareUpdateStatement(sqls[i]); + PreparedStatement stmt = conn.prepareUpdateStatement(sqls[i]); int index = 0; if (sql.blobs != null) { for (byte[] param : sql.blobs) { Blob blob = conn.createBlob(); blob.setBytes(1, param); - prestmt.setBlob(++index, blob); + stmt.setBlob(++index, blob); } } - prestmt.addBatch(); - prestmts.add(prestmt); + stmt.addBatch(); + prestmts.add(stmt); } if (info.isLoggable(logger, Level.FINEST, sql.sql)) { logger.finest(info.getType().getSimpleName() + " updateColumn sql=" + Arrays.toString(sqls)); @@ -1391,12 +1354,13 @@ public class DataJdbcSource extends AbstractDataSqlSource { int c1 = 0; for (PreparedStatement stmt : prestmts) { c1 += Utility.sum(stmt.executeBatch()); - conn.offerUpdateStatement(stmt); } c = c1; if (!batch) { conn.commit(); - commitRef.asTrue(); + conn.offerUpdateStatement(onestmt, prestmts); + } else if (prestmts != null) { + stmtsRef.addAll(prestmts); } slowLog(s, sqls); } @@ -1405,36 +1369,37 @@ public class DataJdbcSource extends AbstractDataSqlSource { if (info.isLoggable(logger, Level.FINEST, sql.sql)) { logger.finest(info.getType().getSimpleName() + " updateColumn sql=" + sql.sql); } - final Statement stmt = conn.createUpdateStatement(); - c = stmt.executeUpdate(sql.sql); - conn.offerUpdateStatement(stmt); + onestmt = conn.createUpdateStatement(); + c = onestmt.executeUpdate(sql.sql); if (!batch) { conn.commit(); - commitRef.asTrue(); + conn.offerUpdateStatement(onestmt, prestmts); + } else if (onestmt != null) { + stmtsRef.add(onestmt); } slowLog(s, sql.sql); return c; } } catch (SQLException se) { - if (!batch) { - conn.rollback(); - } + conn.rollback(onestmt, prestmts); if (isTableNotExist(info, se.getSQLState())) { if (info.getTableStrategy() == null) { String[] tableSqls = createTableSqls(info); if (tableSqls != null) { try { - Statement stmt = conn.createUpdateStatement(); + onestmt = conn.createUpdateStatement(); if (tableSqls.length == 1) { - stmt.execute(tableSqls[0]); + onestmt.execute(tableSqls[0]); } else { for (String tableSql : tableSqls) { - stmt.addBatch(tableSql); + onestmt.addBatch(tableSql); } - stmt.executeBatch(); + onestmt.executeBatch(); } - conn.offerUpdateStatement(stmt); + conn.commit(); + conn.offerUpdateStatement(onestmt); } catch (SQLException e2) { + conn.rollback(onestmt); } } //表不存在,更新条数为0 @@ -1461,7 +1426,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { if (sql.tables.length == 0) { //分表全部不存在 return 0; } - List prestmts = new ArrayList<>(); + prestmts = new ArrayList<>(); String[] sqls = new String[sql.tables.length]; for (int i = 0; i < sql.tables.length; i++) { sqls[i] = sql.sql.replaceFirst(firstTable, sql.tables[i]); @@ -1483,12 +1448,13 @@ public class DataJdbcSource extends AbstractDataSqlSource { int c1 = 0; for (PreparedStatement stmt : prestmts) { c1 += Utility.sum(stmt.executeBatch()); - conn.offerUpdateStatement(stmt); } c = c1; if (!batch) { conn.commit(); - commitRef.asTrue(); + conn.offerUpdateStatement(onestmt, prestmts); + } else if (prestmts != null) { + stmtsRef.addAll(prestmts); } slowLog(s, sqls); return c; @@ -1512,6 +1478,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { Statement stmt = null; try { conn = readPool.pollConnection(); + conn.setAutoCommit(true); stmt = conn.createQueryStatement(); ResultSet set = stmt.executeQuery(sql); if (set.next()) { @@ -1614,6 +1581,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { Statement stmt = null; try { conn = readPool.pollConnection(); + conn.setAutoCommit(true); stmt = conn.createQueryStatement(); Number rs = defVal; ResultSet set = stmt.executeQuery(sql); @@ -1704,6 +1672,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { Statement stmt = null; try { conn = readPool.pollConnection(); + conn.setAutoCommit(true); stmt = conn.createQueryStatement(); ResultSet set = stmt.executeQuery(sql); ResultSetMetaData rsd = set.getMetaData(); @@ -1791,6 +1760,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { Statement stmt = null; try { conn = readPool.pollConnection(); + conn.setAutoCommit(true); stmt = conn.createQueryStatement(); ResultSet set = stmt.executeQuery(sql); ResultSetMetaData rsd = set.getMetaData(); @@ -1904,6 +1874,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { PreparedStatement prestmt = null; try { conn = readPool.pollConnection(); + conn.setAutoCommit(true); String prepareSQL = info.getFindQuestionPrepareSQL(pk); prestmt = conn.prepareQueryStatement(prepareSQL); prestmt.setObject(1, pk); @@ -1937,6 +1908,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { PreparedStatement prestmt = null; try { conn = readPool.pollConnection(); + conn.setAutoCommit(true); prestmt = conn.prepareQueryStatement(sql); prestmt.setFetchSize(1); final DataResultSet set = createDataResultSet(info, prestmt.executeQuery()); @@ -2017,6 +1989,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { final Attribute attr = info.getAttribute(column); try { conn = readPool.pollConnection(); + conn.setAutoCommit(true); prestmt = conn.prepareQueryStatement(sql); prestmt.setFetchSize(1); final DataResultSet set = createDataResultSet(info, prestmt.executeQuery()); @@ -2102,6 +2075,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { PreparedStatement prestmt = null; try { conn = readPool.pollConnection(); + conn.setAutoCommit(true); prestmt = conn.prepareQueryStatement(sql); final ResultSet set = prestmt.executeQuery(); boolean rs = set.next() ? (set.getInt(1) > 0) : false; @@ -2182,6 +2156,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { final long s = System.currentTimeMillis(); try { conn = readPool.pollConnection(); + conn.setAutoCommit(true); final List list = new ArrayList(); try { String prepareSQL = info.getFindQuestionPrepareSQL(ids[0]); @@ -2236,6 +2211,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { final long s = System.currentTimeMillis(); try { conn = readPool.pollConnection(); + conn.setAutoCommit(true); final List list = new ArrayList(); try { String prepareSQL = info.getAllQueryPrepareSQL(); @@ -2282,6 +2258,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { final boolean mysqlOrPgsql = "mysql".equals(dbtype()) || "postgresql".equals(dbtype()); try { conn = readPool.pollConnection(); + conn.setAutoCommit(true); String[] sqls = createSheetListAndCountSql(info, readCache, needTotal, distinct, selects, flipper, mysqlOrPgsql, tables, joinAndWhere); String listSql = sqls[0]; String countSql = sqls[1]; @@ -2520,24 +2497,22 @@ public class DataJdbcSource extends AbstractDataSqlSource { return new int[0]; } final long s = System.currentTimeMillis(); + Statement stmt = null; SourceConnection conn = writePool.pollConnection(); try { conn.setAutoCommit(false); - final Statement stmt = conn.createUpdateStatement(); + stmt = conn.createUpdateStatement(); final int[] rs = new int[sqls.length]; int i = -1; for (String sql : sqls) { rs[++i] = stmt.executeUpdate(sql); } - conn.offerUpdateStatement(stmt); conn.commit(); + conn.offerUpdateStatement(stmt); slowLog(s, sqls); return rs; } catch (SQLException e) { - try { - conn.rollback(); - } catch (SQLException se) { - } + conn.rollback(stmt); throw new SourceException(e); } finally { if (conn != null) { @@ -2566,11 +2541,12 @@ public class DataJdbcSource extends AbstractDataSqlSource { NativeSqlStatement sinfo = super.nativeParse(sql, params); final long s = System.currentTimeMillis(); SourceConnection conn = writePool.pollConnection(); + Statement stmt = null; try { conn.setAutoCommit(false); int rs; if (sinfo.isEmptyNamed()) { - final Statement stmt = conn.createUpdateStatement(); + stmt = conn.createUpdateStatement(); rs = stmt.executeUpdate(sinfo.getNativeSql()); conn.offerUpdateStatement(stmt); } else { @@ -2581,16 +2557,14 @@ public class DataJdbcSource extends AbstractDataSqlSource { prestmt.setObject(++index, paramValues.get(n)); } rs = prestmt.executeUpdate(); - conn.offerUpdateStatement(prestmt); + stmt = prestmt; } conn.commit(); + conn.offerUpdateStatement(stmt); slowLog(s, sinfo.nativeSql); return rs; } catch (SQLException e) { - try { - conn.rollback(); - } catch (SQLException se) { - } + conn.rollback(stmt); throw new SourceException(e); } finally { if (conn != null) { @@ -2615,6 +2589,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { final long s = System.currentTimeMillis(); final SourceConnection conn = readPool.pollConnection(); try { + conn.setAutoCommit(true); if (logger.isLoggable(Level.FINEST)) { logger.finest("executeQuery sql=" + sql); } @@ -2644,6 +2619,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { final long s = System.currentTimeMillis(); final SourceConnection conn = readPool.pollConnection(); try { + conn.setAutoCommit(true); if (logger.isLoggable(Level.FINEST)) { logger.finest("executeQuery sql=" + sinfo.getNativeSql()); } @@ -2689,6 +2665,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { final long s = System.currentTimeMillis(); final SourceConnection conn = readPool.pollConnection(); try { + conn.setAutoCommit(true); if (logger.isLoggable(Level.FINEST)) { logger.finest("executeQuery sql=" + sinfo.nativeSql); } @@ -3164,7 +3141,9 @@ public class DataJdbcSource extends AbstractDataSqlSource { } public void offerStreamStatement(final Statement stmt) throws SQLException { - stmt.close(); + if (stmt != null) { + stmt.close(); + } } public Statement createQueryStatement() throws SQLException { @@ -3172,7 +3151,9 @@ public class DataJdbcSource extends AbstractDataSqlSource { } public void offerQueryStatement(final Statement stmt) throws SQLException { - stmt.close(); + if (stmt != null) { + stmt.close(); + } } public Statement createUpdateStatement() throws SQLException { @@ -3180,7 +3161,9 @@ public class DataJdbcSource extends AbstractDataSqlSource { } public void offerUpdateStatement(final Statement stmt) throws SQLException { - stmt.close(); + if (stmt != null) { + stmt.close(); + } } public PreparedStatement prepareQueryStatement(String sql) throws SQLException { @@ -3188,7 +3171,9 @@ public class DataJdbcSource extends AbstractDataSqlSource { } public void offerQueryStatement(final PreparedStatement stmt) throws SQLException { - stmt.close(); + if (stmt != null) { + stmt.close(); + } } public PreparedStatement prepareUpdateStatement(String sql) throws SQLException { @@ -3200,7 +3185,28 @@ public class DataJdbcSource extends AbstractDataSqlSource { } public void offerUpdateStatement(final PreparedStatement stmt) throws SQLException { - stmt.close(); + if (stmt != null) { + stmt.close(); + } + } + + public void offerUpdateStatement(List stmts) throws SQLException { + if (stmts != null) { + for (Statement s : stmts) { + s.close(); + } + } + } + + public void offerUpdateStatement(final Statement stmt, List stmts) throws SQLException { + if (stmt != null) { + stmt.close(); + } + if (stmts != null) { + for (Statement s : stmts) { + s.close(); + } + } } public void setAutoCommit(boolean autoCommit) throws SQLException { @@ -3211,8 +3217,44 @@ public class DataJdbcSource extends AbstractDataSqlSource { conn.commit(); } - public void rollback() throws SQLException { - conn.rollback(); + public void rollback(Statement stmt) { + try { + conn.rollback(); + if (stmt != null) { + stmt.close(); + } + } catch (SQLException e) { + throw new SourceException(e); + } + } + + public void rollback(List stmts) { + try { + conn.rollback(); + if (stmts != null) { + for (Statement s : stmts) { + s.close(); + } + } + } catch (SQLException e) { + throw new SourceException(e); + } + } + + public void rollback(final Statement stmt, List stmts) { + try { + conn.rollback(); + if (stmt != null) { + stmt.close(); + } + if (stmts != null) { + for (Statement s : stmts) { + s.close(); + } + } + } catch (SQLException e) { + throw new SourceException(e); + } } public DatabaseMetaData getMetaData() throws SQLException {