DataJdbcSource优化

This commit is contained in:
redkale
2023-05-06 08:21:27 +08:00
parent 1b1bfbb3ac
commit 18932bf48b
2 changed files with 25 additions and 66 deletions

View File

@@ -2696,9 +2696,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
protected String url;
protected int urlVersion;
protected Properties clientInfo = new Properties();
protected final AtomicInteger urlVersion = new AtomicInteger();
public ConnectionPool(Properties prop) {
this.connectTimeoutSeconds = Integer.decode(prop.getProperty(DATA_SOURCE_CONNECTTIMEOUT_SECONDS, "30"));
@@ -2706,7 +2704,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
if (workExecutor instanceof ThreadPoolExecutor) {
defMaxConns = ((ThreadPoolExecutor) workExecutor).getCorePoolSize();
} else if (workExecutor != null) { //maybe virtual thread pool
defMaxConns = Math.min(1024, Utility.cpus() * 100);
defMaxConns = Math.min(1000, Utility.cpus() * 100);
}
this.maxConns = Math.max(1, Integer.decode(prop.getProperty(DATA_SOURCE_MAXCONNS, "" + defMaxConns)));
this.canNewSemaphore = new Semaphore(this.maxConns);
@@ -2726,15 +2724,14 @@ public class DataJdbcSource extends AbstractDataSqlSource {
} catch (SQLException e) {
throw new SourceException(e);
}
clientInfo.put("version", String.valueOf(urlVersion));
resetMaxConnection();
}
@ResourceListener
public void onResourceChange(ResourceEvent[] events) {
String newUrl = this.url;
int newConnectTimeoutSeconds = this.connectTimeoutSeconds;
int newMaxconns = this.maxConns;
String newUrl = this.url;
String newUser = this.connectAttrs.getProperty("user");
String newPassword = this.connectAttrs.getProperty("password");
for (ResourceEvent event : events) {
@@ -2752,10 +2749,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
if (!Objects.equals(newUser, this.connectAttrs.get("user"))
|| !Objects.equals(newPassword, this.connectAttrs.get("password")) || !Objects.equals(newUrl, url)) {
this.urlVersion++;
Properties newClientInfo = new Properties();
newClientInfo.put("version", String.valueOf(urlVersion));
this.clientInfo = newClientInfo;
this.urlVersion.incrementAndGet();
}
this.url = newUrl;
this.connectTimeoutSeconds = newConnectTimeoutSeconds;
@@ -2802,12 +2796,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
this.canNewSemaphore = new Semaphore(this.maxConns);
SourceConnection c;
while ((c = oldQueue.poll()) != null) {
try {
if (c.getClientInfo() != null) {
c.getClientInfo().put("version", "-1");
}
} catch (SQLException e) {
}
c.version = -1;
offerConnection(c, oldSemaphore);
}
}
@@ -2833,12 +2822,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
SourceConnection conn = null;
if (semaphore.tryAcquire()) {
try {
conn = new SourceConnection(driver.connect(url, connectAttrs));
if (conn.getClientInfo() != null) {
conn.getClientInfo().put("version", clientInfo.getProperty("version"));
} else {
conn.setClientInfo(clientInfo);
}
conn = new SourceConnection(driver.connect(url, connectAttrs), this.urlVersion.get());
} catch (SQLException ex) {
throw new SourceException(ex);
}
@@ -2864,34 +2848,25 @@ public class DataJdbcSource extends AbstractDataSqlSource {
private <C> void offerConnection(final C connection, Semaphore semaphore) {
SourceConnection conn = (SourceConnection) connection;
if (conn == null) {
return;
}
try {
if (checkValid(conn) && queue.offer(conn)) {
usingCounter.decrement();
} else {
usingCounter.decrement();
closeCounter.increment();
semaphore.release();
conn.close();
if (conn != null) {
try {
if (checkValid(conn) && queue.offer(conn)) {
usingCounter.decrement();
} else {
usingCounter.decrement();
closeCounter.increment();
semaphore.release();
conn.close();
}
} catch (Exception e) {
logger.log(Level.WARNING, "closeSQLConnection abort", e);
}
} catch (Exception e) {
logger.log(Level.WARNING, "closeSQLConnection abort", e);
}
}
protected boolean checkValid(SourceConnection conn) {
try {
boolean rs = !conn.conn.isClosed() && conn.conn.isValid(1);
if (!rs) {
return rs;
}
Properties prop = conn.getClientInfo();
if (prop == null) {
return false;
}
return prop == clientInfo || Objects.equals(prop.getProperty("version"), clientInfo.getProperty("version"));
return !conn.conn.isClosed() && conn.conn.isValid(1) && conn.version == this.urlVersion.get();
} catch (SQLException ex) {
if (!"08S01".equals(ex.getSQLState())) {//MySQL特性 长时间连接没使用会抛出com.mysql.jdbc.exceptions.jdbc4.CommunicationsException
logger.log(Level.FINER, "result.getConnection from pooled connection abort [" + ex.getSQLState() + "]", ex);
@@ -2913,21 +2888,14 @@ public class DataJdbcSource extends AbstractDataSqlSource {
protected class SourceConnection {
public int version;
public final Connection conn;
private final Map<String, PreparedStatement> prestms = new ConcurrentHashMap<>();
public SourceConnection(Connection conn) {
public SourceConnection(Connection conn, int version) {
Objects.requireNonNull(conn);
this.conn = conn;
}
public Properties getClientInfo() throws SQLException {
return conn.getClientInfo();
}
public void setClientInfo(Properties clientInfo) throws SQLException {
conn.setClientInfo(clientInfo);
this.version = version;
}
public Statement createStatement() throws SQLException {
@@ -2935,16 +2903,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
public PreparedStatement prepareStatement(String sql) throws SQLException {
PreparedStatement rs = prestms.computeIfAbsent(sql, s -> {
try {
return conn.prepareStatement(sql);
} catch (SQLException e) {
throw new RedkaleException(e);
}
});
rs.clearParameters();
rs.clearBatch();
return rs;
return conn.prepareStatement(sql);
}
public void setAutoCommit(boolean autoCommit) throws SQLException {

View File

@@ -136,7 +136,7 @@ public interface DataResultSet extends EntityInfo.DataResultSetRow {
if (o instanceof byte[]) {
o = new BigDecimal(new String((byte[]) o));
} else {
o = new BigInteger(o.toString());
o = new BigDecimal(o.toString());
}
}
} else if (t == String.class) {