DataJdbcSource优化

This commit is contained in:
redkale
2023-07-18 20:01:43 +08:00
parent 2a5184f4fc
commit 7dcaa3eda0

View File

@@ -385,16 +385,16 @@ public class DataJdbcSource extends AbstractDataSqlSource {
throw se;
}
//创建单表结构
Statement st = conn.createUpdateStatement();
Statement stmt = conn.createUpdateStatement();
if (tableSqls.length == 1) {
st.execute(tableSqls[0]);
stmt.execute(tableSqls[0]);
} else {
for (String tableSql : tableSqls) {
st.addBatch(tableSql);
stmt.addBatch(tableSql);
}
st.executeBatch();
stmt.executeBatch();
}
st.close();
stmt.close();
} else { //分库分表
info.disTableLock().lock();
try {
@@ -409,77 +409,77 @@ public class DataJdbcSource extends AbstractDataSqlSource {
});
try {
//执行一遍创建分表操作
Statement st = conn.createUpdateStatement();
Statement stmt = conn.createUpdateStatement();
for (String copySql : tableCopys) {
st.addBatch(copySql);
stmt.addBatch(copySql);
}
st.executeBatch();
st.close();
stmt.executeBatch();
stmt.close();
} catch (SQLException sqle) { //多进程并发时可能会出现重复建表
if (isTableNotExist(info, sqle.getSQLState())) {
if (newCatalogs.isEmpty()) { //分表的原始表不存在
String[] tableSqls = createTableSqls(info);
if (tableSqls != null) {
//创建原始表
Statement st = conn.createUpdateStatement();
Statement stmt = conn.createUpdateStatement();
if (tableSqls.length == 1) {
st.execute(tableSqls[0]);
stmt.execute(tableSqls[0]);
} else {
for (String tableSql : tableSqls) {
st.addBatch(tableSql);
stmt.addBatch(tableSql);
}
st.executeBatch();
stmt.executeBatch();
}
st.close();
stmt.close();
//再执行一遍创建分表操作
st = conn.createUpdateStatement();
stmt = conn.createUpdateStatement();
for (String copySql : tableCopys) {
st.addBatch(copySql);
stmt.addBatch(copySql);
}
st.executeBatch();
st.close();
stmt.executeBatch();
stmt.close();
}
} else { //需要先建库
Statement st;
Statement stmt;
try {
st = conn.createUpdateStatement();
stmt = conn.createUpdateStatement();
for (String newCatalog : newCatalogs) {
st.addBatch(("postgresql".equals(dbtype()) ? "CREATE SCHEMA IF NOT EXISTS " : "CREATE DATABASE IF NOT EXISTS ") + newCatalog);
stmt.addBatch(("postgresql".equals(dbtype()) ? "CREATE SCHEMA IF NOT EXISTS " : "CREATE DATABASE IF NOT EXISTS ") + newCatalog);
}
st.executeBatch();
st.close();
stmt.executeBatch();
stmt.close();
} catch (SQLException sqle1) {
logger.log(Level.SEVERE, "create database " + tableCopys + " error", sqle1);
}
try {
//再执行一遍创建分表操作
st = conn.createUpdateStatement();
stmt = conn.createUpdateStatement();
for (String copySql : tableCopys) {
st.addBatch(copySql);
stmt.addBatch(copySql);
}
st.executeBatch();
st.close();
stmt.executeBatch();
stmt.close();
} catch (SQLException sqle2) {
if (isTableNotExist(info, sqle2.getSQLState())) {
String[] tableSqls = createTableSqls(info);
if (tableSqls != null) { //创建原始表
st = conn.createUpdateStatement();
stmt = conn.createUpdateStatement();
if (tableSqls.length == 1) {
st.execute(tableSqls[0]);
stmt.execute(tableSqls[0]);
} else {
for (String tableSql : tableSqls) {
st.addBatch(tableSql);
stmt.addBatch(tableSql);
}
st.executeBatch();
stmt.executeBatch();
}
st.close();
stmt.close();
//再执行一遍创建分表操作
st = conn.createUpdateStatement();
stmt = conn.createUpdateStatement();
for (String copySql : tableCopys) {
st.addBatch(copySql);
stmt.addBatch(copySql);
}
st.executeBatch();
st.close();
stmt.executeBatch();
stmt.close();
}
} else {
logger.log(Level.SEVERE, "create table2 " + tableCopys + " error", sqle2);
@@ -651,16 +651,16 @@ public class DataJdbcSource extends AbstractDataSqlSource {
if (info.getTableStrategy() == null) {
String[] tableSqls = createTableSqls(info);
if (tableSqls != null) {
Statement st = conn.createUpdateStatement();
Statement stmt = conn.createUpdateStatement();
if (tableSqls.length == 1) {
st.execute(tableSqls[0]);
stmt.execute(tableSqls[0]);
} else {
for (String tableSql : tableSqls) {
st.addBatch(tableSql);
stmt.addBatch(tableSql);
}
st.executeBatch();
stmt.executeBatch();
}
st.close();
stmt.close();
return 0;
}
//单表结构不存在
@@ -1148,16 +1148,16 @@ public class DataJdbcSource extends AbstractDataSqlSource {
String[] tableSqls = createTableSqls(info);
if (tableSqls != null) {
try {
Statement st = conn.createUpdateStatement();
Statement stmt = conn.createUpdateStatement();
if (tableSqls.length == 1) {
st.execute(tableSqls[0]);
stmt.execute(tableSqls[0]);
} else {
for (String tableSql : tableSqls) {
st.addBatch(tableSql);
stmt.addBatch(tableSql);
}
st.executeBatch();
stmt.executeBatch();
}
st.close();
stmt.close();
} catch (SQLException e2) {
}
}
@@ -1380,16 +1380,16 @@ public class DataJdbcSource extends AbstractDataSqlSource {
String[] tableSqls = createTableSqls(info);
if (tableSqls != null) {
try {
Statement st = conn.createUpdateStatement();
Statement stmt = conn.createUpdateStatement();
if (tableSqls.length == 1) {
st.execute(tableSqls[0]);
stmt.execute(tableSqls[0]);
} else {
for (String tableSql : tableSqls) {
st.addBatch(tableSql);
stmt.addBatch(tableSql);
}
st.executeBatch();
stmt.executeBatch();
}
st.close();
stmt.close();
} catch (SQLException e2) {
}
}
@@ -2530,12 +2530,12 @@ public class DataJdbcSource extends AbstractDataSqlSource {
if (logger.isLoggable(Level.FINEST)) {
logger.finest("direct query sql=" + sql);
}
final Statement statement = conn.createQueryStatement();
final Statement stmt = conn.createQueryStatement();
//final PreparedStatement prestmt = conn.prepareStatement(sql);
final ResultSet set = statement.executeQuery(sql);// prestmt.executeQuery();
final ResultSet set = stmt.executeQuery(sql);// prestmt.executeQuery();
V rs = handler.apply(createDataResultSet(null, set));
set.close();
statement.close();
stmt.close();
slowLog(s, sql);
return rs;
} catch (Exception ex) {