This commit is contained in:
Redkale
2019-11-04 11:53:11 +08:00
parent f42561ca93
commit df3ccb763a
2 changed files with 48 additions and 60 deletions

View File

@@ -70,18 +70,17 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
}
c = c1;
} catch (SQLException se) {
if (info.tableStrategy == null || !info.isTableNotExist(se)) throw se;
synchronized (info.tables) {
final String oldTable = info.table;
if (info.getTableStrategy() == null || !info.isTableNotExist(se)) throw se;
synchronized (info.disTableLock()) {
final String catalog = conn.getCatalog();
final String newTable = info.getTable(entitys[0]);
final String tablekey = newTable.indexOf('.') > 0 ? newTable : (catalog + '.' + newTable);
if (!info.tables.contains(tablekey)) {
if (!info.containsDisTable(tablekey)) {
try {
Statement st = conn.createStatement();
st.execute(info.tablecopySQL.replace("${newtable}", newTable).replace("${oldtable}", oldTable));
st.execute(info.getTableCopySQL(newTable));
st.close();
info.tables.add(tablekey);
info.addDisTable(tablekey);
} catch (SQLException sqle) { //多进程并发时可能会出现重复建表
if (newTable.indexOf('.') > 0 && info.isTableNotExist(se)) {
Statement st;
@@ -94,14 +93,14 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
}
try {
st = conn.createStatement();
st.execute(info.tablecopySQL.replace("${newtable}", newTable).replace("${oldtable}", oldTable));
st.execute(info.getTableCopySQL(newTable));
st.close();
info.tables.add(tablekey);
info.addDisTable(tablekey);
} catch (SQLException sqle2) {
logger.log(Level.SEVERE, "create table2(" + info.tablecopySQL.replace("${newtable}", newTable).replace("${oldtable}", oldTable) + ") error", sqle2);
logger.log(Level.SEVERE, "create table2(" + info.getTableCopySQL(newTable) + ") error", sqle2);
}
} else {
logger.log(Level.SEVERE, "create table(" + info.tablecopySQL.replace("${newtable}", newTable).replace("${oldtable}", oldTable) + ") error", sqle);
logger.log(Level.SEVERE, "create table(" + info.getTableCopySQL(newTable) + ") error", sqle);
}
}
}
@@ -114,7 +113,7 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
c1 += cc;
}
c = c1;
}
}
prestmt.close();
//------------------------------------------------------------
if (info.isLoggable(logger, Level.FINEST)) { //打印调试信息
@@ -141,9 +140,7 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
} //打印结束
return CompletableFuture.completedFuture(c);
} catch (SQLException e) {
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
return CompletableFuture.failedFuture(e);
} finally {
if (conn != null) writePool.offerConnection(conn);
}
@@ -199,9 +196,7 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
stmt.close();
return CompletableFuture.completedFuture(c);
} catch (SQLException e) {
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
return CompletableFuture.failedFuture(e);
} finally {
if (conn != null) writePool.offerConnection(conn);
}
@@ -220,9 +215,7 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
return CompletableFuture.completedFuture(c);
} catch (SQLException e) {
if (info.isTableNotExist(e)) return CompletableFuture.completedFuture(-1);
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
return CompletableFuture.failedFuture(e);
} finally {
if (conn != null) writePool.offerConnection(conn);
}
@@ -241,9 +234,7 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
return CompletableFuture.completedFuture(c);
} catch (SQLException e) {
if (info.isTableNotExist(e)) return CompletableFuture.completedFuture(-1);
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
return CompletableFuture.failedFuture(e);
} finally {
if (conn != null) writePool.offerConnection(conn);
}
@@ -294,9 +285,7 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
prestmt.close();
return CompletableFuture.completedFuture(c);
} catch (SQLException e) {
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
return CompletableFuture.failedFuture(e);
} finally {
if (conn != null) writePool.offerConnection(conn);
}
@@ -328,9 +317,7 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
return CompletableFuture.completedFuture(c);
}
} catch (SQLException e) {
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
return CompletableFuture.failedFuture(e);
} finally {
if (conn != null) writePool.offerConnection(conn);
}
@@ -339,12 +326,12 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
@Override
protected <T, N extends Number> CompletableFuture<Map<String, N>> getNumberMapDB(EntityInfo<T> info, String sql, FilterFuncColumn... columns) {
Connection conn = null;
final Map map = new HashMap<>();
try {
conn = readPool.poll();
//conn.setReadOnly(true);
final Statement stmt = conn.createStatement();
ResultSet set = stmt.executeQuery(sql);
final Map map = new HashMap<>();
if (set.next()) {
int index = 0;
for (FilterFuncColumn ffc : columns) {
@@ -360,9 +347,8 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
stmt.close();
return CompletableFuture.completedFuture(map);
} catch (SQLException e) {
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
if (info.getTableStrategy() != null && info.isTableNotExist(e)) return CompletableFuture.completedFuture(map);
return CompletableFuture.failedFuture(e);
} finally {
if (conn != null) readPool.offerConnection(conn);
}
@@ -385,9 +371,8 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
stmt.close();
return CompletableFuture.completedFuture(rs);
} catch (SQLException e) {
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
if (info.getTableStrategy() != null && info.isTableNotExist(e)) return CompletableFuture.completedFuture(defVal);
return CompletableFuture.failedFuture(e);
} finally {
if (conn != null) readPool.offerConnection(conn);
}
@@ -396,11 +381,11 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
@Override
protected <T, K extends Serializable, N extends Number> CompletableFuture<Map<K, N>> queryColumnMapDB(EntityInfo<T> info, String sql, String keyColumn) {
Connection conn = null;
Map<K, N> rs = new LinkedHashMap<>();
try {
conn = readPool.poll();
//conn.setReadOnly(true);
final Statement stmt = conn.createStatement();
Map<K, N> rs = new LinkedHashMap<>();
ResultSet set = stmt.executeQuery(sql);
ResultSetMetaData rsd = set.getMetaData();
boolean smallint = rsd == null ? false : rsd.getColumnType(1) == Types.SMALLINT;
@@ -411,9 +396,8 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
stmt.close();
return CompletableFuture.completedFuture(rs);
} catch (SQLException e) {
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
if (info.getTableStrategy() != null && info.isTableNotExist(e)) return CompletableFuture.completedFuture(rs);
return CompletableFuture.failedFuture(e);
} finally {
if (conn != null) readPool.offerConnection(conn);
}
@@ -433,10 +417,8 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
ps.close();
return CompletableFuture.completedFuture(rs);
} catch (SQLException e) {
if (info.tableStrategy != null && info.isTableNotExist(e)) return CompletableFuture.completedFuture(null);
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
if (info.getTableStrategy() != null && info.isTableNotExist(e)) return CompletableFuture.completedFuture(null);
return CompletableFuture.failedFuture(e);
} finally {
if (conn != null) readPool.offerConnection(conn);
}
@@ -460,10 +442,8 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
ps.close();
return CompletableFuture.completedFuture(val == null ? defValue : val);
} catch (SQLException e) {
if (info.tableStrategy != null && info.isTableNotExist(e)) return CompletableFuture.completedFuture(defValue);
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
if (info.getTableStrategy() != null && info.isTableNotExist(e)) return CompletableFuture.completedFuture(defValue);
return CompletableFuture.failedFuture(e);
} finally {
if (conn != null) readPool.offerConnection(conn);
}
@@ -483,10 +463,8 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
if (info.isLoggable(logger, Level.FINEST, sql)) logger.finest(info.getType().getSimpleName() + " exists (" + rs + ") sql=" + sql);
return CompletableFuture.completedFuture(rs);
} catch (SQLException e) {
if (info.tableStrategy != null && info.isTableNotExist(e)) return CompletableFuture.completedFuture(false);
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
if (info.getTableStrategy() != null && info.isTableNotExist(e)) return CompletableFuture.completedFuture(false);
return CompletableFuture.failedFuture(e);
} finally {
if (conn != null) readPool.offerConnection(conn);
}
@@ -557,10 +535,8 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
ps.close();
return CompletableFuture.completedFuture(new Sheet<>(total, list));
} catch (SQLException e) {
if (info.tableStrategy != null && info.isTableNotExist(e)) return CompletableFuture.completedFuture(new Sheet<>());
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
if (info.getTableStrategy() != null && info.isTableNotExist(e)) return CompletableFuture.completedFuture(new Sheet<>());
return CompletableFuture.failedFuture(e);
} finally {
if (conn != null) readPool.offerConnection(conn);
}

View File

@@ -89,16 +89,16 @@ public final class EntityInfo<T> {
final String notcontainSQL;
//用于判断表不存在的使用, 多个SQLState用;隔开
final String tablenotexistSqlstates;
private final String tablenotexistSqlstates;
//用于复制表结构使用
final String tablecopySQL;
private final String tablecopySQL;
//用于存在database.table_20160202类似这种分布式表
final Set<String> tables = new HashSet<>();
private final Set<String> tables = new HashSet<>();
//分表 策略
final DistributeTableStrategy<T> tableStrategy;
private final DistributeTableStrategy<T> tableStrategy;
//根据主键查找单个对象的SQL
private final String queryPrepareSQL;
@@ -510,6 +510,18 @@ public final class EntityInfo<T> {
return tableStrategy;
}
public Object disTableLock() {
return tables;
}
public boolean containsDisTable(String tablekey) {
return tables.contains(tablekey);
}
public void addDisTable(String tablekey) {
tables.add(tablekey);
}
public String getTableNotExistSqlStates2() {
return tablenotexistSqlstates;
}