diff --git a/src/main/java/org/redkale/boot/LoggingFileHandler.java b/src/main/java/org/redkale/boot/LoggingFileHandler.java index 62232366d..3a895a7fa 100644 --- a/src/main/java/org/redkale/boot/LoggingFileHandler.java +++ b/src/main/java/org/redkale/boot/LoggingFileHandler.java @@ -72,14 +72,15 @@ public class LoggingFileHandler extends LoggingBaseHandler { try { ByteArrayOutputStream out = new ByteArrayOutputStream(); final PrintStream ps = new PrintStream(out); - ps.println("handlers = java.util.logging.ConsoleHandler"); + final String handlerName = LoggingConsoleHandler.class.getName(); //java.util.logging.ConsoleHandler + ps.println("handlers = " + handlerName); ps.println(".level = FINEST"); ps.println("jdk.level = INFO"); ps.println("sun.level = INFO"); ps.println("com.sun.level = INFO"); ps.println("javax.level = INFO"); - ps.println("java.util.logging.ConsoleHandler.level = FINEST"); - ps.println("java.util.logging.ConsoleHandler.formatter = " + LoggingFormater.class.getName()); + ps.println(handlerName + ".level = FINEST"); + ps.println(handlerName + ".formatter = " + LoggingFormater.class.getName()); LogManager.getLogManager().readConfiguration(new ByteArrayInputStream(out.toByteArray())); } catch (Exception e) { } diff --git a/src/main/java/org/redkale/source/DataJdbcSource.java b/src/main/java/org/redkale/source/DataJdbcSource.java index 00ff53354..7aaf65787 100644 --- a/src/main/java/org/redkale/source/DataJdbcSource.java +++ b/src/main/java/org/redkale/source/DataJdbcSource.java @@ -118,6 +118,233 @@ public class DataJdbcSource extends DataSqlSource { return CompletableFuture.supplyAsync(() -> batch(batch), getExecutor()); } + protected List createInsertPreparedStatements(final Connection conn, EntityInfo info, Map> prepareInfos, T... entitys) throws SQLException { + Attribute[] attrs = info.insertAttributes; + final List prestmts = new ArrayList<>(); + for (Map.Entry> en : prepareInfos.entrySet()) { + PrepareInfo prepareInfo = en.getValue(); + PreparedStatement prestmt = conn.prepareStatement(prepareInfo.prepareSql); + for (final T value : prepareInfo.entitys) { + batchStatementParameters(conn, prestmt, info, attrs, value); + prestmt.addBatch(); + } + prestmts.add(prestmt); + } + return prestmts; + } + + protected PreparedStatement createInsertPreparedStatement(Connection conn, String sql, EntityInfo info, T... entitys) throws SQLException { + Attribute[] attrs = info.insertAttributes; + final PreparedStatement prestmt = conn.prepareStatement(sql); + for (final T value : entitys) { + batchStatementParameters(conn, prestmt, info, attrs, value); + prestmt.addBatch(); + } + return prestmt; + } + + protected List createUpdatePreparedStatements(final Connection conn, EntityInfo info, Map> prepareInfos, T... entitys) throws SQLException { + Attribute primary = info.primary; + Attribute[] attrs = info.updateAttributes; + final List prestmts = new ArrayList<>(); + for (Map.Entry> en : prepareInfos.entrySet()) { + PrepareInfo prepareInfo = en.getValue(); + PreparedStatement prestmt = conn.prepareStatement(prepareInfo.prepareSql); + for (final T value : prepareInfo.entitys) { + int k = batchStatementParameters(conn, prestmt, info, attrs, value); + prestmt.setObject(++k, primary.get(value)); + prestmt.addBatch(); + } + prestmts.add(prestmt); + } + return prestmts; + } + + protected PreparedStatement createUpdatePreparedStatement(Connection conn, String sql, EntityInfo info, T... entitys) throws SQLException { + Attribute primary = info.primary; + Attribute[] attrs = info.updateAttributes; + final PreparedStatement prestmt = conn.prepareStatement(sql); + for (final T value : entitys) { + int k = batchStatementParameters(conn, prestmt, info, attrs, value); + prestmt.setObject(++k, primary.get(value)); + prestmt.addBatch(); + } + return prestmt; + } + + protected int batchStatementParameters(Connection conn, PreparedStatement prestmt, EntityInfo info, Attribute[] attrs, T entity) throws SQLException { + int i = 0; + for (Attribute attr : attrs) { + Object val = getEntityAttrValue(info, attr, entity); + if (val instanceof byte[]) { + Blob blob = conn.createBlob(); + blob.setBytes(1, (byte[]) val); + prestmt.setObject(++i, blob); + } else if (val instanceof Boolean) { + prestmt.setObject(++i, ((Boolean) val) ? (byte) 1 : (byte) 0); + } else if (val instanceof AtomicInteger) { + prestmt.setObject(++i, ((AtomicInteger) val).get()); + } else if (val instanceof AtomicLong) { + prestmt.setObject(++i, ((AtomicLong) val).get()); + } else { + prestmt.setObject(++i, val); + } + } + return i; + } + + @Override + protected CompletableFuture deleteDB(EntityInfo info, Flipper flipper, String... sqls) { + Connection conn = null; + final long s = System.currentTimeMillis(); + try { + conn = writePool.pollConnection(); + conn.setReadOnly(false); + conn.setAutoCommit(true); + int c = 0; + if (sqls.length == 1) { + String sql = sqls[0]; + sql += ((flipper == null || flipper.getLimit() < 1) ? "" : (" LIMIT " + flipper.getLimit())); + if (info.isLoggable(logger, Level.FINEST, sql)) { + logger.finest(info.getType().getSimpleName() + " delete sql=" + sql); + } + final Statement stmt = conn.createStatement(); + c = stmt.executeUpdate(sql); + stmt.close(); + } else { + if (flipper == null || flipper.getLimit() < 1) { + if (info.isLoggable(logger, Level.FINEST, sqls[0])) { + logger.finest(info.getType().getSimpleName() + " delete sqls=" + Arrays.toString(sqls)); + } + final Statement stmt = conn.createStatement(); + for (String sql : sqls) { + stmt.addBatch(sql); + } + int[] cs = stmt.executeBatch(); + stmt.close(); + for (int cc : cs) { + c += cc; + } + } else { + if (info.isLoggable(logger, Level.FINEST, sqls[0])) { + logger.finest(info.getType().getSimpleName() + " limit " + flipper.getLimit() + " delete sqls=" + Arrays.toString(sqls)); + } + final Statement stmt = conn.createStatement(); + for (String sql : sqls) { + stmt.addBatch(sql + " LIMIT " + flipper.getLimit()); + } + int[] cs = stmt.executeBatch(); + stmt.close(); + for (int cc : cs) { + c += cc; + } + } + } + slowLog(s, sqls); + return CompletableFuture.completedFuture(c); + } catch (SQLException e) { + if (isTableNotExist(info, e.getSQLState())) { + if (info.getTableStrategy() == null) { + String[] tableSqls = createTableSqls(info); + if (tableSqls != null) { + try { + Statement st = conn.createStatement(); + if (tableSqls.length == 1) { + st.execute(tableSqls[0]); + } else { + for (String tableSql : tableSqls) { + st.addBatch(tableSql); + } + st.executeBatch(); + } + st.close(); + return CompletableFuture.completedFuture(0); + } catch (SQLException e2) { + return CompletableFuture.failedFuture(e2); + } + } + } else { + return CompletableFuture.failedFuture(e); + } + } + return CompletableFuture.failedFuture(e); + } finally { + if (conn != null) writePool.offerConnection(conn); + } + } + + @Override + protected CompletableFuture clearTableDB(EntityInfo info, final String[] tables, String... sqls) { + Connection conn = null; + final long s = System.currentTimeMillis(); + try { + conn = writePool.pollConnection(); + conn.setReadOnly(false); + conn.setAutoCommit(true); + int c = 0; + final Statement stmt = conn.createStatement(); + if (sqls.length == 1) { + String sql = sqls[0]; + c = stmt.executeUpdate(sql); + } else { + for (String sql : sqls) { + stmt.addBatch(sql); + } + int[] cs = stmt.executeBatch(); + for (int cc : cs) { + c += cc; + } + } + stmt.close(); + slowLog(s, sqls); + return CompletableFuture.completedFuture(c); + } catch (SQLException e) { + if (isTableNotExist(info, e.getSQLState())) return CompletableFuture.completedFuture(-1); + return CompletableFuture.failedFuture(e); + } finally { + if (conn != null) writePool.offerConnection(conn); + } + } + + @Override + protected CompletableFuture dropTableDB(EntityInfo info, String[] tables, String... sqls) { + Connection conn = null; + final long s = System.currentTimeMillis(); + try { + conn = writePool.pollConnection(); + conn.setReadOnly(false); + conn.setAutoCommit(true); + int c = 0; + final Statement stmt = conn.createStatement(); + if (sqls.length == 1) { + String sql = sqls[0]; + c = stmt.executeUpdate(sql); + } else { + for (String sql : sqls) { + stmt.addBatch(sql); + } + int[] cs = stmt.executeBatch(); + for (int cc : cs) { + c += cc; + } + } + stmt.close(); + if (info.getTableStrategy() != null) { + for (String table : tables) { + String tablekey = table.indexOf('.') > 0 ? table : (conn.getCatalog() + '.' + table); + info.removeDisTable(tablekey); + } + } + slowLog(s, sqls); + return CompletableFuture.completedFuture(c); + } catch (SQLException e) { + if (isTableNotExist(info, e.getSQLState())) return CompletableFuture.completedFuture(-1); + return CompletableFuture.failedFuture(e); + } finally { + if (conn != null) writePool.offerConnection(conn); + } + } + @Override public int batch(final DataBatch batch) { Objects.requireNonNull(batch); @@ -159,6 +386,7 @@ public class DataJdbcSource extends DataSqlSource { c1 += cc; } c = c1; + prestmt.close(); } else { int c1 = 0; for (PreparedStatement stmt : prestmts) { @@ -168,6 +396,9 @@ public class DataJdbcSource extends DataSqlSource { } } c = c1; + for (PreparedStatement stmt : prestmts) { + stmt.close(); + } } conn.commit(); } catch (SQLException se) { @@ -253,13 +484,13 @@ public class DataJdbcSource extends DataSqlSource { st.close(); } catch (SQLException sqle2) { if (isTableNotExist(info, sqle2.getSQLState())) { - String[] tablesqls = createTableSqls(info); - if (tablesqls != null) { //创建原始表 + String[] tableSqls = createTableSqls(info); + if (tableSqls != null) { //创建原始表 st = conn.createStatement(); - if (tablesqls.length == 1) { - st.execute(tablesqls[0]); + if (tableSqls.length == 1) { + st.execute(tableSqls[0]); } else { - for (String tableSql : tablesqls) { + for (String tableSql : tableSqls) { st.addBatch(tableSql); } st.executeBatch(); @@ -370,207 +601,12 @@ public class DataJdbcSource extends DataSqlSource { } return CompletableFuture.completedFuture(c); } catch (SQLException e) { - try { - if (conn != null) conn.rollback(); - } catch (SQLException se) { - } - return CompletableFuture.failedFuture(e); - } finally { - if (conn != null) writePool.offerConnection(conn); - } - } - - protected List createInsertPreparedStatements(final Connection conn, EntityInfo info, Map> prepareInfos, T... entitys) throws SQLException { - Attribute[] attrs = info.insertAttributes; - final List prestmts = new ArrayList<>(); - for (Map.Entry> en : prepareInfos.entrySet()) { - PrepareInfo prepareInfo = en.getValue(); - PreparedStatement prestmt = conn.prepareStatement(prepareInfo.prepareSql); - for (final T value : prepareInfo.entitys) { - batchStatementParameters(conn, prestmt, info, attrs, value); - prestmt.addBatch(); - } - prestmts.add(prestmt); - } - return prestmts; - } - - protected PreparedStatement createInsertPreparedStatement(Connection conn, String sql, EntityInfo info, T... entitys) throws SQLException { - Attribute[] attrs = info.insertAttributes; - final PreparedStatement prestmt = conn.prepareStatement(sql); - - for (final T value : entitys) { - batchStatementParameters(conn, prestmt, info, attrs, value); - prestmt.addBatch(); - } - return prestmt; - } - - protected int batchStatementParameters(Connection conn, PreparedStatement prestmt, EntityInfo info, Attribute[] attrs, T entity) throws SQLException { - int i = 0; - for (Attribute attr : attrs) { - Object val = getEntityAttrValue(info, attr, entity); - if (val instanceof byte[]) { - Blob blob = conn.createBlob(); - blob.setBytes(1, (byte[]) val); - prestmt.setObject(++i, blob); - } else if (val instanceof Boolean) { - prestmt.setObject(++i, ((Boolean) val) ? (byte) 1 : (byte) 0); - } else if (val instanceof AtomicInteger) { - prestmt.setObject(++i, ((AtomicInteger) val).get()); - } else if (val instanceof AtomicLong) { - prestmt.setObject(++i, ((AtomicLong) val).get()); - } else { - prestmt.setObject(++i, val); - } - } - return i; - } - - @Override - protected CompletableFuture deleteDB(EntityInfo info, Flipper flipper, String... sqls) { - Connection conn = null; - final long s = System.currentTimeMillis(); - try { - conn = writePool.pollConnection(); - conn.setReadOnly(false); - conn.setAutoCommit(true); - int c = 0; - if (sqls.length == 1) { - String sql = sqls[0]; - sql += ((flipper == null || flipper.getLimit() < 1) ? "" : (" LIMIT " + flipper.getLimit())); - if (info.isLoggable(logger, Level.FINEST, sql)) { - logger.finest(info.getType().getSimpleName() + " delete sql=" + sql); - } - final Statement stmt = conn.createStatement(); - c = stmt.executeUpdate(sql); - stmt.close(); - } else { - if (flipper == null || flipper.getLimit() < 1) { - if (info.isLoggable(logger, Level.FINEST, sqls[0])) { - logger.finest(info.getType().getSimpleName() + " delete sqls=" + Arrays.toString(sqls)); - } - final Statement stmt = conn.createStatement(); - for (String sql : sqls) { - stmt.addBatch(sql); - } - int[] cs = stmt.executeBatch(); - stmt.close(); - for (int cc : cs) { - c += cc; - } - } else { - if (info.isLoggable(logger, Level.FINEST, sqls[0])) { - logger.finest(info.getType().getSimpleName() + " limit " + flipper.getLimit() + " delete sqls=" + Arrays.toString(sqls)); - } - final Statement stmt = conn.createStatement(); - for (String sql : sqls) { - stmt.addBatch(sql + " LIMIT " + flipper.getLimit()); - } - int[] cs = stmt.executeBatch(); - stmt.close(); - for (int cc : cs) { - c += cc; - } + if (conn != null) { + try { + conn.rollback(); + } catch (SQLException se) { } } - slowLog(s, sqls); - return CompletableFuture.completedFuture(c); - } catch (SQLException e) { - if (isTableNotExist(info, e.getSQLState())) { - if (info.getTableStrategy() == null) { - String[] tablesqls = createTableSqls(info); - if (tablesqls != null) { - try { - Statement st = conn.createStatement(); - if (tablesqls.length == 1) { - st.execute(tablesqls[0]); - } else { - for (String tablesql : tablesqls) { - st.addBatch(tablesql); - } - st.executeBatch(); - } - st.close(); - return CompletableFuture.completedFuture(0); - } catch (SQLException e2) { - return CompletableFuture.failedFuture(e2); - } - } - } - } - return CompletableFuture.failedFuture(e); - } finally { - if (conn != null) writePool.offerConnection(conn); - } - } - - @Override - protected CompletableFuture clearTableDB(EntityInfo info, final String[] tables, String... sqls) { - Connection conn = null; - final long s = System.currentTimeMillis(); - try { - conn = writePool.pollConnection(); - conn.setReadOnly(false); - conn.setAutoCommit(true); - int c = 0; - final Statement stmt = conn.createStatement(); - if (sqls.length == 1) { - String sql = sqls[0]; - c = stmt.executeUpdate(sql); - } else { - for (String sql : sqls) { - stmt.addBatch(sql); - } - int[] cs = stmt.executeBatch(); - for (int cc : cs) { - c += cc; - } - } - stmt.close(); - slowLog(s, sqls); - return CompletableFuture.completedFuture(c); - } catch (SQLException e) { - if (isTableNotExist(info, e.getSQLState())) return CompletableFuture.completedFuture(-1); - return CompletableFuture.failedFuture(e); - } finally { - if (conn != null) writePool.offerConnection(conn); - } - } - - @Override - protected CompletableFuture dropTableDB(EntityInfo info, String[] tables, String... sqls) { - Connection conn = null; - final long s = System.currentTimeMillis(); - try { - conn = writePool.pollConnection(); - conn.setReadOnly(false); - conn.setAutoCommit(true); - int c = 0; - final Statement stmt = conn.createStatement(); - if (sqls.length == 1) { - String sql = sqls[0]; - c = stmt.executeUpdate(sql); - } else { - for (String sql : sqls) { - stmt.addBatch(sql); - } - int[] cs = stmt.executeBatch(); - for (int cc : cs) { - c += cc; - } - } - stmt.close(); - if (info.getTableStrategy() != null) { - for (String table : tables) { - String tablekey = table.indexOf('.') > 0 ? table : (conn.getCatalog() + '.' + table); - info.removeDisTable(tablekey); - } - } - slowLog(s, sqls); - return CompletableFuture.completedFuture(c); - } catch (SQLException e) { - if (isTableNotExist(info, e.getSQLState())) return CompletableFuture.completedFuture(-1); return CompletableFuture.failedFuture(e); } finally { if (conn != null) writePool.offerConnection(conn); @@ -581,69 +617,173 @@ public class DataJdbcSource extends DataSqlSource { protected CompletableFuture updateEntityDB(EntityInfo info, T... entitys) { Connection conn = null; final long s = System.currentTimeMillis(); + String presql = null; + PreparedStatement prestmt = null; + + List prestmts = null; + Map> prepareInfos = null; + try { conn = writePool.pollConnection(); conn.setReadOnly(false); - conn.setAutoCommit(true); - final String updateSQL = info.getUpdateQuestionPrepareSQL(entitys[0]); - final PreparedStatement prestmt = conn.prepareStatement(updateSQL); - Attribute[] attrs = info.updateAttributes; - final boolean debugfinest = info.isLoggable(logger, Level.FINEST); - char[] sqlchars = debugfinest ? updateSQL.toCharArray() : null; - final Attribute primary = info.getPrimary(); - for (final T value : entitys) { - int k = batchStatementParameters(conn, prestmt, info, attrs, value); - prestmt.setObject(++k, primary.get(value)); - prestmt.addBatch();//------------------------------------------------------------ - if (debugfinest) { //打印调试信息 - //----------------------------- - int i = 0; - StringBuilder sb = new StringBuilder(128); - for (char ch : sqlchars) { - if (ch == '?') { - Object obj = i == attrs.length ? info.getSQLValue(primary, value) : info.getSQLValue(attrs[i++], value); - if (obj != null && obj.getClass().isArray()) { - sb.append("'[length=").append(java.lang.reflect.Array.getLength(obj)).append("]'"); - } else { - sb.append(info.formatSQLValue(obj, sqlFormatter)); + conn.setAutoCommit(false); + + int c = 0; + final Attribute[] attrs = info.updateAttributes; + AGAIN: + while (true) { + try { + if (info.getTableStrategy() == null) { + presql = info.getUpdateQuestionPrepareSQL(entitys[0]); + prestmt = createUpdatePreparedStatement(conn, presql, info, entitys); + int c1 = 0; + int[] pc = prestmt.executeBatch(); + for (int p : pc) { + if (p >= 0) c1 += p; + } + c = c1; + prestmt.close(); + } else { + if (prepareInfos == null) { + prepareInfos = getUpdateQuestionPrepareInfo(info, entitys); + } + prestmts = createUpdatePreparedStatements(conn, info, prepareInfos, entitys); + int c1 = 0; + for (PreparedStatement stmt : prestmts) { + int[] cs = stmt.executeBatch(); + for (int cc : cs) { + c1 += cc; } - } else { - sb.append(ch); + } + c = c1; + for (PreparedStatement stmt : prestmts) { + stmt.close(); } } - String debugsql = sb.toString(); - if (info.isLoggable(logger, Level.FINEST, debugsql)) logger.finest(info.getType().getSimpleName() + " updates sql=" + debugsql.replaceAll("(\r|\n)", "\\n")); - } //打印结束 + conn.commit(); + break; + } catch (SQLException se) { + conn.rollback(); + if (isTableNotExist(info, se.getSQLState())) { + if (info.getTableStrategy() == null) { + String[] tableSqls = createTableSqls(info); + if (tableSqls != null) { + try { + Statement st = conn.createStatement(); + if (tableSqls.length == 1) { + st.execute(tableSqls[0]); + } else { + for (String tableSql : tableSqls) { + st.addBatch(tableSql); + } + st.executeBatch(); + } + st.close(); + } catch (SQLException e2) { + } + } + //表都不存在,更新条数为0 + return CompletableFuture.completedFuture(0); + } else { + String tableName = parseNotExistTableName(se); + if (tableName == null || prepareInfos == null) { + return CompletableFuture.failedFuture(se); + } + String minTableName = (tableName.indexOf('.') > 0) ? tableName.substring(tableName.indexOf('.') + 1) : null; + for (String t : prepareInfos.keySet()) { + if (t.equals(tableName)) { + prepareInfos.remove(t); + if (info.getTableStrategy() == null) { + prestmt.close(); + } else { + for (PreparedStatement stmt : prestmts) { + stmt.close(); + } + } + continue AGAIN; + } else if (minTableName != null && t.equals(minTableName)) { + prepareInfos.remove(t); + if (info.getTableStrategy() == null) { + prestmt.close(); + } else { + for (PreparedStatement stmt : prestmts) { + stmt.close(); + } + } + continue AGAIN; + } + } + return CompletableFuture.failedFuture(se); + } + } + throw se; + } } - int[] pc = prestmt.executeBatch(); - int c = 0; - for (int p : pc) { - if (p >= 0) c += p; + + if (info.isLoggable(logger, Level.FINEST)) { //打印调试信息 + Attribute primary = info.getPrimary(); + if (info.getTableStrategy() == null) { + char[] sqlchars = presql.toCharArray(); + for (final T value : entitys) { + //----------------------------- + StringBuilder sb = new StringBuilder(128); + int i = 0; + for (char ch : sqlchars) { + if (ch == '?') { + Object obj = i == attrs.length ? info.getSQLValue(primary, value) : info.getSQLValue(attrs[i++], value); + if (obj != null && obj.getClass().isArray()) { + sb.append("'[length=").append(java.lang.reflect.Array.getLength(obj)).append("]'"); + } else { + sb.append(info.formatSQLValue(obj, sqlFormatter)); + } + } else { + sb.append(ch); + } + } + String debugsql = sb.toString(); + if (info.isLoggable(logger, Level.FINEST, debugsql)) logger.finest(info.getType().getSimpleName() + " update sql=" + debugsql.replaceAll("(\r|\n)", "\\n")); + } + } else { + prepareInfos.forEach((t, p) -> { + char[] sqlchars = p.prepareSql.toCharArray(); + for (final T value : p.entitys) { + //----------------------------- + StringBuilder sb = new StringBuilder(128); + int i = 0; + for (char ch : sqlchars) { + if (ch == '?') { + Object obj = i == attrs.length ? info.getSQLValue(primary, value) : info.getSQLValue(attrs[i++], value); + if (obj != null && obj.getClass().isArray()) { + sb.append("'[length=").append(java.lang.reflect.Array.getLength(obj)).append("]'"); + } else { + sb.append(info.formatSQLValue(obj, sqlFormatter)); + } + } else { + sb.append(ch); + } + } + String debugsql = sb.toString(); + if (info.isLoggable(logger, Level.FINEST, debugsql)) logger.finest(info.getType().getSimpleName() + " update sql=" + debugsql.replaceAll("(\r|\n)", "\\n")); + } + }); + } + } //打印结束 + if (info.getTableStrategy() == null) { + slowLog(s, presql); + } else { + List presqls = new ArrayList<>(); + prepareInfos.forEach((t, p) -> { + presqls.add(p.prepareSql); + }); + slowLog(s, presqls.toArray(new String[presqls.size()])); } - prestmt.close(); - slowLog(s, updateSQL); return CompletableFuture.completedFuture(c); } catch (SQLException e) { - if (isTableNotExist(info, e.getSQLState())) { - if (info.getTableStrategy() == null) { - String[] tablesqls = createTableSqls(info); - if (tablesqls != null) { - try { - Statement st = conn.createStatement(); - if (tablesqls.length == 1) { - st.execute(tablesqls[0]); - } else { - for (String tablesql : tablesqls) { - st.addBatch(tablesql); - } - st.executeBatch(); - } - st.close(); - } catch (SQLException e2) { - } - } + if (conn != null) { + try { + conn.rollback(); + } catch (SQLException se) { } - return CompletableFuture.completedFuture(0); } return CompletableFuture.failedFuture(e); } finally { @@ -702,15 +842,15 @@ public class DataJdbcSource extends DataSqlSource { } catch (SQLException e) { if (isTableNotExist(info, e.getSQLState())) { if (info.getTableStrategy() == null) { - String[] tablesqls = createTableSqls(info); - if (tablesqls != null) { + String[] tableSqls = createTableSqls(info); + if (tableSqls != null) { try { Statement st = conn.createStatement(); - if (tablesqls.length == 1) { - st.execute(tablesqls[0]); + if (tableSqls.length == 1) { + st.execute(tableSqls[0]); } else { - for (String tablesql : tablesqls) { - st.addBatch(tablesql); + for (String tableSql : tableSqls) { + st.addBatch(tableSql); } st.executeBatch(); } @@ -718,8 +858,10 @@ public class DataJdbcSource extends DataSqlSource { } catch (SQLException e2) { } } + return CompletableFuture.completedFuture(0); + } else { + return CompletableFuture.failedFuture(e); } - return CompletableFuture.completedFuture(0); } return CompletableFuture.failedFuture(e); } finally { @@ -755,15 +897,15 @@ public class DataJdbcSource extends DataSqlSource { } catch (SQLException e) { if (isTableNotExist(info, e.getSQLState())) { if (info.getTableStrategy() == null) { - String[] tablesqls = createTableSqls(info); - if (tablesqls != null) { + String[] tableSqls = createTableSqls(info); + if (tableSqls != null) { try { Statement st = conn.createStatement(); - if (tablesqls.length == 1) { - st.execute(tablesqls[0]); + if (tableSqls.length == 1) { + st.execute(tableSqls[0]); } else { - for (String tablesql : tablesqls) { - st.addBatch(tablesql); + for (String tableSql : tableSqls) { + st.addBatch(tableSql); } st.executeBatch(); } @@ -801,15 +943,15 @@ public class DataJdbcSource extends DataSqlSource { } catch (SQLException e) { if (isTableNotExist(info, e.getSQLState())) { if (info.getTableStrategy() == null) { - String[] tablesqls = createTableSqls(info); - if (tablesqls != null) { + String[] tableSqls = createTableSqls(info); + if (tableSqls != null) { try { Statement st = conn.createStatement(); - if (tablesqls.length == 1) { - st.execute(tablesqls[0]); + if (tableSqls.length == 1) { + st.execute(tableSqls[0]); } else { - for (String tablesql : tablesqls) { - st.addBatch(tablesql); + for (String tableSql : tableSqls) { + st.addBatch(tableSql); } st.executeBatch(); } @@ -848,15 +990,15 @@ public class DataJdbcSource extends DataSqlSource { } catch (SQLException e) { if (isTableNotExist(info, e.getSQLState())) { if (info.getTableStrategy() == null) { - String[] tablesqls = createTableSqls(info); - if (tablesqls != null) { + String[] tableSqls = createTableSqls(info); + if (tableSqls != null) { try { Statement st = conn.createStatement(); - if (tablesqls.length == 1) { - st.execute(tablesqls[0]); + if (tableSqls.length == 1) { + st.execute(tableSqls[0]); } else { - for (String tablesql : tablesqls) { - st.addBatch(tablesql); + for (String tableSql : tableSqls) { + st.addBatch(tableSql); } st.executeBatch(); } @@ -910,15 +1052,15 @@ public class DataJdbcSource extends DataSqlSource { } catch (SQLException e) { if (isTableNotExist(info, e.getSQLState())) { if (info.getTableStrategy() == null) { - String[] tablesqls = createTableSqls(info); - if (tablesqls != null) { + String[] tableSqls = createTableSqls(info); + if (tableSqls != null) { try { Statement st = conn.createStatement(); - if (tablesqls.length == 1) { - st.execute(tablesqls[0]); + if (tableSqls.length == 1) { + st.execute(tableSqls[0]); } else { - for (String tablesql : tablesqls) { - st.addBatch(tablesql); + for (String tableSql : tableSqls) { + st.addBatch(tableSql); } st.executeBatch(); } @@ -953,15 +1095,15 @@ public class DataJdbcSource extends DataSqlSource { } catch (SQLException e) { if (isTableNotExist(info, e.getSQLState())) { if (info.getTableStrategy() == null) { - String[] tablesqls = createTableSqls(info); - if (tablesqls != null) { + String[] tableSqls = createTableSqls(info); + if (tableSqls != null) { try { Statement st = conn.createStatement(); - if (tablesqls.length == 1) { - st.execute(tablesqls[0]); + if (tableSqls.length == 1) { + st.execute(tableSqls[0]); } else { - for (String tablesql : tablesqls) { - st.addBatch(tablesql); + for (String tableSql : tableSqls) { + st.addBatch(tableSql); } st.executeBatch(); } @@ -1000,15 +1142,15 @@ public class DataJdbcSource extends DataSqlSource { } catch (SQLException e) { if (isTableNotExist(info, e.getSQLState())) { if (info.getTableStrategy() == null) { - String[] tablesqls = createTableSqls(info); - if (tablesqls != null) { + String[] tableSqls = createTableSqls(info); + if (tableSqls != null) { try { Statement st = conn.createStatement(); - if (tablesqls.length == 1) { - st.execute(tablesqls[0]); + if (tableSqls.length == 1) { + st.execute(tableSqls[0]); } else { - for (String tablesql : tablesqls) { - st.addBatch(tablesql); + for (String tableSql : tableSqls) { + st.addBatch(tableSql); } st.executeBatch(); } @@ -1043,15 +1185,15 @@ public class DataJdbcSource extends DataSqlSource { } catch (SQLException e) { if (isTableNotExist(info, e.getSQLState())) { if (info.getTableStrategy() == null) { - String[] tablesqls = createTableSqls(info); - if (tablesqls != null) { + String[] tableSqls = createTableSqls(info); + if (tableSqls != null) { try { Statement st = conn.createStatement(); - if (tablesqls.length == 1) { - st.execute(tablesqls[0]); + if (tableSqls.length == 1) { + st.execute(tableSqls[0]); } else { - for (String tablesql : tablesqls) { - st.addBatch(tablesql); + for (String tableSql : tableSqls) { + st.addBatch(tableSql); } st.executeBatch(); } @@ -1180,15 +1322,15 @@ public class DataJdbcSource extends DataSqlSource { } catch (SQLException e) { if (isTableNotExist(info, e.getSQLState())) { if (info.getTableStrategy() == null) { - String[] tablesqls = createTableSqls(info); - if (tablesqls != null) { + String[] tableSqls = createTableSqls(info); + if (tableSqls != null) { try { Statement st = conn.createStatement(); - if (tablesqls.length == 1) { - st.execute(tablesqls[0]); + if (tableSqls.length == 1) { + st.execute(tableSqls[0]); } else { - for (String tablesql : tablesqls) { - st.addBatch(tablesql); + for (String tableSql : tableSqls) { + st.addBatch(tableSql); } st.executeBatch(); } @@ -1196,8 +1338,10 @@ public class DataJdbcSource extends DataSqlSource { } catch (SQLException e2) { } } + return CompletableFuture.completedFuture(new Sheet<>(0, new ArrayList())); + } else { + return CompletableFuture.failedFuture(e); } - return CompletableFuture.completedFuture(new Sheet<>(0, new ArrayList())); } return CompletableFuture.failedFuture(e); } finally { diff --git a/src/main/java/org/redkale/source/DataSqlSource.java b/src/main/java/org/redkale/source/DataSqlSource.java index 02b586fa4..ff11d3504 100644 --- a/src/main/java/org/redkale/source/DataSqlSource.java +++ b/src/main/java/org/redkale/source/DataSqlSource.java @@ -7,6 +7,7 @@ package org.redkale.source; import java.io.Serializable; import java.math.*; +import java.sql.SQLException; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -64,8 +65,8 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi protected BiFunction sqlFormatter; - protected BiConsumer futureCompleteConsumer = (r, t) -> { - if (t != null) logger.log(Level.INFO, "CompletableFuture complete error", (Throwable) t); + protected BiConsumer errorCompleteConsumer = (r, t) -> { + //if (t != null) logger.log(Level.INFO, "CompletableFuture complete error", (Throwable) t); }; protected final BiFunction> fullloader = (s, i) @@ -229,6 +230,24 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi } } + protected String parseNotExistTableName(SQLException e) { + String errmsg = e.getMessage(); + char quote = '"'; + String tableName = null; + int pos = errmsg.indexOf(quote); + if (pos < 0) { + quote = '\''; + pos = errmsg.indexOf(quote); + } + if (pos >= 0) { + int pos2 = errmsg.indexOf(quote, pos + 1); + if (pos2 > pos) { + tableName = errmsg.substring(pos + 1, pos2); + } + } + return tableName; + } + //解密可能存在的加密字段, 可重载 protected String decryptProperty(String key, String value) { return value; @@ -725,7 +744,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi if (isOnlyCache(info)) return insertCache(info, entitys); return insertDB(info, entitys).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { insertCache(info, entitys); } @@ -743,14 +762,14 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi } if (isAsync()) return insertDB(info, entitys).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { insertCache(info, entitys); } }); return CompletableFuture.supplyAsync(() -> insertDB(info, entitys).join(), getExecutor()).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { insertCache(info, entitys); } @@ -814,7 +833,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi if (isOnlyCache(info)) return deleteCache(info, -1, pks); return deleteCompose(info, pks).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { deleteCache(info, rs, pks); } @@ -830,14 +849,14 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi } if (isAsync()) return deleteCompose(info, pks).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { deleteCache(info, rs, pks); } }); return CompletableFuture.supplyAsync(() -> deleteCompose(info, pks).join(), getExecutor()).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { deleteCache(info, rs, pks); } @@ -860,7 +879,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi if (isOnlyCache(info)) return deleteCache(info, -1, flipper, node); return this.deleteCompose(info, flipper, node).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { deleteCache(info, rs, flipper, node); } @@ -875,14 +894,14 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi } if (isAsync()) return this.deleteCompose(info, flipper, node).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { deleteCache(info, rs, flipper, node); } }); return CompletableFuture.supplyAsync(() -> this.deleteCompose(info, flipper, node).join(), getExecutor()).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { deleteCache(info, rs, flipper, node); } @@ -968,7 +987,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi if (isOnlyCache(info)) return clearTableCache(info, node); return this.clearTableCompose(info, node).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { clearTableCache(info, node); } @@ -983,14 +1002,14 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi } if (isAsync()) return this.clearTableCompose(info, node).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { clearTableCache(info, node); } }); return CompletableFuture.supplyAsync(() -> this.clearTableCompose(info, node).join(), getExecutor()).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { clearTableCache(info, node); } @@ -1024,7 +1043,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi if (isOnlyCache(info)) return dropTableCache(info, node); return this.dropTableCompose(info, node).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { dropTableCache(info, node); } @@ -1039,14 +1058,14 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi } if (isAsync()) return this.dropTableCompose(info, node).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { dropTableCache(info, node); } }); return CompletableFuture.supplyAsync(() -> this.dropTableCompose(info, node).join(), getExecutor()).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { dropTableCache(info, node); } @@ -1129,10 +1148,12 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi checkEntity("update", false, entitys); final Class clazz = (Class) entitys[0].getClass(); final EntityInfo info = loadEntityInfo(clazz); - if (isOnlyCache(info)) return updateCache(info, -1, entitys); + if (isOnlyCache(info)) { + return updateCache(info, -1, entitys); + } return updateEntityDB(info, entitys).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { updateCache(info, rs, entitys); } @@ -1151,14 +1172,14 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi } if (isAsync()) return updateEntityDB(info, entitys).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { updateCache(info, rs, entitys); } }); return CompletableFuture.supplyAsync(() -> updateEntityDB(info, entitys).join(), getExecutor()).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { updateCache(info, rs, entitys); } @@ -1182,7 +1203,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi if (isOnlyCache(info)) return updateCache(info, -1, pk, column, colval); return updateColumnCompose(info, pk, column, colval).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { updateCache(info, rs, pk, column, colval); } @@ -1197,14 +1218,14 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi } if (isAsync()) return updateColumnCompose(info, pk, column, colval).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { updateCache(info, rs, pk, column, colval); } }); return CompletableFuture.supplyAsync(() -> updateColumnCompose(info, pk, column, colval).join(), getExecutor()).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { updateCache(info, rs, pk, column, colval); } @@ -1245,7 +1266,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi if (isOnlyCache(info)) return updateCache(info, -1, column, colval, node); return this.updateColumnCompose(info, column, colval, node).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { updateCache(info, rs, column, colval, node); } @@ -1260,14 +1281,14 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi } if (isAsync()) return this.updateColumnCompose(info, column, colval, node).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { updateCache(info, rs, column, colval, node); } }); return CompletableFuture.supplyAsync(() -> this.updateColumnCompose(info, column, colval, node).join(), getExecutor()).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { updateCache(info, rs, column, colval, node); } @@ -1343,7 +1364,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi if (isOnlyCache(info)) return updateCache(info, -1, pk, values); return this.updateColumnCompose(info, pk, values).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { updateCache(info, rs, pk, values); } @@ -1359,14 +1380,14 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi } if (isAsync()) return this.updateColumnCompose(info, pk, values).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { updateCache(info, rs, pk, values); } }); return CompletableFuture.supplyAsync(() -> this.updateColumnCompose(info, pk, values).join(), getExecutor()).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { updateCache(info, rs, pk, values); } @@ -1408,7 +1429,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi if (isOnlyCache(info)) return updateCache(info, -1, node, flipper, values); return this.updateColumnCompose(info, node, flipper, values).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { updateCache(info, rs, node, flipper, values); } @@ -1424,14 +1445,14 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi } if (isAsync()) return this.updateColumnCompose(info, node, flipper, values).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { updateCache(info, rs, node, flipper, values); } }); return CompletableFuture.supplyAsync(() -> this.updateColumnCompose(info, node, flipper, values).join(), getExecutor()).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { updateCache(info, rs, node, flipper, values); } @@ -1531,7 +1552,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi if (isOnlyCache(info)) return updateCache(info, -1, false, entity, null, selects); return this.updateColumnCompose(info, false, entity, null, selects).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { updateCache(info, rs, false, entity, null, selects); } @@ -1552,14 +1573,14 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi } if (isAsync()) return this.updateColumnCompose(info, false, entity, null, selects).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { updateCache(info, rs, false, entity, null, selects); } }); return CompletableFuture.supplyAsync(() -> this.updateColumnCompose(info, false, entity, null, selects).join(), getExecutor()).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { updateCache(info, rs, false, entity, null, selects); } @@ -1578,7 +1599,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi if (isOnlyCache(info)) return updateCache(info, -1, true, entity, node, selects); return this.updateColumnCompose(info, true, entity, node, selects).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { updateCache(info, rs, true, entity, node, selects); } @@ -1599,14 +1620,14 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi } if (isAsync()) return this.updateColumnCompose(info, true, entity, node, selects).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { updateCache(info, rs, true, entity, node, selects); } }); return CompletableFuture.supplyAsync(() -> this.updateColumnCompose(info, true, entity, node, selects).join(), getExecutor()).whenComplete((rs, t) -> { if (t != null) { - futureCompleteConsumer.accept(rs, t); + errorCompleteConsumer.accept(rs, t); } else { updateCache(info, rs, true, entity, node, selects); }