From 3c450936324ec9fc835f1de52e116bbb71bcc2f4 Mon Sep 17 00:00:00 2001 From: redkale Date: Mon, 24 Jul 2023 22:51:35 +0800 Subject: [PATCH] =?UTF-8?q?DataJdbcSource=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../redkale/source/AbstractDataSource.java | 15 ++ .../java/org/redkale/source/DataBatch.java | 4 +- .../org/redkale/source/DataJdbcSource.java | 138 ++++++------------ 3 files changed, 59 insertions(+), 98 deletions(-) diff --git a/src/main/java/org/redkale/source/AbstractDataSource.java b/src/main/java/org/redkale/source/AbstractDataSource.java index 7b35b047a..7a4d75fce 100644 --- a/src/main/java/org/redkale/source/AbstractDataSource.java +++ b/src/main/java/org/redkale/source/AbstractDataSource.java @@ -1276,6 +1276,12 @@ public abstract class AbstractDataSource extends AbstractService implements Data protected DefaultDataBatch() { } + public DataBatch run(Runnable task) { + Objects.requireNonNull(task); + this.actions.add(new RunnableBatchAction(task)); + return this; + } + @Override //entitys不一定是同一表的数据 public DataBatch insert(T... entitys) { for (T t : entitys) { @@ -1429,6 +1435,15 @@ public abstract class AbstractDataSource extends AbstractService implements Data } + protected static class RunnableBatchAction extends BatchAction { + + public Runnable task; + + public RunnableBatchAction(Runnable task) { + this.task = task; + } + } + protected static class InsertBatchAction1 extends BatchAction { public Object entity; diff --git a/src/main/java/org/redkale/source/DataBatch.java b/src/main/java/org/redkale/source/DataBatch.java index fbe161cd1..8cf6ce18f 100644 --- a/src/main/java/org/redkale/source/DataBatch.java +++ b/src/main/java/org/redkale/source/DataBatch.java @@ -4,7 +4,7 @@ package org.redkale.source; import java.io.Serializable; -import org.redkale.util.*; +import org.redkale.util.SelectColumn; /** * DataSource批量操作对象,操作类型只能是增删改
@@ -22,6 +22,8 @@ public interface DataBatch { return new AbstractDataSource.DefaultDataBatch(); } + public DataBatch run(Runnable task); + public DataBatch insert(T... entitys); public DataBatch delete(T... entitys); diff --git a/src/main/java/org/redkale/source/DataJdbcSource.java b/src/main/java/org/redkale/source/DataJdbcSource.java index 5f7a13bc4..65c9cacd3 100644 --- a/src/main/java/org/redkale/source/DataJdbcSource.java +++ b/src/main/java/org/redkale/source/DataJdbcSource.java @@ -218,11 +218,16 @@ public class DataJdbcSource extends AbstractDataSqlSource { SourceConnection conn = null; try { conn = writePool.pollTransConnection(); + conn.setAutoCommit(false); for (BatchAction action : dataBatch.actions) { - if (action instanceof InsertBatchAction1) { + if (action instanceof RunnableBatchAction) { + RunnableBatchAction act = (RunnableBatchAction) action; + act.task.run(); + + } else if (action instanceof InsertBatchAction1) { InsertBatchAction1 act = (InsertBatchAction1) action; EntityInfo info = apply(act.entity.getClass()); - c += insertDB(true, conn, info, act.entity); + c += insertDBStatement(true, conn, info, act.entity); } else if (action instanceof DeleteBatchAction1) { DeleteBatchAction1 act = (DeleteBatchAction1) action; @@ -231,7 +236,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { Map> pkmap = info.getTableMap(pk); String[] tables = pkmap.keySet().toArray(new String[pkmap.size()]); String[] sqls = deleteSql(info, pkmap); - c += deleteDB(true, conn, info, tables, null, null, pkmap, sqls); + c += deleteDBStatement(true, conn, info, tables, null, null, pkmap, sqls); } else if (action instanceof DeleteBatchAction2) { DeleteBatchAction2 act = (DeleteBatchAction2) action; @@ -239,37 +244,37 @@ public class DataJdbcSource extends AbstractDataSqlSource { Map> pkmap = info.getTableMap(act.pk); String[] tables = pkmap.keySet().toArray(new String[pkmap.size()]); String[] sqls = deleteSql(info, pkmap); - c += deleteDB(true, conn, info, tables, null, null, pkmap, sqls); + c += deleteDBStatement(true, conn, info, tables, null, null, pkmap, sqls); } else if (action instanceof DeleteBatchAction3) { DeleteBatchAction3 act = (DeleteBatchAction3) action; EntityInfo info = apply(act.clazz); String[] tables = info.getTables(act.node); String[] sqls = deleteSql(info, tables, act.flipper, act.node); - c += deleteDB(true, conn, info, tables, act.flipper, act.node, null, sqls); + c += deleteDBStatement(true, conn, info, tables, act.flipper, act.node, null, sqls); } else if (action instanceof UpdateBatchAction1) { UpdateBatchAction1 act = (UpdateBatchAction1) action; EntityInfo info = apply(act.entity.getClass()); - c += updateEntityDB(true, conn, info, act.entity); + c += updateEntityDBStatement(true, conn, info, act.entity); } else if (action instanceof UpdateBatchAction2) { UpdateBatchAction2 act = (UpdateBatchAction2) action; EntityInfo info = apply(act.clazz); UpdateSqlInfo sql = updateColumnSql(info, act.pk, act.values); - c += updateColumnDB(true, conn, info, null, sql); + c += updateColumnDBStatement(true, conn, info, null, sql); } else if (action instanceof UpdateBatchAction3) { UpdateBatchAction3 act = (UpdateBatchAction3) action; EntityInfo info = apply(act.clazz); UpdateSqlInfo sql = updateColumnSql(info, act.node, act.flipper, act.values); - c += updateColumnDB(true, conn, info, act.flipper, sql); + c += updateColumnDBStatement(true, conn, info, act.flipper, sql); } else if (action instanceof UpdateBatchAction4) { UpdateBatchAction4 act = (UpdateBatchAction4) action; EntityInfo info = apply(act.entity.getClass()); UpdateSqlInfo sql = updateColumnSql(info, false, act.entity, act.node, act.selects); - c += updateColumnDB(true, conn, info, null, sql); + c += updateColumnDBStatement(true, conn, info, null, sql); } } conn.commit(); @@ -313,7 +318,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { try { conn = writePool.pollConnection(); conn.setAutoCommit(false); - int c = insertDB(false, conn, info, entitys); + int c = insertDBStatement(false, conn, info, entitys); conn.commit(); return c; } catch (SQLException e) { @@ -331,7 +336,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } } - private int insertDB(final boolean batch, final SourceConnection conn, final EntityInfo info, T... entitys) throws SQLException { + private int insertDBStatement(final boolean batch, final SourceConnection conn, final EntityInfo info, T... entitys) throws SQLException { final long s = System.currentTimeMillis(); int c = 0; String presql = null; @@ -347,21 +352,13 @@ public class DataJdbcSource extends AbstractDataSqlSource { prestmts = prepareInsertEntityStatements(conn, info, prepareInfos, entitys); } try { - if (info.getTableStrategy() == null) { //单库单表 - int c1 = 0; - int[] cs = prestmt.executeBatch(); - for (int cc : cs) { - c1 += cc; - } - c = c1; + if (info.getTableStrategy() == null) { //单库单表 + c = Utility.sum(prestmt.executeBatch()); conn.offerUpdateStatement(prestmt); } else { //分库分表 int c1 = 0; for (PreparedStatement stmt : prestmts) { - int[] cs = stmt.executeBatch(); - for (int cc : cs) { - c1 += cc; - } + c1 += Utility.sum(stmt.executeBatch()); } c = c1; for (PreparedStatement stmt : prestmts) { @@ -494,12 +491,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { if (info.getTableStrategy() == null) { //单库单表 conn.offerUpdateStatement(prestmt); prestmt = prepareInsertEntityStatement(conn, presql, info, entitys); - int c1 = 0; - int[] cs = prestmt.executeBatch(); - for (int cc : cs) { - c1 += cc; - } - c = c1; + c = Utility.sum(prestmt.executeBatch());; conn.offerUpdateStatement(prestmt); } else { //分库分表 for (PreparedStatement stmt : prestmts) { @@ -508,10 +500,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { prestmts = prepareInsertEntityStatements(conn, info, prepareInfos, entitys); int c1 = 0; for (PreparedStatement stmt : prestmts) { - int[] cs = stmt.executeBatch(); - for (int cc : cs) { - c1 += cc; - } + c1 += Utility.sum(stmt.executeBatch()); } c = c1; for (PreparedStatement stmt : prestmts) { @@ -594,7 +583,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { try { conn = writePool.pollConnection(); conn.setAutoCommit(false); - int c = deleteDB(false, conn, info, tables, flipper, node, pkmap, sqls); + int c = deleteDBStatement(false, conn, info, tables, flipper, node, pkmap, sqls); conn.commit(); return c; } catch (SQLException e) { @@ -612,27 +601,21 @@ public class DataJdbcSource extends AbstractDataSqlSource { } } - private int deleteDB(final boolean batch, final SourceConnection conn, final EntityInfo info, String[] tables, Flipper flipper, FilterNode node, Map> pkmap, String... sqls) throws SQLException { + private int deleteDBStatement(final boolean batch, final SourceConnection conn, final EntityInfo info, String[] tables, Flipper flipper, FilterNode node, Map> pkmap, String... sqls) throws SQLException { final long s = System.currentTimeMillis(); try { int c; if (sqls.length == 1) { final Statement stmt = conn.createUpdateStatement(); - int c1 = stmt.executeUpdate(sqls[0]); + c = stmt.executeUpdate(sqls[0]); conn.offerUpdateStatement(stmt); - c = c1; } else { final Statement stmt = conn.createUpdateStatement(); for (String sql : sqls) { stmt.addBatch(sql); } - int c1 = 0; - int[] cs = stmt.executeBatch(); + c = Utility.sum(stmt.executeBatch()); conn.offerUpdateStatement(stmt); - for (int cc : cs) { - c1 += cc; - } - c = c1; } if (!batch) { conn.commit(); @@ -700,12 +683,8 @@ public class DataJdbcSource extends AbstractDataSqlSource { for (String sql : sqls) { stmt.addBatch(sql); } - int c = 0; - int[] cs = stmt.executeBatch(); + int c = Utility.sum(stmt.executeBatch()); conn.offerUpdateStatement(stmt); - for (int cc : cs) { - c += cc; - } conn.commit(); slowLog(s, sqls); return c; @@ -735,21 +714,15 @@ public class DataJdbcSource extends AbstractDataSqlSource { int c; if (sqls.length == 1) { final Statement stmt = conn.createUpdateStatement(); - int c1 = stmt.executeUpdate(sqls[0]); + c = stmt.executeUpdate(sqls[0]); conn.offerUpdateStatement(stmt); - c = c1; } else { final Statement stmt = conn.createUpdateStatement(); for (String sql : sqls) { stmt.addBatch(sql); } - int c1 = 0; - int[] cs = stmt.executeBatch(); + c = Utility.sum(stmt.executeBatch()); conn.offerUpdateStatement(stmt); - for (int cc : cs) { - c1 += cc; - } - c = c1; } conn.commit(); slowLog(s, sqls); @@ -795,12 +768,8 @@ public class DataJdbcSource extends AbstractDataSqlSource { for (String sql : sqls) { stmt.addBatch(sql); } - int c = 0; - int[] cs = stmt.executeBatch(); + int c = Utility.sum(stmt.executeBatch()); conn.offerUpdateStatement(stmt); - for (int cc : cs) { - c += cc; - } conn.commit(); slowLog(s, sqls); return c; @@ -841,21 +810,15 @@ public class DataJdbcSource extends AbstractDataSqlSource { if (copyTableSql == null) { if (sqls.length == 1) { stmt = conn.createUpdateStatement(); - int c1 = stmt.executeUpdate(sqls[0]); + c = stmt.executeUpdate(sqls[0]); conn.offerUpdateStatement(stmt); - c = c1; } else { stmt = conn.createUpdateStatement(); for (String sql : sqls) { stmt.addBatch(sql); } - int c1 = 0; - int[] cs = stmt.executeBatch(); + c = Utility.sum(stmt.executeBatch()); conn.offerUpdateStatement(stmt); - for (int cc : cs) { - c1 += cc; - } - c = c1; } } else { //建分表 try { @@ -968,21 +931,15 @@ public class DataJdbcSource extends AbstractDataSqlSource { int c; if (sqls.length == 1) { final Statement stmt = conn.createUpdateStatement(); - int c1 = stmt.executeUpdate(sqls[0]); + c = stmt.executeUpdate(sqls[0]); conn.offerUpdateStatement(stmt); - c = c1; } else { final Statement stmt = conn.createUpdateStatement(); for (String sql : sqls) { stmt.addBatch(sql); } - int c1 = 0; - int[] cs = stmt.executeBatch(); + c = Utility.sum(stmt.executeBatch()); conn.offerUpdateStatement(stmt); - for (int cc : cs) { - c1 += cc; - } - c = c1; } conn.commit(); slowLog(s, sqls); @@ -1028,12 +985,8 @@ public class DataJdbcSource extends AbstractDataSqlSource { for (String sql : sqls) { stmt.addBatch(sql); } - int c = 0; - int[] cs = stmt.executeBatch(); + int c = Utility.sum(stmt.executeBatch()); conn.offerUpdateStatement(stmt); - for (int cc : cs) { - c += cc; - } conn.commit(); slowLog(s, sqls); return c; @@ -1063,7 +1016,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { try { conn = writePool.pollConnection(); conn.setAutoCommit(false); - int c = updateEntityDB(false, conn, info, entitys); + int c = updateEntityDBStatement(false, conn, info, entitys); conn.commit(); return c; } catch (SQLException e) { @@ -1081,7 +1034,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } } - private int updateEntityDB(final boolean batch, final SourceConnection conn, final EntityInfo info, T... entitys) throws SQLException { + private int updateEntityDBStatement(final boolean batch, final SourceConnection conn, final EntityInfo info, T... entitys) throws SQLException { final long s = System.currentTimeMillis(); String presql = null; String caseSql = null; @@ -1188,10 +1141,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { prestmts = prepareUpdateEntityStatements(conn, info, prepareInfos, entitys); int c1 = 0; for (PreparedStatement stmt : prestmts) { - int[] cs = stmt.executeBatch(); - for (int cc : cs) { - c1 += cc; - } + c1 += Utility.sum(stmt.executeBatch()); } c = c1; for (PreparedStatement stmt : prestmts) { @@ -1279,7 +1229,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { try { conn = writePool.pollConnection(); conn.setAutoCommit(false); - int c = updateColumnDB(false, conn, info, flipper, sql); + int c = updateColumnDBStatement(false, conn, info, flipper, sql); conn.commit(); return c; } catch (SQLException e) { @@ -1297,7 +1247,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } } - private int updateColumnDB(final boolean batch, final SourceConnection conn, final EntityInfo info, Flipper flipper, UpdateSqlInfo sql) throws SQLException { //String sql, boolean prepared, Object... blobs) { + private int updateColumnDBStatement(final boolean batch, final SourceConnection conn, final EntityInfo info, Flipper flipper, UpdateSqlInfo sql) throws SQLException { //String sql, boolean prepared, Object... blobs) { final long s = System.currentTimeMillis(); int c = -1; String firstTable = null; @@ -1344,10 +1294,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } int c1 = 0; for (PreparedStatement stmt : prestmts) { - int[] cs = stmt.executeBatch(); - for (int cc : cs) { - c1 += cc; - } + c1 += Utility.sum(stmt.executeBatch()); conn.offerUpdateStatement(stmt); } c = c1; @@ -1437,10 +1384,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } int c1 = 0; for (PreparedStatement stmt : prestmts) { - int[] cs = stmt.executeBatch(); - for (int cc : cs) { - c1 += cc; - } + c1 += Utility.sum(stmt.executeBatch()); conn.offerUpdateStatement(stmt); } c = c1;