/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.source;
import java.io.Serializable;
import java.sql.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.stream.Stream;
import org.redkale.annotation.AutoLoad;
import org.redkale.annotation.ResourceListener;
import org.redkale.annotation.ResourceType;
import org.redkale.service.Local;
import org.redkale.util.*;
/**
* DataSource的JDBC实现类
*
*
* 详情见: https://redkale.org
*
* @author zhangjx
*/
@Local
@AutoLoad(false)
@SuppressWarnings("unchecked")
@ResourceType(DataSource.class)
public class DataJdbcSource extends AbstractDataSqlSource {
protected ConnectionPool readPool;
protected ConnectionPool writePool;
public DataJdbcSource() {
super();
}
@Override
public void init(AnyValue conf) {
super.init(conf);
this.readPool = new ConnectionPool(readConfProps);
if (readConfProps == writeConfProps) {
this.writePool = readPool;
} else {
this.writePool = new ConnectionPool(writeConfProps);
}
}
@Override
protected void updateOneResourceChange(Properties newProps, ResourceEvent[] events) {
this.readPool.onResourceChange(events);
}
@Override
protected void updateReadResourceChange(Properties newReadProps, ResourceEvent[] events) {
this.readPool.onResourceChange(events);
}
@Override
protected void updateWriteResourceChange(Properties newWriteProps, ResourceEvent[] events) {
this.writePool.onResourceChange(events);
}
@Override
protected int readMaxConns() {
return this.readPool.maxConns;
}
@Override
protected int writeMaxConns() {
return this.writePool.maxConns;
}
@Override
public void destroy(AnyValue config) {
if (readPool != null) {
readPool.close();
}
if (writePool != null && writePool != readPool) {
writePool.close();
}
}
@Override
public void close() throws Exception {
super.close();
if (readPool != null) {
readPool.close();
}
if (writePool != null && writePool != readPool) {
writePool.close();
}
}
public static boolean acceptsConf(AnyValue conf) {
try {
AnyValue read = conf.getAnyValue("read");
AnyValue node = read == null ? conf : read;
final Class driverClass = DriverManager.getDriver(node.getValue(DATA_SOURCE_URL)).getClass();
RedkaleClassLoader.putReflectionDeclaredConstructors(driverClass, driverClass.getName());
RedkaleClassLoader.putServiceLoader(java.sql.Driver.class);
} catch (Exception e) {
return false;
}
return true;
}
protected ConnectionPool readPool() {
return readPool;
}
protected ConnectionPool writePool() {
return writePool;
}
@Override
protected final String prepareParamSign(int index) {
return "?";
}
@Override
protected final boolean isAsync() {
return false;
}
protected List prepareInsertEntityStatements(SourceConnection conn, EntityInfo info, Map> prepareInfos, T... entitys) throws SQLException {
Attribute[] attrs = info.insertAttributes;
final List prestmts = new ArrayList<>();
for (Map.Entry> en : prepareInfos.entrySet()) {
PrepareInfo prepareInfo = en.getValue();
PreparedStatement prestmt = conn.prepareUpdateStatement(prepareInfo.prepareSql);
for (final T value : prepareInfo.entitys) {
bindStatementParameters(conn, prestmt, info, attrs, value);
prestmt.addBatch();
}
prestmts.add(prestmt);
}
return prestmts;
}
protected PreparedStatement prepareInsertEntityStatement(SourceConnection conn, String sql, EntityInfo info, T... entitys) throws SQLException {
Attribute[] attrs = info.insertAttributes;
final PreparedStatement prestmt = conn.prepareUpdateStatement(sql);
for (final T value : entitys) {
bindStatementParameters(conn, prestmt, info, attrs, value);
prestmt.addBatch();
}
return prestmt;
}
protected List prepareUpdateEntityStatements(SourceConnection conn, EntityInfo info, Map> prepareInfos, T... entitys) throws SQLException {
Attribute primary = info.primary;
Attribute[] attrs = info.updateAttributes;
final List prestmts = new ArrayList<>();
for (Map.Entry> en : prepareInfos.entrySet()) {
PrepareInfo prepareInfo = en.getValue();
PreparedStatement prestmt = conn.prepareUpdateStatement(prepareInfo.prepareSql);
for (final T value : prepareInfo.entitys) {
int k = bindStatementParameters(conn, prestmt, info, attrs, value);
prestmt.setObject(++k, primary.get(value));
prestmt.addBatch();
}
prestmts.add(prestmt);
}
return prestmts;
}
protected PreparedStatement prepareUpdateEntityStatement(SourceConnection conn, String prepareSQL, EntityInfo info, T... entitys) throws SQLException {
Attribute primary = info.primary;
Attribute[] attrs = info.updateAttributes;
final PreparedStatement prestmt = conn.prepareUpdateStatement(prepareSQL);
for (final T value : entitys) {
int k = bindStatementParameters(conn, prestmt, info, attrs, value);
prestmt.setObject(++k, primary.get(value));
prestmt.addBatch();
}
return prestmt;
}
protected int bindStatementParameters(SourceConnection conn, PreparedStatement prestmt, EntityInfo info, Attribute[] attrs, T entity) throws SQLException {
int i = 0;
for (Attribute attr : attrs) {
Object val = getEntityAttrValue(info, attr, entity);
if (val instanceof byte[]) {
Blob blob = conn.createBlob();
blob.setBytes(1, (byte[]) val);
prestmt.setObject(++i, blob);
} else if (val instanceof Boolean) {
prestmt.setObject(++i, ((Boolean) val) ? (byte) 1 : (byte) 0);
} else if (val instanceof AtomicInteger) {
prestmt.setObject(++i, ((AtomicInteger) val).get());
} else if (val instanceof AtomicLong) {
prestmt.setObject(++i, ((AtomicLong) val).get());
} else {
prestmt.setObject(++i, val);
}
}
return i;
}
@Override
public int batch(final DataBatch batch) {
Objects.requireNonNull(batch);
final DefaultDataBatch dataBatch = (DefaultDataBatch) batch;
if (dataBatch.actions.isEmpty()) {
return 0;
}
int c = 0;
SourceConnection conn = null;
try {
conn = writePool.pollTransConnection();
conn.setAutoCommit(false);
for (BatchAction action : dataBatch.actions) {
if (action instanceof RunnableBatchAction) {
RunnableBatchAction act = (RunnableBatchAction) action;
act.task.run();
} else if (action instanceof InsertBatchAction1) {
InsertBatchAction1 act = (InsertBatchAction1) action;
EntityInfo info = apply(act.entity.getClass());
c += insertDBStatement(true, conn, info, act.entity);
} else if (action instanceof DeleteBatchAction1) {
DeleteBatchAction1 act = (DeleteBatchAction1) action;
EntityInfo info = apply(act.entity.getClass());
Serializable pk = info.getPrimaryValue(act.entity);
Map> pkmap = info.getTableMap(pk);
String[] tables = pkmap.keySet().toArray(new String[pkmap.size()]);
String[] sqls = deleteSql(info, pkmap);
c += deleteDBStatement(true, conn, info, tables, null, null, pkmap, sqls);
} else if (action instanceof DeleteBatchAction2) {
DeleteBatchAction2 act = (DeleteBatchAction2) action;
EntityInfo info = apply(act.clazz);
Map> pkmap = info.getTableMap(act.pk);
String[] tables = pkmap.keySet().toArray(new String[pkmap.size()]);
String[] sqls = deleteSql(info, pkmap);
c += deleteDBStatement(true, conn, info, tables, null, null, pkmap, sqls);
} else if (action instanceof DeleteBatchAction3) {
DeleteBatchAction3 act = (DeleteBatchAction3) action;
EntityInfo info = apply(act.clazz);
String[] tables = info.getTables(act.node);
String[] sqls = deleteSql(info, tables, act.flipper, act.node);
c += deleteDBStatement(true, conn, info, tables, act.flipper, act.node, null, sqls);
} else if (action instanceof UpdateBatchAction1) {
UpdateBatchAction1 act = (UpdateBatchAction1) action;
EntityInfo info = apply(act.entity.getClass());
c += updateEntityDBStatement(true, conn, info, act.entity);
} else if (action instanceof UpdateBatchAction2) {
UpdateBatchAction2 act = (UpdateBatchAction2) action;
EntityInfo info = apply(act.clazz);
UpdateSqlInfo sql = updateColumnSql(info, act.pk, act.values);
c += updateColumnDBStatement(true, conn, info, null, sql);
} else if (action instanceof UpdateBatchAction3) {
UpdateBatchAction3 act = (UpdateBatchAction3) action;
EntityInfo info = apply(act.clazz);
UpdateSqlInfo sql = updateColumnSql(info, act.node, act.flipper, act.values);
c += updateColumnDBStatement(true, conn, info, act.flipper, sql);
} else if (action instanceof UpdateBatchAction4) {
UpdateBatchAction4 act = (UpdateBatchAction4) action;
EntityInfo info = apply(act.entity.getClass());
UpdateSqlInfo sql = updateColumnSql(info, false, act.entity, act.node, act.selects);
c += updateColumnDBStatement(true, conn, info, null, sql);
}
}
conn.commit();
return c;
} catch (SourceException se) {
if (conn != null) {
try {
conn.rollback();
} catch (SQLException sqe) {
}
}
throw se;
} catch (SQLException e) {
if (conn != null) {
try {
conn.rollback();
} catch (SQLException se) {
}
}
throw new SourceException(e);
} finally {
if (conn != null) {
writePool.offerTransConnection(conn);
}
}
}
@Override
public CompletableFuture batchAsync(final DataBatch batch) {
return supplyAsync(() -> batch(batch));
}
@Override
protected CompletableFuture insertDBAsync(EntityInfo info, T... entitys) {
return supplyAsync(() -> insertDB(info, entitys));
}
@Override
protected int insertDB(EntityInfo info, T... entitys) {
SourceConnection conn = null;
try {
conn = writePool.pollConnection();
conn.setAutoCommit(false);
int c = insertDBStatement(false, conn, info, entitys);
conn.commit();
return c;
} catch (SQLException e) {
if (conn != null) {
try {
conn.rollback();
} catch (SQLException se) {
}
}
throw new SourceException(e);
} finally {
if (conn != null) {
writePool.offerConnection(conn);
}
}
}
private int insertDBStatement(final boolean batch, final SourceConnection conn, final EntityInfo info, T... entitys) throws SQLException {
final long s = System.currentTimeMillis();
int c = 0;
String presql = null;
PreparedStatement prestmt = null;
List prestmts = null;
Map> prepareInfos = null;
Attribute[] attrs = info.insertAttributes;
if (info.getTableStrategy() == null) { //单库单表
presql = info.getInsertQuestionPrepareSQL(entitys[0]);
prestmt = prepareInsertEntityStatement(conn, presql, info, entitys);
} else { //分库分表
prepareInfos = getInsertQuestionPrepareInfo(info, entitys);
prestmts = prepareInsertEntityStatements(conn, info, prepareInfos, entitys);
}
try {
if (info.getTableStrategy() == null) { //单库单表
c = Utility.sum(prestmt.executeBatch());
conn.offerUpdateStatement(prestmt);
} else { //分库分表
int c1 = 0;
for (PreparedStatement stmt : prestmts) {
c1 += Utility.sum(stmt.executeBatch());
}
c = c1;
for (PreparedStatement stmt : prestmts) {
conn.offerUpdateStatement(stmt);
}
}
if (!batch) {
conn.commit();
}
} catch (SQLException se) {
if (!batch) {
conn.rollback();
}
if (!isTableNotExist(info, se.getSQLState())) {
throw se;
}
if (info.getTableStrategy() == null) { //单库单表
String[] tableSqls = createTableSqls(info);
if (tableSqls == null) {
throw se;
}
//创建单表结构
Statement stmt = conn.createUpdateStatement();
if (tableSqls.length == 1) {
stmt.execute(tableSqls[0]);
} else {
for (String tableSql : tableSqls) {
stmt.addBatch(tableSql);
}
stmt.executeBatch();
}
conn.offerUpdateStatement(stmt);
} else { //分库分表
info.disTableLock().lock();
try {
final Set newCatalogs = new LinkedHashSet<>();
final List tableCopys = new ArrayList<>();
prepareInfos.forEach((t, p) -> {
int pos = t.indexOf('.');
if (pos > 0) {
newCatalogs.add(t.substring(0, pos));
}
tableCopys.add(getTableCopySQL(info, t));
});
try {
//执行一遍创建分表操作
Statement stmt = conn.createUpdateStatement();
for (String copySql : tableCopys) {
stmt.addBatch(copySql);
}
stmt.executeBatch();
conn.offerUpdateStatement(stmt);
} catch (SQLException sqle) { //多进程并发时可能会出现重复建表
if (isTableNotExist(info, sqle.getSQLState())) {
if (newCatalogs.isEmpty()) { //分表的原始表不存在
String[] tableSqls = createTableSqls(info);
if (tableSqls != null) {
//创建原始表
Statement stmt = conn.createUpdateStatement();
if (tableSqls.length == 1) {
stmt.execute(tableSqls[0]);
} else {
for (String tableSql : tableSqls) {
stmt.addBatch(tableSql);
}
stmt.executeBatch();
}
conn.offerUpdateStatement(stmt);
//再执行一遍创建分表操作
stmt = conn.createUpdateStatement();
for (String copySql : tableCopys) {
stmt.addBatch(copySql);
}
stmt.executeBatch();
conn.offerUpdateStatement(stmt);
}
} else { //需要先建库
Statement stmt;
try {
stmt = conn.createUpdateStatement();
for (String newCatalog : newCatalogs) {
stmt.addBatch(("postgresql".equals(dbtype()) ? "CREATE SCHEMA IF NOT EXISTS " : "CREATE DATABASE IF NOT EXISTS ") + newCatalog);
}
stmt.executeBatch();
conn.offerUpdateStatement(stmt);
} catch (SQLException sqle1) {
logger.log(Level.SEVERE, "create database " + tableCopys + " error", sqle1);
}
try {
//再执行一遍创建分表操作
stmt = conn.createUpdateStatement();
for (String copySql : tableCopys) {
stmt.addBatch(copySql);
}
stmt.executeBatch();
conn.offerUpdateStatement(stmt);
} catch (SQLException sqle2) {
if (isTableNotExist(info, sqle2.getSQLState())) {
String[] tableSqls = createTableSqls(info);
if (tableSqls != null) { //创建原始表
stmt = conn.createUpdateStatement();
if (tableSqls.length == 1) {
stmt.execute(tableSqls[0]);
} else {
for (String tableSql : tableSqls) {
stmt.addBatch(tableSql);
}
stmt.executeBatch();
}
conn.offerUpdateStatement(stmt);
//再执行一遍创建分表操作
stmt = conn.createUpdateStatement();
for (String copySql : tableCopys) {
stmt.addBatch(copySql);
}
stmt.executeBatch();
conn.offerUpdateStatement(stmt);
}
} else {
logger.log(Level.SEVERE, "create table2 " + tableCopys + " error", sqle2);
}
}
}
}
}
} finally {
info.disTableLock().unlock();
}
}
if (info.getTableStrategy() == null) { //单库单表
conn.offerUpdateStatement(prestmt);
prestmt = prepareInsertEntityStatement(conn, presql, info, entitys);
c = Utility.sum(prestmt.executeBatch());
conn.offerUpdateStatement(prestmt);
} else { //分库分表
for (PreparedStatement stmt : prestmts) {
conn.offerUpdateStatement(stmt);
}
prestmts = prepareInsertEntityStatements(conn, info, prepareInfos, entitys);
int c1 = 0;
for (PreparedStatement stmt : prestmts) {
c1 += Utility.sum(stmt.executeBatch());
}
c = c1;
for (PreparedStatement stmt : prestmts) {
conn.offerUpdateStatement(stmt);
}
}
}
//------------------------------------------------------------
if (info.isLoggable(logger, Level.FINEST)) { //打印调试信息
if (info.getTableStrategy() == null) {
char[] sqlchars = presql.toCharArray();
for (final T value : entitys) {
//-----------------------------
StringBuilder sb = new StringBuilder(128);
int i = 0;
for (char ch : sqlchars) {
if (ch == '?') {
Object obj = info.getSQLValue(attrs[i++], value);
if (obj != null && obj.getClass().isArray()) {
sb.append("'[length=").append(java.lang.reflect.Array.getLength(obj)).append("]'");
} else {
sb.append(info.formatSQLValue(obj, sqlFormatter));
}
} else {
sb.append(ch);
}
}
String debugsql = sb.toString();
if (info.isLoggable(logger, Level.FINEST, debugsql)) {
logger.finest(info.getType().getSimpleName() + " insert sql=" + debugsql.replaceAll("(\r|\n)", "\\n"));
}
}
} else {
prepareInfos.forEach((t, p) -> {
char[] sqlchars = p.prepareSql.toCharArray();
for (final T value : p.entitys) {
//-----------------------------
StringBuilder sb = new StringBuilder(128);
int i = 0;
for (char ch : sqlchars) {
if (ch == '?') {
Object obj = info.getSQLValue(attrs[i++], value);
if (obj != null && obj.getClass().isArray()) {
sb.append("'[length=").append(java.lang.reflect.Array.getLength(obj)).append("]'");
} else {
sb.append(info.formatSQLValue(obj, sqlFormatter));
}
} else {
sb.append(ch);
}
}
String debugsql = sb.toString();
if (info.isLoggable(logger, Level.FINEST, debugsql)) {
logger.finest(info.getType().getSimpleName() + " insert sql=" + debugsql.replaceAll("(\r|\n)", "\\n"));
}
}
});
}
} //打印结束
if (info.getTableStrategy() == null) {
slowLog(s, presql);
} else {
List presqls = new ArrayList<>();
prepareInfos.forEach((t, p) -> {
presqls.add(p.prepareSql);
});
slowLog(s, presqls.toArray(new String[presqls.size()]));
}
return c;
}
@Override
protected CompletableFuture deleteDBAsync(final EntityInfo info, String[] tables, Flipper flipper, FilterNode node, Map> pkmap, final String... sqls) {
return supplyAsync(() -> deleteDB(info, tables, flipper, node, pkmap, sqls));
}
@Override
protected int deleteDB(EntityInfo info, String[] tables, Flipper flipper, FilterNode node, Map> pkmap, String... sqls) {
SourceConnection conn = null;
try {
conn = writePool.pollConnection();
conn.setAutoCommit(false);
int c = deleteDBStatement(false, conn, info, tables, flipper, node, pkmap, sqls);
conn.commit();
return c;
} catch (SQLException e) {
if (conn != null) {
try {
conn.rollback();
} catch (SQLException se) {
}
}
throw new SourceException(e);
} finally {
if (conn != null) {
writePool.offerConnection(conn);
}
}
}
private int deleteDBStatement(final boolean batch, final SourceConnection conn, final EntityInfo info, String[] tables, Flipper flipper, FilterNode node, Map> pkmap, String... sqls) throws SQLException {
final long s = System.currentTimeMillis();
try {
int c;
if (sqls.length == 1) {
final Statement stmt = conn.createUpdateStatement();
c = stmt.executeUpdate(sqls[0]);
conn.offerUpdateStatement(stmt);
} else {
final Statement stmt = conn.createUpdateStatement();
for (String sql : sqls) {
stmt.addBatch(sql);
}
c = Utility.sum(stmt.executeBatch());
conn.offerUpdateStatement(stmt);
}
if (!batch) {
conn.commit();
}
slowLog(s, sqls);
return c;
} catch (SQLException e) {
if (!batch) {
try {
conn.rollback();
} catch (SQLException se) {
}
}
if (isTableNotExist(info, e.getSQLState())) {
if (info.getTableStrategy() == null) {
String[] tableSqls = createTableSqls(info);
if (tableSqls != null) {
Statement stmt = conn.createUpdateStatement();
if (tableSqls.length == 1) {
stmt.execute(tableSqls[0]);
} else {
for (String tableSql : tableSqls) {
stmt.addBatch(tableSql);
}
stmt.executeBatch();
}
conn.offerUpdateStatement(stmt);
return 0;
}
//单表结构不存在
return 0;
} else if (tables != null && tables.length == 1) {
//只查一个不存在的分表
return 0;
} else if (tables != null && tables.length > 1) {
//多分表查询中一个或多个分表不存在
// String tableName = parseNotExistTableName(e);
// if (tableName == null) {
// throw e;
// }
String[] oldTables = tables;
List notExistTables = checkNotExistTablesNoThrows(conn, tables);
if (notExistTables.isEmpty()) {
throw e;
}
for (String t : notExistTables) {
if (pkmap != null) {
pkmap.remove(t);
} else {
tables = Utility.remove(tables, t);
}
}
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "delete, old-tables: " + Arrays.toString(oldTables) + ", new-tables: " + (pkmap != null ? pkmap.keySet() : Arrays.toString(tables)));
}
if ((pkmap != null ? pkmap.size() : tables.length) == 0) { //分表全部不存在
return 0;
}
sqls = pkmap != null ? deleteSql(info, pkmap) : deleteSql(info, tables, flipper, node);
if (info.isLoggable(logger, Level.FINEST, sqls[0])) {
logger.finest(info.getType().getSimpleName() + " delete sql=" + Arrays.toString(sqls));
}
try {
final Statement stmt = conn.createUpdateStatement();
for (String sql : sqls) {
stmt.addBatch(sql);
}
int c = Utility.sum(stmt.executeBatch());
conn.offerUpdateStatement(stmt);
conn.commit();
slowLog(s, sqls);
return c;
} catch (SQLException se) {
throw se;
}
} else {
throw e;
}
}
throw e;
}
}
@Override
protected CompletableFuture clearTableDBAsync(EntityInfo info, final String[] tables, FilterNode node, String... sqls) {
return supplyAsync(() -> clearTableDB(info, tables, node, sqls));
}
@Override
protected int clearTableDB(EntityInfo info, String[] tables, FilterNode node, String... sqls) {
SourceConnection conn = null;
final long s = System.currentTimeMillis();
try {
conn = writePool.pollConnection();
conn.setAutoCommit(false);
int c;
if (sqls.length == 1) {
final Statement stmt = conn.createUpdateStatement();
c = stmt.executeUpdate(sqls[0]);
conn.offerUpdateStatement(stmt);
} else {
final Statement stmt = conn.createUpdateStatement();
for (String sql : sqls) {
stmt.addBatch(sql);
}
c = Utility.sum(stmt.executeBatch());
conn.offerUpdateStatement(stmt);
}
conn.commit();
slowLog(s, sqls);
return c;
} catch (SQLException e) {
try {
conn.rollback();
} catch (SQLException se) {
}
if (isTableNotExist(info, e.getSQLState())) {
if (info.getTableStrategy() == null) {
//单表结构不存在
return 0;
} else if (tables != null && tables.length == 1) {
//只查一个不存在的分表
return 0;
} else if (tables != null && tables.length > 1) {
//多分表查询中一个或多个分表不存在
// String tableName = parseNotExistTableName(e);
// if (tableName == null) {
// throw new SourceException(e);
// }
String[] oldTables = tables;
List notExistTables = checkNotExistTablesNoThrows(conn, tables);
if (notExistTables.isEmpty()) {
throw new SourceException(e);
}
for (String t : notExistTables) {
tables = Utility.remove(tables, t);
}
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "clearTable, old-tables: " + Arrays.toString(oldTables) + ", new-tables: " + Arrays.toString(tables));
}
if (tables.length == 0) { //分表全部不存在
return 0;
}
sqls = clearTableSql(info, tables, node);
if (info.isLoggable(logger, Level.FINEST, sqls[0])) {
logger.finest(info.getType().getSimpleName() + " clearTable sql=" + Arrays.toString(sqls));
}
try {
final Statement stmt = conn.createUpdateStatement();
for (String sql : sqls) {
stmt.addBatch(sql);
}
int c = Utility.sum(stmt.executeBatch());
conn.offerUpdateStatement(stmt);
conn.commit();
slowLog(s, sqls);
return c;
} catch (SQLException se) {
throw new SourceException(se);
}
} else {
throw new SourceException(e);
}
}
throw new SourceException(e);
} finally {
if (conn != null) {
writePool.offerConnection(conn);
}
}
}
@Override
protected CompletableFuture createTableDBAsync(EntityInfo info, String copyTableSql, final Serializable pk, String... sqls) {
return supplyAsync(() -> createTableDB(info, copyTableSql, pk, sqls));
}
@Override
protected CompletableFuture dropTableDBAsync(EntityInfo info, final String[] tables, FilterNode node, String... sqls) {
return supplyAsync(() -> dropTableDB(info, tables, node, sqls));
}
@Override
protected int createTableDB(EntityInfo info, String copyTableSql, Serializable pk, String... sqls) {
SourceConnection conn = null;
Statement stmt;
final long s = System.currentTimeMillis();
try {
conn = writePool.pollConnection();
conn.setAutoCommit(false);
int c;
if (copyTableSql == null) {
if (sqls.length == 1) {
stmt = conn.createUpdateStatement();
c = stmt.executeUpdate(sqls[0]);
conn.offerUpdateStatement(stmt);
} else {
stmt = conn.createUpdateStatement();
for (String sql : sqls) {
stmt.addBatch(sql);
}
c = Utility.sum(stmt.executeBatch());
conn.offerUpdateStatement(stmt);
}
} else { //建分表
try {
stmt = conn.createUpdateStatement();
c = stmt.executeUpdate(copyTableSql);
} catch (SQLException se) {
if (isTableNotExist(info, se.getSQLState())) { //分表的原始表不存在
final String newTable = info.getTable(pk);
if (newTable.indexOf('.') <= 0) { //分表的原始表不存在
if (info.isLoggable(logger, Level.FINEST, sqls[0])) {
logger.finest(info.getType().getSimpleName() + " createTable sql=" + Arrays.toString(sqls));
}
//创建原始表
stmt = conn.createUpdateStatement();
if (sqls.length == 1) {
stmt.execute(sqls[0]);
} else {
for (String tableSql : sqls) {
stmt.addBatch(tableSql);
}
stmt.executeBatch();
}
conn.offerUpdateStatement(stmt);
//再执行一遍创建分表操作
if (info.isLoggable(logger, Level.FINEST, copyTableSql)) {
logger.finest(info.getType().getSimpleName() + " createTable sql=" + copyTableSql);
}
stmt = conn.createUpdateStatement();
c = stmt.executeUpdate(copyTableSql);
conn.offerUpdateStatement(stmt);
} else { //需要先建库
String newCatalog = newTable.substring(0, newTable.indexOf('.'));
String catalogSql = ("postgresql".equals(dbtype()) ? "CREATE SCHEMA IF NOT EXISTS " : "CREATE DATABASE IF NOT EXISTS ") + newCatalog;
try {
if (info.isLoggable(logger, Level.FINEST, catalogSql)) {
logger.finest(info.getType().getSimpleName() + " createCatalog sql=" + catalogSql);
}
stmt = conn.createUpdateStatement();
stmt.executeUpdate(catalogSql);
conn.offerUpdateStatement(stmt);
} catch (SQLException sqle1) {
logger.log(Level.SEVERE, "create database " + copyTableSql + " error", sqle1);
}
try {
//再执行一遍创建分表操作
if (info.isLoggable(logger, Level.FINEST, copyTableSql)) {
logger.finest(info.getType().getSimpleName() + " createTable sql=" + copyTableSql);
}
stmt = conn.createUpdateStatement();
c = stmt.executeUpdate(copyTableSql);
conn.offerUpdateStatement(stmt);
} catch (SQLException sqle2) {
if (isTableNotExist(info, sqle2.getSQLState())) {
if (info.isLoggable(logger, Level.FINEST, sqls[0])) {
logger.finest(info.getType().getSimpleName() + " createTable sql=" + Arrays.toString(sqls));
}
//创建原始表
stmt = conn.createUpdateStatement();
if (sqls.length == 1) {
stmt.execute(sqls[0]);
} else {
for (String tableSql : sqls) {
stmt.addBatch(tableSql);
}
stmt.executeBatch();
}
conn.offerUpdateStatement(stmt);
//再执行一遍创建分表操作
if (info.isLoggable(logger, Level.FINEST, copyTableSql)) {
logger.finest(info.getType().getSimpleName() + " createTable sql=" + copyTableSql);
}
stmt = conn.createUpdateStatement();
c = stmt.executeUpdate(copyTableSql);
conn.offerUpdateStatement(stmt);
} else {
throw new SourceException(sqle2);
}
}
}
}
throw new SourceException(se);
}
}
conn.commit();
slowLog(s, sqls);
return c;
} catch (SQLException e) {
if (conn != null) {
try {
conn.rollback();
} catch (SQLException se) {
}
}
throw new SourceException(e);
} finally {
if (conn != null) {
writePool.offerConnection(conn);
}
}
}
@Override
protected int dropTableDB(EntityInfo info, String[] tables, FilterNode node, String... sqls) {
SourceConnection conn = null;
final long s = System.currentTimeMillis();
try {
conn = writePool.pollConnection();
conn.setAutoCommit(false);
int c;
if (sqls.length == 1) {
final Statement stmt = conn.createUpdateStatement();
c = stmt.executeUpdate(sqls[0]);
conn.offerUpdateStatement(stmt);
} else {
final Statement stmt = conn.createUpdateStatement();
for (String sql : sqls) {
stmt.addBatch(sql);
}
c = Utility.sum(stmt.executeBatch());
conn.offerUpdateStatement(stmt);
}
conn.commit();
slowLog(s, sqls);
return c;
} catch (SQLException e) {
try {
conn.rollback();
} catch (SQLException se) {
}
if (isTableNotExist(info, e.getSQLState())) {
if (info.getTableStrategy() == null) {
//单表结构不存在
return 0;
} else if (tables != null && tables.length == 1) {
//只查一个不存在的分表
return 0;
} else if (tables != null && tables.length > 1) {
//多分表查询中一个或多个分表不存在
// String tableName = parseNotExistTableName(e);
// if (tableName == null) {
// throw new SourceException(e);
// }
String[] oldTables = tables;
List notExistTables = checkNotExistTablesNoThrows(conn, tables);
if (notExistTables.isEmpty()) {
throw new SourceException(e);
}
for (String t : notExistTables) {
tables = Utility.remove(tables, t);
}
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "dropTable, old-tables: " + Arrays.toString(oldTables) + ", new-tables: " + Arrays.toString(tables));
}
if (tables.length == 0) { //分表全部不存在
return 0;
}
sqls = dropTableSql(info, tables, node);
if (info.isLoggable(logger, Level.FINEST, sqls[0])) {
logger.finest(info.getType().getSimpleName() + " dropTable sql=" + Arrays.toString(sqls));
}
try {
final Statement stmt = conn.createUpdateStatement();
for (String sql : sqls) {
stmt.addBatch(sql);
}
int c = Utility.sum(stmt.executeBatch());
conn.offerUpdateStatement(stmt);
conn.commit();
slowLog(s, sqls);
return c;
} catch (SQLException se) {
throw new SourceException(se);
}
} else {
throw new SourceException(e);
}
}
throw new SourceException(e);
} finally {
if (conn != null) {
writePool.offerConnection(conn);
}
}
}
@Override
protected CompletableFuture updateEntityDBAsync(EntityInfo info, T... entitys) {
return supplyAsync(() -> updateEntityDB(info, entitys));
}
@Override
protected int updateEntityDB(EntityInfo info, T... entitys) {
SourceConnection conn = null;
try {
conn = writePool.pollConnection();
conn.setAutoCommit(false);
int c = updateEntityDBStatement(false, conn, info, entitys);
conn.commit();
return c;
} catch (SQLException e) {
if (conn != null) {
try {
conn.rollback();
} catch (SQLException se) {
}
}
throw new SourceException(e);
} finally {
if (conn != null) {
writePool.offerConnection(conn);
}
}
}
private int updateEntityDBStatement(final boolean batch, final SourceConnection conn, final EntityInfo info, T... entitys) throws SQLException {
final long s = System.currentTimeMillis();
String presql = null;
String caseSql = null;
PreparedStatement prestmt = null;
List prestmts = null;
Map> prepareInfos = null;
int c = -1;
final Attribute[] attrs = info.updateAttributes;
try {
if (info.getTableStrategy() == null) {
caseSql = info.getUpdateQuestionPrepareCaseSQL(entitys);
if (caseSql == null) {
presql = info.getUpdateQuestionPrepareSQL(entitys[0]);
prestmt = prepareUpdateEntityStatement(conn, presql, info, entitys);
} else {
presql = caseSql;
prestmt = conn.prepareUpdateStatement(presql);
int len = entitys.length;
final Attribute primary = info.getPrimary();
Attribute otherAttr = attrs[0];
//UPDATE twointrecord SET randomNumber = ( CASE WHEN id = ? THEN ? WHEN id = ? THEN ? WHEN id = ? THEN ? END ) WHERE id IN (?,?,?)
for (int i = 0; i < entitys.length; i++) {
Serializable pk = primary.get(entitys[i]);
prestmt.setObject(i * 2 + 1, pk); //1 3 5
prestmt.setObject(i * 2 + 2, getEntityAttrValue(info, otherAttr, entitys[i])); //2 4 6
prestmt.setObject(len * 2 + i + 1, pk); //7 8 9
}
prestmt.addBatch();
}
int c1 = 0;
int[] pc = prestmt.executeBatch();
for (int p : pc) {
if (p >= 0) {
c1 += p;
}
}
c = c1;
conn.offerUpdateStatement(prestmt);
} else {
prepareInfos = getUpdateQuestionPrepareInfo(info, entitys);
prestmts = prepareUpdateEntityStatements(conn, info, prepareInfos, entitys);
int c1 = 0;
for (PreparedStatement stmt : prestmts) {
int[] cs = stmt.executeBatch();
for (int cc : cs) {
c1 += cc;
}
}
c = c1;
for (PreparedStatement stmt : prestmts) {
conn.offerUpdateStatement(stmt);
}
}
if (!batch) {
conn.commit();
}
} catch (SQLException se) {
if (!batch) {
conn.rollback();
}
if (isTableNotExist(info, se.getSQLState())) {
if (info.getTableStrategy() == null) {
String[] tableSqls = createTableSqls(info);
if (tableSqls != null) {
try {
Statement stmt = conn.createUpdateStatement();
if (tableSqls.length == 1) {
stmt.execute(tableSqls[0]);
} else {
for (String tableSql : tableSqls) {
stmt.addBatch(tableSql);
}
stmt.executeBatch();
}
conn.offerUpdateStatement(stmt);
} catch (SQLException e2) {
}
}
//表不存在,更新条数为0
return 0;
} else {
//String tableName = parseNotExistTableName(se);
if (prepareInfos == null) {
throw se;
}
for (PreparedStatement stmt : prestmts) {
conn.offerUpdateStatement(stmt);
}
String[] oldTables = prepareInfos.keySet().toArray(new String[prepareInfos.size()]);
List notExistTables = checkNotExistTables(conn, oldTables);
if (notExistTables.isEmpty()) {
throw se;
}
for (String t : notExistTables) {
prepareInfos.remove(t);
}
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "update entitys, old-tables: " + Arrays.toString(oldTables) + ", new-tables: " + prepareInfos.keySet());
}
if (prepareInfos.isEmpty()) { //分表全部不存在
return 0;
}
prestmts = prepareUpdateEntityStatements(conn, info, prepareInfos, entitys);
int c1 = 0;
for (PreparedStatement stmt : prestmts) {
c1 += Utility.sum(stmt.executeBatch());
}
c = c1;
for (PreparedStatement stmt : prestmts) {
conn.offerUpdateStatement(stmt);
}
conn.commit();
}
} else {
throw se;
}
}
if (info.isLoggable(logger, Level.FINEST) && caseSql == null) { //打印调试信息
Attribute primary = info.getPrimary();
if (info.getTableStrategy() == null) {
char[] sqlchars = presql.toCharArray();
for (final T value : entitys) {
//-----------------------------
StringBuilder sb = new StringBuilder(128);
int i = 0;
for (char ch : sqlchars) {
if (ch == '?') {
Object obj = i == attrs.length ? info.getSQLValue(primary, value) : info.getSQLValue(attrs[i++], value);
if (obj != null && obj.getClass().isArray()) {
sb.append("'[length=").append(java.lang.reflect.Array.getLength(obj)).append("]'");
} else {
sb.append(info.formatSQLValue(obj, sqlFormatter));
}
} else {
sb.append(ch);
}
}
String debugsql = sb.toString();
if (info.isLoggable(logger, Level.FINEST, debugsql)) {
logger.finest(info.getType().getSimpleName() + " update sql=" + debugsql.replaceAll("(\r|\n)", "\\n"));
}
}
} else {
prepareInfos.forEach((t, p) -> {
char[] sqlchars = p.prepareSql.toCharArray();
for (final T value : p.entitys) {
//-----------------------------
StringBuilder sb = new StringBuilder(128);
int i = 0;
for (char ch : sqlchars) {
if (ch == '?') {
Object obj = i == attrs.length ? info.getSQLValue(primary, value) : info.getSQLValue(attrs[i++], value);
if (obj != null && obj.getClass().isArray()) {
sb.append("'[length=").append(java.lang.reflect.Array.getLength(obj)).append("]'");
} else {
sb.append(info.formatSQLValue(obj, sqlFormatter));
}
} else {
sb.append(ch);
}
}
String debugsql = sb.toString();
if (info.isLoggable(logger, Level.FINEST, debugsql)) {
logger.finest(info.getType().getSimpleName() + " update sql=" + debugsql.replaceAll("(\r|\n)", "\\n"));
}
}
});
}
} //打印结束
if (info.getTableStrategy() == null) {
slowLog(s, presql);
} else {
List presqls = new ArrayList<>();
prepareInfos.forEach((t, p) -> {
presqls.add(p.prepareSql);
});
slowLog(s, presqls.toArray(new String[presqls.size()]));
}
return c;
}
@Override
protected CompletableFuture updateColumnDBAsync(EntityInfo info, Flipper flipper, UpdateSqlInfo sql) {
return supplyAsync(() -> updateColumnDB(info, flipper, sql));
}
@Override
protected int updateColumnDB(EntityInfo info, Flipper flipper, UpdateSqlInfo sql) {
SourceConnection conn = null;
try {
conn = writePool.pollConnection();
conn.setAutoCommit(false);
int c = updateColumnDBStatement(false, conn, info, flipper, sql);
conn.commit();
return c;
} catch (SQLException e) {
if (conn != null) {
try {
conn.rollback();
} catch (SQLException se) {
}
}
throw new SourceException(e);
} finally {
if (conn != null) {
writePool.offerConnection(conn);
}
}
}
private int updateColumnDBStatement(final boolean batch, final SourceConnection conn, final EntityInfo info, Flipper flipper, UpdateSqlInfo sql) throws SQLException { //String sql, boolean prepared, Object... blobs) {
final long s = System.currentTimeMillis();
int c = -1;
String firstTable = null;
try {
if (sql.blobs != null || sql.tables != null) {
if (sql.tables == null) {
final PreparedStatement prestmt = conn.prepareUpdateStatement(sql.sql);
int index = 0;
for (byte[] param : sql.blobs) {
Blob blob = conn.createBlob();
blob.setBytes(1, param);
prestmt.setBlob(++index, blob);
}
if (info.isLoggable(logger, Level.FINEST, sql.sql)) {
logger.finest(info.getType().getSimpleName() + " updateColumn sql=" + sql.sql);
}
c = prestmt.executeUpdate();
conn.offerUpdateStatement(prestmt);
if (!batch) {
conn.commit();
}
slowLog(s, sql.sql);
return c;
} else {
firstTable = sql.tables[0];
List prestmts = new ArrayList<>();
String[] sqls = new String[sql.tables.length];
for (int i = 0; i < sql.tables.length; i++) {
sqls[i] = i == 0 ? sql.sql : sql.sql.replaceFirst(firstTable, sql.tables[i]);
PreparedStatement prestmt = conn.prepareUpdateStatement(sqls[i]);
int index = 0;
if (sql.blobs != null) {
for (byte[] param : sql.blobs) {
Blob blob = conn.createBlob();
blob.setBytes(1, param);
prestmt.setBlob(++index, blob);
}
}
prestmt.addBatch();
prestmts.add(prestmt);
}
if (info.isLoggable(logger, Level.FINEST, sql.sql)) {
logger.finest(info.getType().getSimpleName() + " updateColumn sql=" + Arrays.toString(sqls));
}
int c1 = 0;
for (PreparedStatement stmt : prestmts) {
c1 += Utility.sum(stmt.executeBatch());
conn.offerUpdateStatement(stmt);
}
c = c1;
if (!batch) {
conn.commit();
}
slowLog(s, sqls);
}
return c;
} else {
if (info.isLoggable(logger, Level.FINEST, sql.sql)) {
logger.finest(info.getType().getSimpleName() + " updateColumn sql=" + sql.sql);
}
final Statement stmt = conn.createUpdateStatement();
c = stmt.executeUpdate(sql.sql);
conn.offerUpdateStatement(stmt);
if (!batch) {
conn.commit();
}
slowLog(s, sql.sql);
return c;
}
} catch (SQLException se) {
if (!batch) {
conn.rollback();
}
if (isTableNotExist(info, se.getSQLState())) {
if (info.getTableStrategy() == null) {
String[] tableSqls = createTableSqls(info);
if (tableSqls != null) {
try {
Statement stmt = conn.createUpdateStatement();
if (tableSqls.length == 1) {
stmt.execute(tableSqls[0]);
} else {
for (String tableSql : tableSqls) {
stmt.addBatch(tableSql);
}
stmt.executeBatch();
}
conn.offerUpdateStatement(stmt);
} catch (SQLException e2) {
}
}
//表不存在,更新条数为0
return 0;
} else if (sql.tables == null) {
//单一分表不存在
return 0;
} else {
// String tableName = parseNotExistTableName(se);
// if (tableName == null) {
// throw se;
// }
String[] oldTables = sql.tables;
List notExistTables = checkNotExistTables(conn, oldTables);
if (notExistTables.isEmpty()) {
throw se;
}
for (String t : notExistTables) {
sql.tables = Utility.remove(sql.tables, t);
}
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "updateColumn, old-tables: " + Arrays.toString(oldTables) + ", new-tables: " + Arrays.toString(sql.tables));
}
if (sql.tables.length == 0) { //分表全部不存在
return 0;
}
List prestmts = new ArrayList<>();
String[] sqls = new String[sql.tables.length];
for (int i = 0; i < sql.tables.length; i++) {
sqls[i] = sql.sql.replaceFirst(firstTable, sql.tables[i]);
PreparedStatement prestmt = conn.prepareUpdateStatement(sqls[i]);
int index = 0;
if (sql.blobs != null) {
for (byte[] param : sql.blobs) {
Blob blob = conn.createBlob();
blob.setBytes(1, param);
prestmt.setBlob(++index, blob);
}
}
prestmt.addBatch();
prestmts.add(prestmt);
}
if (info.isLoggable(logger, Level.FINEST, sql.sql)) {
logger.finest(info.getType().getSimpleName() + " updateColumn sql=" + Arrays.toString(sqls));
}
int c1 = 0;
for (PreparedStatement stmt : prestmts) {
c1 += Utility.sum(stmt.executeBatch());
conn.offerUpdateStatement(stmt);
}
c = c1;
if (!batch) {
conn.commit();
}
slowLog(s, sqls);
return c;
}
} else {
throw se;
}
}
}
@Override
protected CompletableFuture