优化DataJdbcSource

This commit is contained in:
Redkale
2022-12-28 14:30:24 +08:00
parent 2af90d790b
commit 65f7692116
3 changed files with 520 additions and 354 deletions

View File

@@ -72,14 +72,15 @@ public class LoggingFileHandler extends LoggingBaseHandler {
try {
ByteArrayOutputStream out = new ByteArrayOutputStream();
final PrintStream ps = new PrintStream(out);
ps.println("handlers = java.util.logging.ConsoleHandler");
final String handlerName = LoggingConsoleHandler.class.getName(); //java.util.logging.ConsoleHandler
ps.println("handlers = " + handlerName);
ps.println(".level = FINEST");
ps.println("jdk.level = INFO");
ps.println("sun.level = INFO");
ps.println("com.sun.level = INFO");
ps.println("javax.level = INFO");
ps.println("java.util.logging.ConsoleHandler.level = FINEST");
ps.println("java.util.logging.ConsoleHandler.formatter = " + LoggingFormater.class.getName());
ps.println(handlerName + ".level = FINEST");
ps.println(handlerName + ".formatter = " + LoggingFormater.class.getName());
LogManager.getLogManager().readConfiguration(new ByteArrayInputStream(out.toByteArray()));
} catch (Exception e) {
}

View File

@@ -118,6 +118,233 @@ public class DataJdbcSource extends DataSqlSource {
return CompletableFuture.supplyAsync(() -> batch(batch), getExecutor());
}
protected <T> List<PreparedStatement> createInsertPreparedStatements(final Connection conn, EntityInfo<T> info, Map<String, PrepareInfo<T>> prepareInfos, T... entitys) throws SQLException {
Attribute<T, Serializable>[] attrs = info.insertAttributes;
final List<PreparedStatement> prestmts = new ArrayList<>();
for (Map.Entry<String, PrepareInfo<T>> en : prepareInfos.entrySet()) {
PrepareInfo<T> prepareInfo = en.getValue();
PreparedStatement prestmt = conn.prepareStatement(prepareInfo.prepareSql);
for (final T value : prepareInfo.entitys) {
batchStatementParameters(conn, prestmt, info, attrs, value);
prestmt.addBatch();
}
prestmts.add(prestmt);
}
return prestmts;
}
protected <T> PreparedStatement createInsertPreparedStatement(Connection conn, String sql, EntityInfo<T> info, T... entitys) throws SQLException {
Attribute<T, Serializable>[] attrs = info.insertAttributes;
final PreparedStatement prestmt = conn.prepareStatement(sql);
for (final T value : entitys) {
batchStatementParameters(conn, prestmt, info, attrs, value);
prestmt.addBatch();
}
return prestmt;
}
protected <T> List<PreparedStatement> createUpdatePreparedStatements(final Connection conn, EntityInfo<T> info, Map<String, PrepareInfo<T>> prepareInfos, T... entitys) throws SQLException {
Attribute<T, Serializable> primary = info.primary;
Attribute<T, Serializable>[] attrs = info.updateAttributes;
final List<PreparedStatement> prestmts = new ArrayList<>();
for (Map.Entry<String, PrepareInfo<T>> en : prepareInfos.entrySet()) {
PrepareInfo<T> prepareInfo = en.getValue();
PreparedStatement prestmt = conn.prepareStatement(prepareInfo.prepareSql);
for (final T value : prepareInfo.entitys) {
int k = batchStatementParameters(conn, prestmt, info, attrs, value);
prestmt.setObject(++k, primary.get(value));
prestmt.addBatch();
}
prestmts.add(prestmt);
}
return prestmts;
}
protected <T> PreparedStatement createUpdatePreparedStatement(Connection conn, String sql, EntityInfo<T> info, T... entitys) throws SQLException {
Attribute<T, Serializable> primary = info.primary;
Attribute<T, Serializable>[] attrs = info.updateAttributes;
final PreparedStatement prestmt = conn.prepareStatement(sql);
for (final T value : entitys) {
int k = batchStatementParameters(conn, prestmt, info, attrs, value);
prestmt.setObject(++k, primary.get(value));
prestmt.addBatch();
}
return prestmt;
}
protected <T> int batchStatementParameters(Connection conn, PreparedStatement prestmt, EntityInfo<T> info, Attribute<T, Serializable>[] attrs, T entity) throws SQLException {
int i = 0;
for (Attribute<T, Serializable> attr : attrs) {
Object val = getEntityAttrValue(info, attr, entity);
if (val instanceof byte[]) {
Blob blob = conn.createBlob();
blob.setBytes(1, (byte[]) val);
prestmt.setObject(++i, blob);
} else if (val instanceof Boolean) {
prestmt.setObject(++i, ((Boolean) val) ? (byte) 1 : (byte) 0);
} else if (val instanceof AtomicInteger) {
prestmt.setObject(++i, ((AtomicInteger) val).get());
} else if (val instanceof AtomicLong) {
prestmt.setObject(++i, ((AtomicLong) val).get());
} else {
prestmt.setObject(++i, val);
}
}
return i;
}
@Override
protected <T> CompletableFuture<Integer> deleteDB(EntityInfo<T> info, Flipper flipper, String... sqls) {
Connection conn = null;
final long s = System.currentTimeMillis();
try {
conn = writePool.pollConnection();
conn.setReadOnly(false);
conn.setAutoCommit(true);
int c = 0;
if (sqls.length == 1) {
String sql = sqls[0];
sql += ((flipper == null || flipper.getLimit() < 1) ? "" : (" LIMIT " + flipper.getLimit()));
if (info.isLoggable(logger, Level.FINEST, sql)) {
logger.finest(info.getType().getSimpleName() + " delete sql=" + sql);
}
final Statement stmt = conn.createStatement();
c = stmt.executeUpdate(sql);
stmt.close();
} else {
if (flipper == null || flipper.getLimit() < 1) {
if (info.isLoggable(logger, Level.FINEST, sqls[0])) {
logger.finest(info.getType().getSimpleName() + " delete sqls=" + Arrays.toString(sqls));
}
final Statement stmt = conn.createStatement();
for (String sql : sqls) {
stmt.addBatch(sql);
}
int[] cs = stmt.executeBatch();
stmt.close();
for (int cc : cs) {
c += cc;
}
} else {
if (info.isLoggable(logger, Level.FINEST, sqls[0])) {
logger.finest(info.getType().getSimpleName() + " limit " + flipper.getLimit() + " delete sqls=" + Arrays.toString(sqls));
}
final Statement stmt = conn.createStatement();
for (String sql : sqls) {
stmt.addBatch(sql + " LIMIT " + flipper.getLimit());
}
int[] cs = stmt.executeBatch();
stmt.close();
for (int cc : cs) {
c += cc;
}
}
}
slowLog(s, sqls);
return CompletableFuture.completedFuture(c);
} catch (SQLException e) {
if (isTableNotExist(info, e.getSQLState())) {
if (info.getTableStrategy() == null) {
String[] tableSqls = createTableSqls(info);
if (tableSqls != null) {
try {
Statement st = conn.createStatement();
if (tableSqls.length == 1) {
st.execute(tableSqls[0]);
} else {
for (String tableSql : tableSqls) {
st.addBatch(tableSql);
}
st.executeBatch();
}
st.close();
return CompletableFuture.completedFuture(0);
} catch (SQLException e2) {
return CompletableFuture.failedFuture(e2);
}
}
} else {
return CompletableFuture.failedFuture(e);
}
}
return CompletableFuture.failedFuture(e);
} finally {
if (conn != null) writePool.offerConnection(conn);
}
}
@Override
protected <T> CompletableFuture<Integer> clearTableDB(EntityInfo<T> info, final String[] tables, String... sqls) {
Connection conn = null;
final long s = System.currentTimeMillis();
try {
conn = writePool.pollConnection();
conn.setReadOnly(false);
conn.setAutoCommit(true);
int c = 0;
final Statement stmt = conn.createStatement();
if (sqls.length == 1) {
String sql = sqls[0];
c = stmt.executeUpdate(sql);
} else {
for (String sql : sqls) {
stmt.addBatch(sql);
}
int[] cs = stmt.executeBatch();
for (int cc : cs) {
c += cc;
}
}
stmt.close();
slowLog(s, sqls);
return CompletableFuture.completedFuture(c);
} catch (SQLException e) {
if (isTableNotExist(info, e.getSQLState())) return CompletableFuture.completedFuture(-1);
return CompletableFuture.failedFuture(e);
} finally {
if (conn != null) writePool.offerConnection(conn);
}
}
@Override
protected <T> CompletableFuture<Integer> dropTableDB(EntityInfo<T> info, String[] tables, String... sqls) {
Connection conn = null;
final long s = System.currentTimeMillis();
try {
conn = writePool.pollConnection();
conn.setReadOnly(false);
conn.setAutoCommit(true);
int c = 0;
final Statement stmt = conn.createStatement();
if (sqls.length == 1) {
String sql = sqls[0];
c = stmt.executeUpdate(sql);
} else {
for (String sql : sqls) {
stmt.addBatch(sql);
}
int[] cs = stmt.executeBatch();
for (int cc : cs) {
c += cc;
}
}
stmt.close();
if (info.getTableStrategy() != null) {
for (String table : tables) {
String tablekey = table.indexOf('.') > 0 ? table : (conn.getCatalog() + '.' + table);
info.removeDisTable(tablekey);
}
}
slowLog(s, sqls);
return CompletableFuture.completedFuture(c);
} catch (SQLException e) {
if (isTableNotExist(info, e.getSQLState())) return CompletableFuture.completedFuture(-1);
return CompletableFuture.failedFuture(e);
} finally {
if (conn != null) writePool.offerConnection(conn);
}
}
@Override
public int batch(final DataBatch batch) {
Objects.requireNonNull(batch);
@@ -159,6 +386,7 @@ public class DataJdbcSource extends DataSqlSource {
c1 += cc;
}
c = c1;
prestmt.close();
} else {
int c1 = 0;
for (PreparedStatement stmt : prestmts) {
@@ -168,6 +396,9 @@ public class DataJdbcSource extends DataSqlSource {
}
}
c = c1;
for (PreparedStatement stmt : prestmts) {
stmt.close();
}
}
conn.commit();
} catch (SQLException se) {
@@ -253,13 +484,13 @@ public class DataJdbcSource extends DataSqlSource {
st.close();
} catch (SQLException sqle2) {
if (isTableNotExist(info, sqle2.getSQLState())) {
String[] tablesqls = createTableSqls(info);
if (tablesqls != null) { //创建原始表
String[] tableSqls = createTableSqls(info);
if (tableSqls != null) { //创建原始表
st = conn.createStatement();
if (tablesqls.length == 1) {
st.execute(tablesqls[0]);
if (tableSqls.length == 1) {
st.execute(tableSqls[0]);
} else {
for (String tableSql : tablesqls) {
for (String tableSql : tableSqls) {
st.addBatch(tableSql);
}
st.executeBatch();
@@ -370,207 +601,12 @@ public class DataJdbcSource extends DataSqlSource {
}
return CompletableFuture.completedFuture(c);
} catch (SQLException e) {
try {
if (conn != null) conn.rollback();
} catch (SQLException se) {
}
return CompletableFuture.failedFuture(e);
} finally {
if (conn != null) writePool.offerConnection(conn);
}
}
protected <T> List<PreparedStatement> createInsertPreparedStatements(final Connection conn, EntityInfo<T> info, Map<String, PrepareInfo<T>> prepareInfos, T... entitys) throws SQLException {
Attribute<T, Serializable>[] attrs = info.insertAttributes;
final List<PreparedStatement> prestmts = new ArrayList<>();
for (Map.Entry<String, PrepareInfo<T>> en : prepareInfos.entrySet()) {
PrepareInfo<T> prepareInfo = en.getValue();
PreparedStatement prestmt = conn.prepareStatement(prepareInfo.prepareSql);
for (final T value : prepareInfo.entitys) {
batchStatementParameters(conn, prestmt, info, attrs, value);
prestmt.addBatch();
}
prestmts.add(prestmt);
}
return prestmts;
}
protected <T> PreparedStatement createInsertPreparedStatement(Connection conn, String sql, EntityInfo<T> info, T... entitys) throws SQLException {
Attribute<T, Serializable>[] attrs = info.insertAttributes;
final PreparedStatement prestmt = conn.prepareStatement(sql);
for (final T value : entitys) {
batchStatementParameters(conn, prestmt, info, attrs, value);
prestmt.addBatch();
}
return prestmt;
}
protected <T> int batchStatementParameters(Connection conn, PreparedStatement prestmt, EntityInfo<T> info, Attribute<T, Serializable>[] attrs, T entity) throws SQLException {
int i = 0;
for (Attribute<T, Serializable> attr : attrs) {
Object val = getEntityAttrValue(info, attr, entity);
if (val instanceof byte[]) {
Blob blob = conn.createBlob();
blob.setBytes(1, (byte[]) val);
prestmt.setObject(++i, blob);
} else if (val instanceof Boolean) {
prestmt.setObject(++i, ((Boolean) val) ? (byte) 1 : (byte) 0);
} else if (val instanceof AtomicInteger) {
prestmt.setObject(++i, ((AtomicInteger) val).get());
} else if (val instanceof AtomicLong) {
prestmt.setObject(++i, ((AtomicLong) val).get());
} else {
prestmt.setObject(++i, val);
}
}
return i;
}
@Override
protected <T> CompletableFuture<Integer> deleteDB(EntityInfo<T> info, Flipper flipper, String... sqls) {
Connection conn = null;
final long s = System.currentTimeMillis();
try {
conn = writePool.pollConnection();
conn.setReadOnly(false);
conn.setAutoCommit(true);
int c = 0;
if (sqls.length == 1) {
String sql = sqls[0];
sql += ((flipper == null || flipper.getLimit() < 1) ? "" : (" LIMIT " + flipper.getLimit()));
if (info.isLoggable(logger, Level.FINEST, sql)) {
logger.finest(info.getType().getSimpleName() + " delete sql=" + sql);
}
final Statement stmt = conn.createStatement();
c = stmt.executeUpdate(sql);
stmt.close();
} else {
if (flipper == null || flipper.getLimit() < 1) {
if (info.isLoggable(logger, Level.FINEST, sqls[0])) {
logger.finest(info.getType().getSimpleName() + " delete sqls=" + Arrays.toString(sqls));
}
final Statement stmt = conn.createStatement();
for (String sql : sqls) {
stmt.addBatch(sql);
}
int[] cs = stmt.executeBatch();
stmt.close();
for (int cc : cs) {
c += cc;
}
} else {
if (info.isLoggable(logger, Level.FINEST, sqls[0])) {
logger.finest(info.getType().getSimpleName() + " limit " + flipper.getLimit() + " delete sqls=" + Arrays.toString(sqls));
}
final Statement stmt = conn.createStatement();
for (String sql : sqls) {
stmt.addBatch(sql + " LIMIT " + flipper.getLimit());
}
int[] cs = stmt.executeBatch();
stmt.close();
for (int cc : cs) {
c += cc;
}
if (conn != null) {
try {
conn.rollback();
} catch (SQLException se) {
}
}
slowLog(s, sqls);
return CompletableFuture.completedFuture(c);
} catch (SQLException e) {
if (isTableNotExist(info, e.getSQLState())) {
if (info.getTableStrategy() == null) {
String[] tablesqls = createTableSqls(info);
if (tablesqls != null) {
try {
Statement st = conn.createStatement();
if (tablesqls.length == 1) {
st.execute(tablesqls[0]);
} else {
for (String tablesql : tablesqls) {
st.addBatch(tablesql);
}
st.executeBatch();
}
st.close();
return CompletableFuture.completedFuture(0);
} catch (SQLException e2) {
return CompletableFuture.failedFuture(e2);
}
}
}
}
return CompletableFuture.failedFuture(e);
} finally {
if (conn != null) writePool.offerConnection(conn);
}
}
@Override
protected <T> CompletableFuture<Integer> clearTableDB(EntityInfo<T> info, final String[] tables, String... sqls) {
Connection conn = null;
final long s = System.currentTimeMillis();
try {
conn = writePool.pollConnection();
conn.setReadOnly(false);
conn.setAutoCommit(true);
int c = 0;
final Statement stmt = conn.createStatement();
if (sqls.length == 1) {
String sql = sqls[0];
c = stmt.executeUpdate(sql);
} else {
for (String sql : sqls) {
stmt.addBatch(sql);
}
int[] cs = stmt.executeBatch();
for (int cc : cs) {
c += cc;
}
}
stmt.close();
slowLog(s, sqls);
return CompletableFuture.completedFuture(c);
} catch (SQLException e) {
if (isTableNotExist(info, e.getSQLState())) return CompletableFuture.completedFuture(-1);
return CompletableFuture.failedFuture(e);
} finally {
if (conn != null) writePool.offerConnection(conn);
}
}
@Override
protected <T> CompletableFuture<Integer> dropTableDB(EntityInfo<T> info, String[] tables, String... sqls) {
Connection conn = null;
final long s = System.currentTimeMillis();
try {
conn = writePool.pollConnection();
conn.setReadOnly(false);
conn.setAutoCommit(true);
int c = 0;
final Statement stmt = conn.createStatement();
if (sqls.length == 1) {
String sql = sqls[0];
c = stmt.executeUpdate(sql);
} else {
for (String sql : sqls) {
stmt.addBatch(sql);
}
int[] cs = stmt.executeBatch();
for (int cc : cs) {
c += cc;
}
}
stmt.close();
if (info.getTableStrategy() != null) {
for (String table : tables) {
String tablekey = table.indexOf('.') > 0 ? table : (conn.getCatalog() + '.' + table);
info.removeDisTable(tablekey);
}
}
slowLog(s, sqls);
return CompletableFuture.completedFuture(c);
} catch (SQLException e) {
if (isTableNotExist(info, e.getSQLState())) return CompletableFuture.completedFuture(-1);
return CompletableFuture.failedFuture(e);
} finally {
if (conn != null) writePool.offerConnection(conn);
@@ -581,69 +617,173 @@ public class DataJdbcSource extends DataSqlSource {
protected <T> CompletableFuture<Integer> updateEntityDB(EntityInfo<T> info, T... entitys) {
Connection conn = null;
final long s = System.currentTimeMillis();
String presql = null;
PreparedStatement prestmt = null;
List<PreparedStatement> prestmts = null;
Map<String, PrepareInfo<T>> prepareInfos = null;
try {
conn = writePool.pollConnection();
conn.setReadOnly(false);
conn.setAutoCommit(true);
final String updateSQL = info.getUpdateQuestionPrepareSQL(entitys[0]);
final PreparedStatement prestmt = conn.prepareStatement(updateSQL);
Attribute<T, Serializable>[] attrs = info.updateAttributes;
final boolean debugfinest = info.isLoggable(logger, Level.FINEST);
char[] sqlchars = debugfinest ? updateSQL.toCharArray() : null;
final Attribute<T, Serializable> primary = info.getPrimary();
for (final T value : entitys) {
int k = batchStatementParameters(conn, prestmt, info, attrs, value);
prestmt.setObject(++k, primary.get(value));
prestmt.addBatch();//------------------------------------------------------------
if (debugfinest) { //打印调试信息
//-----------------------------
int i = 0;
StringBuilder sb = new StringBuilder(128);
for (char ch : sqlchars) {
if (ch == '?') {
Object obj = i == attrs.length ? info.getSQLValue(primary, value) : info.getSQLValue(attrs[i++], value);
if (obj != null && obj.getClass().isArray()) {
sb.append("'[length=").append(java.lang.reflect.Array.getLength(obj)).append("]'");
} else {
sb.append(info.formatSQLValue(obj, sqlFormatter));
conn.setAutoCommit(false);
int c = 0;
final Attribute<T, Serializable>[] attrs = info.updateAttributes;
AGAIN:
while (true) {
try {
if (info.getTableStrategy() == null) {
presql = info.getUpdateQuestionPrepareSQL(entitys[0]);
prestmt = createUpdatePreparedStatement(conn, presql, info, entitys);
int c1 = 0;
int[] pc = prestmt.executeBatch();
for (int p : pc) {
if (p >= 0) c1 += p;
}
c = c1;
prestmt.close();
} else {
if (prepareInfos == null) {
prepareInfos = getUpdateQuestionPrepareInfo(info, entitys);
}
prestmts = createUpdatePreparedStatements(conn, info, prepareInfos, entitys);
int c1 = 0;
for (PreparedStatement stmt : prestmts) {
int[] cs = stmt.executeBatch();
for (int cc : cs) {
c1 += cc;
}
} else {
sb.append(ch);
}
c = c1;
for (PreparedStatement stmt : prestmts) {
stmt.close();
}
}
String debugsql = sb.toString();
if (info.isLoggable(logger, Level.FINEST, debugsql)) logger.finest(info.getType().getSimpleName() + " updates sql=" + debugsql.replaceAll("(\r|\n)", "\\n"));
} //打印结束
conn.commit();
break;
} catch (SQLException se) {
conn.rollback();
if (isTableNotExist(info, se.getSQLState())) {
if (info.getTableStrategy() == null) {
String[] tableSqls = createTableSqls(info);
if (tableSqls != null) {
try {
Statement st = conn.createStatement();
if (tableSqls.length == 1) {
st.execute(tableSqls[0]);
} else {
for (String tableSql : tableSqls) {
st.addBatch(tableSql);
}
st.executeBatch();
}
st.close();
} catch (SQLException e2) {
}
}
//表都不存在更新条数为0
return CompletableFuture.completedFuture(0);
} else {
String tableName = parseNotExistTableName(se);
if (tableName == null || prepareInfos == null) {
return CompletableFuture.failedFuture(se);
}
String minTableName = (tableName.indexOf('.') > 0) ? tableName.substring(tableName.indexOf('.') + 1) : null;
for (String t : prepareInfos.keySet()) {
if (t.equals(tableName)) {
prepareInfos.remove(t);
if (info.getTableStrategy() == null) {
prestmt.close();
} else {
for (PreparedStatement stmt : prestmts) {
stmt.close();
}
}
continue AGAIN;
} else if (minTableName != null && t.equals(minTableName)) {
prepareInfos.remove(t);
if (info.getTableStrategy() == null) {
prestmt.close();
} else {
for (PreparedStatement stmt : prestmts) {
stmt.close();
}
}
continue AGAIN;
}
}
return CompletableFuture.failedFuture(se);
}
}
throw se;
}
}
int[] pc = prestmt.executeBatch();
int c = 0;
for (int p : pc) {
if (p >= 0) c += p;
if (info.isLoggable(logger, Level.FINEST)) { //打印调试信息
Attribute<T, Serializable> primary = info.getPrimary();
if (info.getTableStrategy() == null) {
char[] sqlchars = presql.toCharArray();
for (final T value : entitys) {
//-----------------------------
StringBuilder sb = new StringBuilder(128);
int i = 0;
for (char ch : sqlchars) {
if (ch == '?') {
Object obj = i == attrs.length ? info.getSQLValue(primary, value) : info.getSQLValue(attrs[i++], value);
if (obj != null && obj.getClass().isArray()) {
sb.append("'[length=").append(java.lang.reflect.Array.getLength(obj)).append("]'");
} else {
sb.append(info.formatSQLValue(obj, sqlFormatter));
}
} else {
sb.append(ch);
}
}
String debugsql = sb.toString();
if (info.isLoggable(logger, Level.FINEST, debugsql)) logger.finest(info.getType().getSimpleName() + " update sql=" + debugsql.replaceAll("(\r|\n)", "\\n"));
}
} else {
prepareInfos.forEach((t, p) -> {
char[] sqlchars = p.prepareSql.toCharArray();
for (final T value : p.entitys) {
//-----------------------------
StringBuilder sb = new StringBuilder(128);
int i = 0;
for (char ch : sqlchars) {
if (ch == '?') {
Object obj = i == attrs.length ? info.getSQLValue(primary, value) : info.getSQLValue(attrs[i++], value);
if (obj != null && obj.getClass().isArray()) {
sb.append("'[length=").append(java.lang.reflect.Array.getLength(obj)).append("]'");
} else {
sb.append(info.formatSQLValue(obj, sqlFormatter));
}
} else {
sb.append(ch);
}
}
String debugsql = sb.toString();
if (info.isLoggable(logger, Level.FINEST, debugsql)) logger.finest(info.getType().getSimpleName() + " update sql=" + debugsql.replaceAll("(\r|\n)", "\\n"));
}
});
}
} //打印结束
if (info.getTableStrategy() == null) {
slowLog(s, presql);
} else {
List<String> presqls = new ArrayList<>();
prepareInfos.forEach((t, p) -> {
presqls.add(p.prepareSql);
});
slowLog(s, presqls.toArray(new String[presqls.size()]));
}
prestmt.close();
slowLog(s, updateSQL);
return CompletableFuture.completedFuture(c);
} catch (SQLException e) {
if (isTableNotExist(info, e.getSQLState())) {
if (info.getTableStrategy() == null) {
String[] tablesqls = createTableSqls(info);
if (tablesqls != null) {
try {
Statement st = conn.createStatement();
if (tablesqls.length == 1) {
st.execute(tablesqls[0]);
} else {
for (String tablesql : tablesqls) {
st.addBatch(tablesql);
}
st.executeBatch();
}
st.close();
} catch (SQLException e2) {
}
}
if (conn != null) {
try {
conn.rollback();
} catch (SQLException se) {
}
return CompletableFuture.completedFuture(0);
}
return CompletableFuture.failedFuture(e);
} finally {
@@ -702,15 +842,15 @@ public class DataJdbcSource extends DataSqlSource {
} catch (SQLException e) {
if (isTableNotExist(info, e.getSQLState())) {
if (info.getTableStrategy() == null) {
String[] tablesqls = createTableSqls(info);
if (tablesqls != null) {
String[] tableSqls = createTableSqls(info);
if (tableSqls != null) {
try {
Statement st = conn.createStatement();
if (tablesqls.length == 1) {
st.execute(tablesqls[0]);
if (tableSqls.length == 1) {
st.execute(tableSqls[0]);
} else {
for (String tablesql : tablesqls) {
st.addBatch(tablesql);
for (String tableSql : tableSqls) {
st.addBatch(tableSql);
}
st.executeBatch();
}
@@ -718,8 +858,10 @@ public class DataJdbcSource extends DataSqlSource {
} catch (SQLException e2) {
}
}
return CompletableFuture.completedFuture(0);
} else {
return CompletableFuture.failedFuture(e);
}
return CompletableFuture.completedFuture(0);
}
return CompletableFuture.failedFuture(e);
} finally {
@@ -755,15 +897,15 @@ public class DataJdbcSource extends DataSqlSource {
} catch (SQLException e) {
if (isTableNotExist(info, e.getSQLState())) {
if (info.getTableStrategy() == null) {
String[] tablesqls = createTableSqls(info);
if (tablesqls != null) {
String[] tableSqls = createTableSqls(info);
if (tableSqls != null) {
try {
Statement st = conn.createStatement();
if (tablesqls.length == 1) {
st.execute(tablesqls[0]);
if (tableSqls.length == 1) {
st.execute(tableSqls[0]);
} else {
for (String tablesql : tablesqls) {
st.addBatch(tablesql);
for (String tableSql : tableSqls) {
st.addBatch(tableSql);
}
st.executeBatch();
}
@@ -801,15 +943,15 @@ public class DataJdbcSource extends DataSqlSource {
} catch (SQLException e) {
if (isTableNotExist(info, e.getSQLState())) {
if (info.getTableStrategy() == null) {
String[] tablesqls = createTableSqls(info);
if (tablesqls != null) {
String[] tableSqls = createTableSqls(info);
if (tableSqls != null) {
try {
Statement st = conn.createStatement();
if (tablesqls.length == 1) {
st.execute(tablesqls[0]);
if (tableSqls.length == 1) {
st.execute(tableSqls[0]);
} else {
for (String tablesql : tablesqls) {
st.addBatch(tablesql);
for (String tableSql : tableSqls) {
st.addBatch(tableSql);
}
st.executeBatch();
}
@@ -848,15 +990,15 @@ public class DataJdbcSource extends DataSqlSource {
} catch (SQLException e) {
if (isTableNotExist(info, e.getSQLState())) {
if (info.getTableStrategy() == null) {
String[] tablesqls = createTableSqls(info);
if (tablesqls != null) {
String[] tableSqls = createTableSqls(info);
if (tableSqls != null) {
try {
Statement st = conn.createStatement();
if (tablesqls.length == 1) {
st.execute(tablesqls[0]);
if (tableSqls.length == 1) {
st.execute(tableSqls[0]);
} else {
for (String tablesql : tablesqls) {
st.addBatch(tablesql);
for (String tableSql : tableSqls) {
st.addBatch(tableSql);
}
st.executeBatch();
}
@@ -910,15 +1052,15 @@ public class DataJdbcSource extends DataSqlSource {
} catch (SQLException e) {
if (isTableNotExist(info, e.getSQLState())) {
if (info.getTableStrategy() == null) {
String[] tablesqls = createTableSqls(info);
if (tablesqls != null) {
String[] tableSqls = createTableSqls(info);
if (tableSqls != null) {
try {
Statement st = conn.createStatement();
if (tablesqls.length == 1) {
st.execute(tablesqls[0]);
if (tableSqls.length == 1) {
st.execute(tableSqls[0]);
} else {
for (String tablesql : tablesqls) {
st.addBatch(tablesql);
for (String tableSql : tableSqls) {
st.addBatch(tableSql);
}
st.executeBatch();
}
@@ -953,15 +1095,15 @@ public class DataJdbcSource extends DataSqlSource {
} catch (SQLException e) {
if (isTableNotExist(info, e.getSQLState())) {
if (info.getTableStrategy() == null) {
String[] tablesqls = createTableSqls(info);
if (tablesqls != null) {
String[] tableSqls = createTableSqls(info);
if (tableSqls != null) {
try {
Statement st = conn.createStatement();
if (tablesqls.length == 1) {
st.execute(tablesqls[0]);
if (tableSqls.length == 1) {
st.execute(tableSqls[0]);
} else {
for (String tablesql : tablesqls) {
st.addBatch(tablesql);
for (String tableSql : tableSqls) {
st.addBatch(tableSql);
}
st.executeBatch();
}
@@ -1000,15 +1142,15 @@ public class DataJdbcSource extends DataSqlSource {
} catch (SQLException e) {
if (isTableNotExist(info, e.getSQLState())) {
if (info.getTableStrategy() == null) {
String[] tablesqls = createTableSqls(info);
if (tablesqls != null) {
String[] tableSqls = createTableSqls(info);
if (tableSqls != null) {
try {
Statement st = conn.createStatement();
if (tablesqls.length == 1) {
st.execute(tablesqls[0]);
if (tableSqls.length == 1) {
st.execute(tableSqls[0]);
} else {
for (String tablesql : tablesqls) {
st.addBatch(tablesql);
for (String tableSql : tableSqls) {
st.addBatch(tableSql);
}
st.executeBatch();
}
@@ -1043,15 +1185,15 @@ public class DataJdbcSource extends DataSqlSource {
} catch (SQLException e) {
if (isTableNotExist(info, e.getSQLState())) {
if (info.getTableStrategy() == null) {
String[] tablesqls = createTableSqls(info);
if (tablesqls != null) {
String[] tableSqls = createTableSqls(info);
if (tableSqls != null) {
try {
Statement st = conn.createStatement();
if (tablesqls.length == 1) {
st.execute(tablesqls[0]);
if (tableSqls.length == 1) {
st.execute(tableSqls[0]);
} else {
for (String tablesql : tablesqls) {
st.addBatch(tablesql);
for (String tableSql : tableSqls) {
st.addBatch(tableSql);
}
st.executeBatch();
}
@@ -1180,15 +1322,15 @@ public class DataJdbcSource extends DataSqlSource {
} catch (SQLException e) {
if (isTableNotExist(info, e.getSQLState())) {
if (info.getTableStrategy() == null) {
String[] tablesqls = createTableSqls(info);
if (tablesqls != null) {
String[] tableSqls = createTableSqls(info);
if (tableSqls != null) {
try {
Statement st = conn.createStatement();
if (tablesqls.length == 1) {
st.execute(tablesqls[0]);
if (tableSqls.length == 1) {
st.execute(tableSqls[0]);
} else {
for (String tablesql : tablesqls) {
st.addBatch(tablesql);
for (String tableSql : tableSqls) {
st.addBatch(tableSql);
}
st.executeBatch();
}
@@ -1196,8 +1338,10 @@ public class DataJdbcSource extends DataSqlSource {
} catch (SQLException e2) {
}
}
return CompletableFuture.completedFuture(new Sheet<>(0, new ArrayList()));
} else {
return CompletableFuture.failedFuture(e);
}
return CompletableFuture.completedFuture(new Sheet<>(0, new ArrayList()));
}
return CompletableFuture.failedFuture(e);
} finally {

View File

@@ -7,6 +7,7 @@ package org.redkale.source;
import java.io.Serializable;
import java.math.*;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
@@ -64,8 +65,8 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
protected BiFunction<EntityInfo, Object, CharSequence> sqlFormatter;
protected BiConsumer futureCompleteConsumer = (r, t) -> {
if (t != null) logger.log(Level.INFO, "CompletableFuture complete error", (Throwable) t);
protected BiConsumer errorCompleteConsumer = (r, t) -> {
//if (t != null) logger.log(Level.INFO, "CompletableFuture complete error", (Throwable) t);
};
protected final BiFunction<DataSource, EntityInfo, CompletableFuture<List>> fullloader = (s, i)
@@ -229,6 +230,24 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
}
}
protected String parseNotExistTableName(SQLException e) {
String errmsg = e.getMessage();
char quote = '"';
String tableName = null;
int pos = errmsg.indexOf(quote);
if (pos < 0) {
quote = '\'';
pos = errmsg.indexOf(quote);
}
if (pos >= 0) {
int pos2 = errmsg.indexOf(quote, pos + 1);
if (pos2 > pos) {
tableName = errmsg.substring(pos + 1, pos2);
}
}
return tableName;
}
//解密可能存在的加密字段, 可重载
protected String decryptProperty(String key, String value) {
return value;
@@ -725,7 +744,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
if (isOnlyCache(info)) return insertCache(info, entitys);
return insertDB(info, entitys).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
insertCache(info, entitys);
}
@@ -743,14 +762,14 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
}
if (isAsync()) return insertDB(info, entitys).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
insertCache(info, entitys);
}
});
return CompletableFuture.supplyAsync(() -> insertDB(info, entitys).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
insertCache(info, entitys);
}
@@ -814,7 +833,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
if (isOnlyCache(info)) return deleteCache(info, -1, pks);
return deleteCompose(info, pks).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
deleteCache(info, rs, pks);
}
@@ -830,14 +849,14 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
}
if (isAsync()) return deleteCompose(info, pks).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
deleteCache(info, rs, pks);
}
});
return CompletableFuture.supplyAsync(() -> deleteCompose(info, pks).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
deleteCache(info, rs, pks);
}
@@ -860,7 +879,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
if (isOnlyCache(info)) return deleteCache(info, -1, flipper, node);
return this.deleteCompose(info, flipper, node).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
deleteCache(info, rs, flipper, node);
}
@@ -875,14 +894,14 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
}
if (isAsync()) return this.deleteCompose(info, flipper, node).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
deleteCache(info, rs, flipper, node);
}
});
return CompletableFuture.supplyAsync(() -> this.deleteCompose(info, flipper, node).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
deleteCache(info, rs, flipper, node);
}
@@ -968,7 +987,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
if (isOnlyCache(info)) return clearTableCache(info, node);
return this.clearTableCompose(info, node).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
clearTableCache(info, node);
}
@@ -983,14 +1002,14 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
}
if (isAsync()) return this.clearTableCompose(info, node).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
clearTableCache(info, node);
}
});
return CompletableFuture.supplyAsync(() -> this.clearTableCompose(info, node).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
clearTableCache(info, node);
}
@@ -1024,7 +1043,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
if (isOnlyCache(info)) return dropTableCache(info, node);
return this.dropTableCompose(info, node).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
dropTableCache(info, node);
}
@@ -1039,14 +1058,14 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
}
if (isAsync()) return this.dropTableCompose(info, node).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
dropTableCache(info, node);
}
});
return CompletableFuture.supplyAsync(() -> this.dropTableCompose(info, node).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
dropTableCache(info, node);
}
@@ -1129,10 +1148,12 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
checkEntity("update", false, entitys);
final Class<T> clazz = (Class<T>) entitys[0].getClass();
final EntityInfo<T> info = loadEntityInfo(clazz);
if (isOnlyCache(info)) return updateCache(info, -1, entitys);
if (isOnlyCache(info)) {
return updateCache(info, -1, entitys);
}
return updateEntityDB(info, entitys).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
updateCache(info, rs, entitys);
}
@@ -1151,14 +1172,14 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
}
if (isAsync()) return updateEntityDB(info, entitys).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
updateCache(info, rs, entitys);
}
});
return CompletableFuture.supplyAsync(() -> updateEntityDB(info, entitys).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
updateCache(info, rs, entitys);
}
@@ -1182,7 +1203,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
if (isOnlyCache(info)) return updateCache(info, -1, pk, column, colval);
return updateColumnCompose(info, pk, column, colval).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
updateCache(info, rs, pk, column, colval);
}
@@ -1197,14 +1218,14 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
}
if (isAsync()) return updateColumnCompose(info, pk, column, colval).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
updateCache(info, rs, pk, column, colval);
}
});
return CompletableFuture.supplyAsync(() -> updateColumnCompose(info, pk, column, colval).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
updateCache(info, rs, pk, column, colval);
}
@@ -1245,7 +1266,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
if (isOnlyCache(info)) return updateCache(info, -1, column, colval, node);
return this.updateColumnCompose(info, column, colval, node).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
updateCache(info, rs, column, colval, node);
}
@@ -1260,14 +1281,14 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
}
if (isAsync()) return this.updateColumnCompose(info, column, colval, node).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
updateCache(info, rs, column, colval, node);
}
});
return CompletableFuture.supplyAsync(() -> this.updateColumnCompose(info, column, colval, node).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
updateCache(info, rs, column, colval, node);
}
@@ -1343,7 +1364,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
if (isOnlyCache(info)) return updateCache(info, -1, pk, values);
return this.updateColumnCompose(info, pk, values).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
updateCache(info, rs, pk, values);
}
@@ -1359,14 +1380,14 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
}
if (isAsync()) return this.updateColumnCompose(info, pk, values).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
updateCache(info, rs, pk, values);
}
});
return CompletableFuture.supplyAsync(() -> this.updateColumnCompose(info, pk, values).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
updateCache(info, rs, pk, values);
}
@@ -1408,7 +1429,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
if (isOnlyCache(info)) return updateCache(info, -1, node, flipper, values);
return this.updateColumnCompose(info, node, flipper, values).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
updateCache(info, rs, node, flipper, values);
}
@@ -1424,14 +1445,14 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
}
if (isAsync()) return this.updateColumnCompose(info, node, flipper, values).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
updateCache(info, rs, node, flipper, values);
}
});
return CompletableFuture.supplyAsync(() -> this.updateColumnCompose(info, node, flipper, values).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
updateCache(info, rs, node, flipper, values);
}
@@ -1531,7 +1552,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
if (isOnlyCache(info)) return updateCache(info, -1, false, entity, null, selects);
return this.updateColumnCompose(info, false, entity, null, selects).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
updateCache(info, rs, false, entity, null, selects);
}
@@ -1552,14 +1573,14 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
}
if (isAsync()) return this.updateColumnCompose(info, false, entity, null, selects).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
updateCache(info, rs, false, entity, null, selects);
}
});
return CompletableFuture.supplyAsync(() -> this.updateColumnCompose(info, false, entity, null, selects).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
updateCache(info, rs, false, entity, null, selects);
}
@@ -1578,7 +1599,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
if (isOnlyCache(info)) return updateCache(info, -1, true, entity, node, selects);
return this.updateColumnCompose(info, true, entity, node, selects).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
updateCache(info, rs, true, entity, node, selects);
}
@@ -1599,14 +1620,14 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
}
if (isAsync()) return this.updateColumnCompose(info, true, entity, node, selects).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
updateCache(info, rs, true, entity, node, selects);
}
});
return CompletableFuture.supplyAsync(() -> this.updateColumnCompose(info, true, entity, node, selects).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
errorCompleteConsumer.accept(rs, t);
} else {
updateCache(info, rs, true, entity, node, selects);
}