DataJdbcSource优化

This commit is contained in:
redkale
2023-12-04 11:22:01 +08:00
parent 225460f522
commit aeafac22b0
2 changed files with 163 additions and 162 deletions

View File

@@ -147,7 +147,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
return false;
}
protected <T> List<PreparedStatement> prepareInsertEntityStatements(SourceConnection conn, EntityInfo<T> info, Map<String, PrepareInfo<T>> prepareInfos, T... entitys) throws SQLException {
protected <T> List<PreparedStatement> prepareInsertEntityStatements(JdbcConnection conn, EntityInfo<T> info, Map<String, PrepareInfo<T>> prepareInfos, T... entitys) throws SQLException {
Attribute<T, Serializable>[] attrs = info.insertAttributes;
final List<PreparedStatement> prestmts = new ArrayList<>();
for (Map.Entry<String, PrepareInfo<T>> en : prepareInfos.entrySet()) {
@@ -164,7 +164,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
return prestmts;
}
protected <T> PreparedStatement prepareInsertEntityStatement(SourceConnection conn, String sql, EntityInfo<T> info, T... entitys) throws SQLException {
protected <T> PreparedStatement prepareInsertEntityStatement(JdbcConnection conn, String sql, EntityInfo<T> info, T... entitys) throws SQLException {
Attribute<T, Serializable>[] attrs = info.insertAttributes;
final PreparedStatement prestmt = info.isAutoGenerated()
? conn.prepareUpdateStatement(sql, Statement.RETURN_GENERATED_KEYS)
@@ -176,7 +176,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
return prestmt;
}
protected <T> List<PreparedStatement> prepareUpdateEntityStatements(SourceConnection conn, EntityInfo<T> info, Map<String, PrepareInfo<T>> prepareInfos, T... entitys) throws SQLException {
protected <T> List<PreparedStatement> prepareUpdateEntityStatements(JdbcConnection conn, EntityInfo<T> info, Map<String, PrepareInfo<T>> prepareInfos, T... entitys) throws SQLException {
Attribute<T, Serializable> primary = info.primary;
Attribute<T, Serializable>[] attrs = info.updateAttributes;
final List<PreparedStatement> prestmts = new ArrayList<>();
@@ -193,7 +193,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
return prestmts;
}
protected <T> PreparedStatement prepareUpdateEntityStatement(SourceConnection conn, String prepareSQL, EntityInfo<T> info, T... entitys) throws SQLException {
protected <T> PreparedStatement prepareUpdateEntityStatement(JdbcConnection conn, String prepareSQL, EntityInfo<T> info, T... entitys) throws SQLException {
Attribute<T, Serializable> primary = info.primary;
Attribute<T, Serializable>[] attrs = info.updateAttributes;
final PreparedStatement prestmt = conn.prepareUpdateStatement(prepareSQL);
@@ -205,7 +205,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
return prestmt;
}
protected <T> int bindStatementParameters(SourceConnection conn, PreparedStatement prestmt, EntityInfo<T> info, Attribute<T, Serializable>[] attrs, T entity) throws SQLException {
protected <T> int bindStatementParameters(JdbcConnection conn, PreparedStatement prestmt, EntityInfo<T> info, Attribute<T, Serializable>[] attrs, T entity) throws SQLException {
int i = 0;
for (Attribute<T, Serializable> attr : attrs) {
Object val = getEntityAttrValue(info, attr, entity);
@@ -234,7 +234,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
return 0;
}
int c = 0;
SourceConnection conn = null;
JdbcConnection conn = null;
List<Statement> stmtsRef = new ArrayList<>();
try {
conn = writePool.pollTransConnection();
@@ -308,9 +308,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
conn.rollback(stmtsRef);
throw new SourceException(e);
} finally {
if (conn != null) {
writePool.offerTransConnection(conn);
}
writePool.offerTransConnection(conn);
}
}
@@ -326,7 +324,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
@Override
protected <T> int insertDB(EntityInfo<T> info, T... entitys) {
SourceConnection conn = null;
JdbcConnection conn = null;
List<Statement> stmtsRef = new ArrayList<>();
try {
conn = writePool.pollConnection();
@@ -337,13 +335,11 @@ public class DataJdbcSource extends AbstractDataSqlSource {
} catch (SQLException e) {
throw new SourceException(e);
} finally {
if (conn != null) {
writePool.offerConnection(conn);
}
writePool.offerConnection(conn);
}
}
private <T> int insertDBStatement(List<Statement> stmtsRef, final SourceConnection conn, final EntityInfo<T> info, T... entitys) throws SQLException {
private <T> int insertDBStatement(List<Statement> stmtsRef, final JdbcConnection conn, final EntityInfo<T> info, T... entitys) throws SQLException {
final long s = System.currentTimeMillis();
int c = 0;
String presql = null;
@@ -657,7 +653,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
@Override
protected <T> int deleteDB(EntityInfo<T> info, String[] tables, Flipper flipper, FilterNode node, Map<String, List<Serializable>> pkmap, String... sqls) {
SourceConnection conn = null;
JdbcConnection conn = null;
List<Statement> stmtsRef = new ArrayList<>();
try {
conn = writePool.pollConnection();
@@ -669,13 +665,11 @@ public class DataJdbcSource extends AbstractDataSqlSource {
conn.rollback(stmtsRef);
throw new SourceException(e);
} finally {
if (conn != null) {
writePool.offerConnection(conn);
}
writePool.offerConnection(conn);
}
}
private <T> int deleteDBStatement(List<Statement> stmtsRef, final SourceConnection conn, final EntityInfo<T> info, String[] tables, Flipper flipper, FilterNode node, Map<String, List<Serializable>> pkmap, String... sqls) throws SQLException {
private <T> int deleteDBStatement(List<Statement> stmtsRef, final JdbcConnection conn, final EntityInfo<T> info, String[] tables, Flipper flipper, FilterNode node, Map<String, List<Serializable>> pkmap, String... sqls) throws SQLException {
final long s = System.currentTimeMillis();
Statement stmt = null;
try {
@@ -774,7 +768,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
@Override
protected <T> int clearTableDB(EntityInfo<T> info, String[] tables, FilterNode node, String... sqls) {
SourceConnection conn = null;
JdbcConnection conn = null;
final long s = System.currentTimeMillis();
Statement stmt = null;
try {
@@ -847,9 +841,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
throw new SourceException(e);
} finally {
if (conn != null) {
writePool.offerConnection(conn);
}
writePool.offerConnection(conn);
}
}
@@ -865,7 +857,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
@Override
protected <T> int createTableDB(EntityInfo<T> info, String copyTableSql, Serializable pk, String... sqls) {
SourceConnection conn = null;
JdbcConnection conn = null;
Statement stmt = null;
final long s = System.currentTimeMillis();
try {
@@ -970,15 +962,13 @@ public class DataJdbcSource extends AbstractDataSqlSource {
} catch (SQLException e) {
throw new SourceException(e);
} finally {
if (conn != null) {
writePool.offerConnection(conn);
}
writePool.offerConnection(conn);
}
}
@Override
protected <T> int dropTableDB(EntityInfo<T> info, String[] tables, FilterNode node, String... sqls) {
SourceConnection conn = null;
JdbcConnection conn = null;
final long s = System.currentTimeMillis();
Statement stmt = null;
try {
@@ -1048,9 +1038,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
throw new SourceException(e);
} finally {
if (conn != null) {
writePool.offerConnection(conn);
}
writePool.offerConnection(conn);
}
}
@@ -1061,7 +1049,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
@Override
protected <T> int updateEntityDB(EntityInfo<T> info, T... entitys) {
SourceConnection conn = null;
JdbcConnection conn = null;
List<Statement> stmtsRef = new ArrayList<>();
try {
conn = writePool.pollConnection();
@@ -1073,13 +1061,11 @@ public class DataJdbcSource extends AbstractDataSqlSource {
conn.rollback(stmtsRef);
throw new SourceException(e);
} finally {
if (conn != null) {
writePool.offerConnection(conn);
}
writePool.offerConnection(conn);
}
}
private <T> int updateEntityDBStatement(List<Statement> stmtsRef, final SourceConnection conn, final EntityInfo<T> info, T... entitys) throws SQLException {
private <T> int updateEntityDBStatement(List<Statement> stmtsRef, final JdbcConnection conn, final EntityInfo<T> info, T... entitys) throws SQLException {
final long s = System.currentTimeMillis();
String presql = null;
PreparedStatement prestmt = null;
@@ -1242,7 +1228,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
@Override
protected <T> int updateColumnDB(EntityInfo<T> info, Flipper flipper, UpdateSqlInfo sql) {
SourceConnection conn = null;
JdbcConnection conn = null;
List<Statement> stmtsRef = new ArrayList<>();
try {
conn = writePool.pollConnection();
@@ -1254,13 +1240,11 @@ public class DataJdbcSource extends AbstractDataSqlSource {
conn.rollback(stmtsRef);
throw new SourceException(e);
} finally {
if (conn != null) {
writePool.offerConnection(conn);
}
writePool.offerConnection(conn);
}
}
private <T> int updateColumnDBStatement(List<Statement> stmtsRef, final SourceConnection conn, final EntityInfo<T> info, Flipper flipper, UpdateSqlInfo sql) throws SQLException { //String sql, boolean prepared, Object... blobs) {
private <T> int updateColumnDBStatement(List<Statement> stmtsRef, final JdbcConnection conn, final EntityInfo<T> info, Flipper flipper, UpdateSqlInfo sql) throws SQLException { //String sql, boolean prepared, Object... blobs) {
final long s = System.currentTimeMillis();
int c = -1;
String firstTable = null;
@@ -1418,7 +1402,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
@Override
protected <T, N extends Number> Map<String, N> getNumberMapDB(EntityInfo<T> info, String[] tables, String sql, FilterNode node, FilterFuncColumn... columns) {
SourceConnection conn = null;
JdbcConnection conn = null;
final Map map = new HashMap<>();
final long s = System.currentTimeMillis();
Statement stmt = null;
@@ -1509,9 +1493,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
throw new SourceException(e);
} finally {
if (conn != null) {
readPool.offerConnection(conn);
}
readPool.offerConnection(conn);
}
}
@@ -1522,7 +1504,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
@Override
protected <T> Number getNumberResultDB(EntityInfo<T> info, String[] tables, String sql, FilterFunc func, Number defVal, String column, FilterNode node) {
SourceConnection conn = null;
JdbcConnection conn = null;
final long s = System.currentTimeMillis();
Statement stmt = null;
try {
@@ -1599,9 +1581,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
throw new SourceException(e);
} finally {
if (conn != null) {
readPool.offerConnection(conn);
}
readPool.offerConnection(conn);
}
}
@@ -1612,7 +1592,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
@Override
protected <T, K extends Serializable, N extends Number> Map<K, N> queryColumnMapDB(EntityInfo<T> info, String[] tables, String sql, String keyColumn, FilterFunc func, String funcColumn, FilterNode node) {
SourceConnection conn = null;
JdbcConnection conn = null;
final long s = System.currentTimeMillis();
Map<K, N> rs = new LinkedHashMap<>();
Statement stmt = null;
@@ -1687,9 +1667,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
throw new SourceException(e);
} finally {
if (conn != null) {
readPool.offerConnection(conn);
}
readPool.offerConnection(conn);
}
}
@@ -1700,7 +1678,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
@Override
protected <T, K extends Serializable, N extends Number> Map<K[], N[]> queryColumnMapDB(EntityInfo<T> info, String[] tables, String sql, final ColumnNode[] funcNodes, final String[] groupByColumns, final FilterNode node) {
SourceConnection conn = null;
JdbcConnection conn = null;
Map rs = new LinkedHashMap<>();
final long s = System.currentTimeMillis();
Statement stmt = null;
@@ -1790,9 +1768,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
throw new SourceException(e);
} finally {
if (conn != null) {
readPool.offerConnection(conn);
}
readPool.offerConnection(conn);
}
}
@@ -1815,7 +1791,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
protected <T> T findDB(EntityInfo<T> info, Serializable pk) {
SourceConnection conn = null;
JdbcConnection conn = null;
final long s = System.currentTimeMillis();
PreparedStatement prestmt = null;
try {
@@ -1824,9 +1800,9 @@ public class DataJdbcSource extends AbstractDataSqlSource {
String prepareSQL = info.getFindQuestionPrepareSQL(pk);
prestmt = conn.prepareQueryStatement(prepareSQL);
prestmt.setObject(1, pk);
final DataResultSet set = createDataResultSet(info, prestmt.executeQuery());
T rs = set.next() ? info.getBuilder().getFullEntityValue(set) : null;
set.close();
ResultSet rr = prestmt.executeQuery();
T rs = rr.next() ? info.getBuilder().getFullEntityValue(createDataResultSet(info, rr)) : null;
rr.close();
conn.offerQueryStatement(prestmt);
slowLog(s, prepareSQL);
return rs;
@@ -1836,9 +1812,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
throw new SourceException(e);
} finally {
if (conn != null) {
readPool.offerConnection(conn);
}
readPool.offerConnection(conn);
}
}
@@ -1849,7 +1823,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
@Override
protected <T> T findDB(EntityInfo<T> info, String[] tables, String sql, boolean onlypk, SelectColumn selects, Serializable pk, FilterNode node) {
SourceConnection conn = null;
JdbcConnection conn = null;
final long s = System.currentTimeMillis();
PreparedStatement prestmt = null;
try {
@@ -1857,9 +1831,9 @@ public class DataJdbcSource extends AbstractDataSqlSource {
conn.setAutoCommit(true);
prestmt = conn.prepareQueryStatement(sql);
prestmt.setFetchSize(1);
final DataResultSet set = createDataResultSet(info, prestmt.executeQuery());
T rs = set.next() ? selects == null ? info.getBuilder().getFullEntityValue(set) : info.getBuilder().getEntityValue(selects, set) : null;
set.close();
ResultSet rr = prestmt.executeQuery();
T rs = rr.next() ? info.getBuilder().getEntityValue(selects, createDataResultSet(info, rr)) : null;
rr.close();
conn.offerQueryStatement(prestmt);
slowLog(s, sql);
return rs;
@@ -1903,9 +1877,9 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
prestmt = conn.prepareQueryStatement(sql);
prestmt.setFetchSize(1);
final DataResultSet set = createDataResultSet(info, prestmt.executeQuery());
T rs = set.next() ? selects == null ? info.getBuilder().getFullEntityValue(set) : info.getBuilder().getEntityValue(selects, set) : null;
set.close();
ResultSet rr = prestmt.executeQuery();
T rs = rr.next() ? info.getBuilder().getEntityValue(selects, createDataResultSet(info, rr)) : null;
rr.close();
conn.offerQueryStatement(prestmt);
slowLog(s, sql);
return rs;
@@ -1916,9 +1890,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
throw new SourceException(e);
} finally {
if (conn != null) {
readPool.offerConnection(conn);
}
readPool.offerConnection(conn);
}
}
@@ -1929,7 +1901,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
@Override
protected <T> Serializable findColumnDB(EntityInfo<T> info, String[] tables, String sql, boolean onlypk, String column, Serializable defValue, Serializable pk, FilterNode node) {
SourceConnection conn = null;
JdbcConnection conn = null;
final long s = System.currentTimeMillis();
PreparedStatement prestmt = null;
final Attribute<T, Serializable> attr = info.getAttribute(column);
@@ -1996,16 +1968,16 @@ public class DataJdbcSource extends AbstractDataSqlSource {
conn.offerQueryStatement(prestmt);
slowLog(s, sql);
return val == null ? defValue : val;
} catch (SQLException se) {
} catch (Exception se) {
throw new SourceException(se);
}
}
}
throw new SourceException(e);
} catch (Exception ex) {
throw new SourceException(ex);
} finally {
if (conn != null) {
readPool.offerConnection(conn);
}
readPool.offerConnection(conn);
}
}
@@ -2016,7 +1988,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
@Override
protected <T> boolean existsDB(EntityInfo<T> info, String[] tables, String sql, boolean onlypk, Serializable pk, FilterNode node) {
SourceConnection conn = null;
JdbcConnection conn = null;
final long s = System.currentTimeMillis();
PreparedStatement prestmt = null;
try {
@@ -2087,9 +2059,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
throw new SourceException(e);
} finally {
if (conn != null) {
readPool.offerConnection(conn);
}
readPool.offerConnection(conn);
}
}
@@ -2098,12 +2068,12 @@ public class DataJdbcSource extends AbstractDataSqlSource {
final EntityInfo<T> info = loadEntityInfo(clazz);
Serializable[] ids = pks.toArray(serialArrayFunc);
if (info.getTableStrategy() == null) {
SourceConnection conn = null;
JdbcConnection conn = null;
final long s = System.currentTimeMillis();
final List<T> list = new ArrayList();
try {
conn = readPool.pollConnection();
conn.setAutoCommit(true);
final List<T> list = new ArrayList();
try {
String prepareSQL = info.getFindQuestionPrepareSQL(ids[0]);
PreparedStatement prestmt = conn.prepareQueryStatement(prepareSQL);
@@ -2133,9 +2103,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
} catch (Exception e) {
throw new SourceException(e);
} finally {
if (conn != null) {
readPool.offerConnection(conn);
}
readPool.offerConnection(conn);
}
} else {
return queryList(info.getType(), null, null, FilterNodes.in(info.getPrimarySQLColumn(), ids));
@@ -2153,7 +2121,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
protected <T> Sheet<T> querySheetFullListDB(EntityInfo<T> info) {
SourceConnection conn = null;
JdbcConnection conn = null;
final long s = System.currentTimeMillis();
try {
conn = readPool.pollConnection();
@@ -2182,9 +2150,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
} catch (Exception e) {
throw new SourceException(e);
} finally {
if (conn != null) {
readPool.offerConnection(conn);
}
readPool.offerConnection(conn);
}
}
@@ -2193,7 +2159,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
if (!needTotal && !distinct && selects == null && flipper == null && node == null && info.getTableStrategy() == null) {
return querySheetFullListDB(info);
}
SourceConnection conn = null;
JdbcConnection conn = null;
final long s = System.currentTimeMillis();
final SelectColumn sels = selects;
final Map<Class, String> joinTabalis = node == null ? null : node.getJoinTabalis();
@@ -2258,14 +2224,12 @@ public class DataJdbcSource extends AbstractDataSqlSource {
} catch (Exception e) {
throw new SourceException(e);
} finally {
if (conn != null) {
readPool.offerConnection(conn);
}
readPool.offerConnection(conn);
}
}
private <T> Sheet<T> executeQuerySheet(EntityInfo<T> info, boolean needTotal, Flipper flipper, SelectColumn sels,
long s, SourceConnection conn, boolean mysqlOrPgsql, String listSql, String countSql) throws SQLException {
long s, JdbcConnection conn, boolean mysqlOrPgsql, String listSql, String countSql) throws SQLException {
final List<T> list = new ArrayList();
if (mysqlOrPgsql) { //sql可以带limit、offset
ResultSet set;
@@ -2383,7 +2347,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
return new String[]{listSql, countSql};
}
protected List<String> checkNotExistTablesNoThrows(SourceConnection conn, String[] tables) {
protected List<String> checkNotExistTablesNoThrows(JdbcConnection conn, String[] tables) {
try {
return checkNotExistTables(conn, tables); //, firstNotExistTable
} catch (SQLException e) {
@@ -2391,7 +2355,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
}
protected List<String> checkNotExistTables(SourceConnection conn, String[] tables) throws SQLException { //, String firstNotExistTable
protected List<String> checkNotExistTables(JdbcConnection conn, String[] tables) throws SQLException { //, String firstNotExistTable
// 数据库不一定要按批量提交的SQL顺序执行 所以第一个不存在的表不一定在tables的第一位,
// 比如 DELETE FROM table1; DELETE FROM table2; 如果table1、table2都不存在SQL可能会抛出table2不存在的异常
// List<String> maybeNoTables = new ArrayList<>();
@@ -2444,7 +2408,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
final long s = System.currentTimeMillis();
Statement stmt = null;
SourceConnection conn = writePool.pollConnection();
JdbcConnection conn = writePool.pollConnection();
try {
conn.setAutoCommit(false);
stmt = conn.createUpdateStatement();
@@ -2461,9 +2425,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
conn.rollback(stmt);
throw new SourceException(e);
} finally {
if (conn != null) {
writePool.offerConnection(conn);
}
writePool.offerConnection(conn);
}
}
@@ -2486,7 +2448,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
public int nativeUpdate(String sql, Map<String, Object> params) {
DataNativeSqlStatement sinfo = super.nativeParse(sql, params);
final long s = System.currentTimeMillis();
SourceConnection conn = writePool.pollConnection();
JdbcConnection conn = writePool.pollConnection();
Statement stmt = null;
try {
conn.setAutoCommit(false);
@@ -2513,9 +2475,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
conn.rollback(stmt);
throw new SourceException(e);
} finally {
if (conn != null) {
writePool.offerConnection(conn);
}
writePool.offerConnection(conn);
}
}
@@ -2533,7 +2493,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
@Override
public <V> V nativeQuery(String sql, BiConsumer<Object, Object> consumer, Function<DataResultSet, V> handler) {
final long s = System.currentTimeMillis();
final SourceConnection conn = readPool.pollConnection();
final JdbcConnection conn = readPool.pollConnection();
try {
conn.setAutoCommit(true);
if (logger.isLoggable(Level.FINEST)) {
@@ -2552,9 +2512,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
} catch (Exception ex) {
throw new SourceException(ex);
} finally {
if (conn != null) {
readPool.offerConnection(conn);
}
readPool.offerConnection(conn);
}
}
@@ -2563,7 +2521,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
public <V> V nativeQuery(String sql, BiConsumer<Object, Object> consumer, Function<DataResultSet, V> handler, Map<String, Object> params) {
DataNativeSqlStatement sinfo = super.nativeParse(sql, params);
final long s = System.currentTimeMillis();
final SourceConnection conn = readPool.pollConnection();
final JdbcConnection conn = readPool.pollConnection();
try {
conn.setAutoCommit(true);
if (logger.isLoggable(Level.FINEST)) {
@@ -2599,9 +2557,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
} catch (Exception ex) {
throw new SourceException(ex);
} finally {
if (conn != null) {
readPool.offerConnection(conn);
}
readPool.offerConnection(conn);
}
}
@@ -2609,7 +2565,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
final boolean mysqlOrPgsql = "mysql".equals(dbtype()) || "postgresql".equals(dbtype());
DataNativeSqlStatement sinfo = super.nativeParse(sql, params);
final long s = System.currentTimeMillis();
final SourceConnection conn = readPool.pollConnection();
final JdbcConnection conn = readPool.pollConnection();
try {
conn.setAutoCommit(true);
if (logger.isLoggable(Level.FINEST)) {
@@ -2671,9 +2627,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
} catch (Exception ex) {
throw new SourceException(ex);
} finally {
if (conn != null) {
readPool.offerConnection(conn);
}
readPool.offerConnection(conn);
}
}
@@ -2743,26 +2697,30 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
@Override
public <T> Serializable getObject(Attribute<T, Serializable> attr, int index, String column) {
public <T> Serializable getObject(Attribute<T, Serializable> attr, int index, String columnLabel) {
Class t = attr.type();
if (t == int.class || t == String.class) {
return DataResultSet.getRowColumnValue(this, attr, index, column);
if (t == int.class) {
return index > 0 ? getInt(index) : getInt(columnLabel);
} else if (t == String.class) {
return index > 0 ? getString(index) : getString(columnLabel);
} else if (t == long.class) {
return index > 0 ? getLong(index) : getLong(columnLabel);
} else if (t == java.util.Date.class) {
Object val = index > 0 ? getObject(index) : getObject(column);
Object val = index > 0 ? getObject(index) : getObject(columnLabel);
return val == null ? null : new java.util.Date(((java.sql.Date) val).getTime());
} else if (t == java.time.LocalDate.class) {
Object val = index > 0 ? getObject(index) : getObject(column);
Object val = index > 0 ? getObject(index) : getObject(columnLabel);
return val == null ? null : ((java.sql.Date) val).toLocalDate();
} else if (t == java.time.LocalTime.class) {
Object val = index > 0 ? getObject(index) : getObject(column);
Object val = index > 0 ? getObject(index) : getObject(columnLabel);
return val == null ? null : ((java.sql.Time) val).toLocalTime();
} else if (t == java.time.LocalDateTime.class) {
Object val = index > 0 ? getObject(index) : getObject(column);
Object val = index > 0 ? getObject(index) : getObject(columnLabel);
return val == null ? null : ((java.sql.Timestamp) val).toLocalDateTime();
} else if (t.getName().startsWith("java.sql.")) {
return index > 0 ? (Serializable) getObject(index) : (Serializable) getObject(column);
return index > 0 ? (Serializable) getObject(index) : (Serializable) getObject(columnLabel);
}
return DataResultSet.getRowColumnValue(this, attr, index, column);
return DataResultSet.getRowColumnValue(this, attr, index, columnLabel);
}
@Override
@@ -2825,6 +2783,54 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
}
public int getInt(int index) {
try {
return rr.getInt(index);
} catch (SQLException e) {
throw new SourceException(e);
}
}
public int getInt(String column) {
try {
return rr.getInt(column);
} catch (SQLException e) {
throw new SourceException(e);
}
}
public long getLong(int index) {
try {
return rr.getLong(index);
} catch (SQLException e) {
throw new SourceException(e);
}
}
public long getLong(String column) {
try {
return rr.getLong(column);
} catch (SQLException e) {
throw new SourceException(e);
}
}
public String getString(int index) {
try {
return rr.getString(index);
} catch (SQLException e) {
throw new SourceException(e);
}
}
public String getString(String column) {
try {
return rr.getString(column);
} catch (SQLException e) {
throw new SourceException(e);
}
}
@Override
public EntityInfo getEntityInfo() {
return info;
@@ -2846,7 +2852,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
protected final Properties connectAttrs;
protected ArrayBlockingQueue<SourceConnection> queue;
protected ArrayBlockingQueue<JdbcConnection> queue;
protected int connectTimeoutSeconds;
@@ -2882,8 +2888,10 @@ public class DataJdbcSource extends AbstractDataSqlSource {
if (password != null) {
this.connectAttrs.put("password", password);
}
if (!url.contains("prepareThreshold=")) {
this.connectAttrs.put("prepareThreshold", "-1");
if ("postgresql".equals(dbtype())) {
if (!url.contains("prepareThreshold=")) {
this.connectAttrs.put("prepareThreshold", "-1");
}
}
try {
this.driver = DriverManager.getDriver(this.url);
@@ -2917,10 +2925,12 @@ public class DataJdbcSource extends AbstractDataSqlSource {
|| !Objects.equals(newPassword, this.connectAttrs.get("password")) || !Objects.equals(newUrl, url)) {
this.urlVersion.incrementAndGet();
}
if (!newUrl.contains("prepareThreshold=")) {
this.connectAttrs.put("prepareThreshold", "-1");
} else {
this.connectAttrs.remove("prepareThreshold");
if ("postgresql".equals(dbtype())) {
if (!newUrl.contains("prepareThreshold=")) {
this.connectAttrs.put("prepareThreshold", "-1");
} else {
this.connectAttrs.remove("prepareThreshold");
}
}
this.url = newUrl;
this.connectTimeoutSeconds = newConnectTimeoutSeconds;
@@ -2960,21 +2970,21 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
private void changeMaxConns(int newMaxconns) {
ArrayBlockingQueue<SourceConnection> newQueue = new ArrayBlockingQueue<>(newMaxconns);
ArrayBlockingQueue<SourceConnection> oldQueue = this.queue;
ArrayBlockingQueue<JdbcConnection> newQueue = new ArrayBlockingQueue<>(newMaxconns);
ArrayBlockingQueue<JdbcConnection> oldQueue = this.queue;
Semaphore oldSemaphore = this.newSemaphore;
this.queue = newQueue;
this.maxConns = newMaxconns;
this.newSemaphore = new Semaphore(this.maxConns);
SourceConnection c;
JdbcConnection c;
while ((c = oldQueue.poll()) != null) {
c.version = -1;
offerConnection(c, oldSemaphore, this.queue);
}
}
public SourceConnection pollConnection() {
SourceConnection conn = queue.poll();
public JdbcConnection pollConnection() {
JdbcConnection conn = queue.poll();
if (conn == null) {
return newConnection(this.queue);
}
@@ -2990,28 +3000,16 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
//用于事务的连接
public SourceConnection pollTransConnection() {
SourceConnection conn = queue.poll();
if (conn == null) {
return newConnection(this.queue);
}
usingCounter.increment();
if (checkValid(conn)) {
cycleCounter.increment();
return conn;
} else {
offerConnection(conn);
conn = null;
}
return newConnection(this.queue);
public JdbcConnection pollTransConnection() {
return pollTransConnection();
}
private SourceConnection newConnection(ArrayBlockingQueue<SourceConnection> queue) {
private JdbcConnection newConnection(ArrayBlockingQueue<JdbcConnection> queue) {
Semaphore semaphore = this.newSemaphore;
SourceConnection conn = null;
JdbcConnection conn = null;
if (semaphore.tryAcquire()) {
try {
conn = new SourceConnection(driver.connect(url, connectAttrs), readFlag, this.urlVersion.get());
conn = new JdbcConnection(driver.connect(url, connectAttrs), readFlag, this.urlVersion.get());
} catch (SQLException ex) {
semaphore.release();
throw new SourceException(ex);
@@ -3040,8 +3038,8 @@ public class DataJdbcSource extends AbstractDataSqlSource {
offerConnection(connection, this.newSemaphore, this.queue);
}
private <C> void offerConnection(final C connection, Semaphore semaphore, Queue<SourceConnection> queue) {
SourceConnection conn = (SourceConnection) connection;
private <C> void offerConnection(final C connection, Semaphore semaphore, Queue<JdbcConnection> queue) {
JdbcConnection conn = (JdbcConnection) connection;
if (conn != null) {
conn.commiting = false;
try {
@@ -3059,7 +3057,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
}
protected boolean checkValid(SourceConnection conn) {
protected boolean checkValid(JdbcConnection conn) {
try {
return !conn.conn.isClosed() && conn.version == this.urlVersion.get();
} catch (SQLException ex) {
@@ -3082,7 +3080,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
}
protected class SourceConnection extends DataJdbcConnection {
protected class JdbcConnection extends DataJdbcConnection {
public int version;
@@ -3090,7 +3088,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
boolean commiting;
public SourceConnection(Connection conn, boolean readFlag, int version) {
public JdbcConnection(Connection conn, boolean readFlag, int version) {
super(readFlag);
Objects.requireNonNull(conn);
this.conn = conn;

View File

@@ -350,6 +350,9 @@ public class EntityBuilder<T> {
* @return Entity对象
*/
public T getEntityValue(final SelectColumn sels, final DataResultSetRow row) {
if (sels == null) {
return getFullEntityValue(row);
}
if (row.wasNull()) {
return null;
}