优化DataJdbcSource

This commit is contained in:
redkale
2023-04-17 23:44:39 +08:00
parent 53e8f44088
commit 021bf8ce51
3 changed files with 110 additions and 35 deletions

View File

@@ -300,7 +300,7 @@ public abstract class AbstractDataSqlSource extends AbstractDataSource implement
if (pos > 0) {
url = url.substring(0, pos) + "...";
}
return getClass().getSimpleName() + "{url=" + url + "}";
return getClass().getSimpleName() + "{url=" + url + ", maxconns=" + readMaxConns() + "}";
} else {
String readUrl = readConfProps.getProperty(DATA_SOURCE_URL);
int pos = readUrl.indexOf('?');
@@ -312,10 +312,14 @@ public abstract class AbstractDataSqlSource extends AbstractDataSource implement
if (pos > 0) {
writeUrl = writeUrl.substring(0, pos) + "...";
}
return getClass().getSimpleName() + "{readurl=" + readUrl + ",writeurl=" + writeUrl + "}";
return getClass().getSimpleName() + "{read-url=" + readUrl + ", read-maxconns=" + readMaxConns() + ",write-url=" + writeUrl + ", write-maxconns=" + writeMaxConns() + "}";
}
}
protected abstract int readMaxConns();
protected abstract int writeMaxConns();
//生成创建表的SQL
protected <T> String[] createTableSqls(EntityInfo<T> info) {
if (info == null || !autoDDL) {

View File

@@ -67,6 +67,16 @@ public class DataJdbcSource extends AbstractDataSqlSource {
this.writePool.onResourceChange(events);
}
@Override
protected int readMaxConns() {
return this.readPool.maxConns;
}
@Override
protected int writeMaxConns() {
return this.writePool.maxConns;
}
@Override
public void destroy(AnyValue config) {
if (readPool != null) {
@@ -2694,8 +2704,6 @@ public class DataJdbcSource extends AbstractDataSqlSource {
protected final LongAdder cycleCounter = new LongAdder(); //已复用连接数
protected final LongAdder waitingCounter = new LongAdder(); //可用中连接数
protected final java.sql.Driver driver;
protected final Properties connectAttrs;
@@ -2706,6 +2714,8 @@ public class DataJdbcSource extends AbstractDataSqlSource {
protected int maxConns;
protected Semaphore canNewSemaphore;
protected String url;
protected int urlVersion;
@@ -2713,7 +2723,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
protected Properties clientInfo = new Properties();
public ConnectionPool(Properties prop) {
this.connectTimeoutSeconds = Integer.decode(prop.getProperty(DATA_SOURCE_CONNECTTIMEOUT_SECONDS, "6"));
this.connectTimeoutSeconds = Integer.decode(prop.getProperty(DATA_SOURCE_CONNECTTIMEOUT_SECONDS, "30"));
int defMaxConns = Utility.cpus() * 4;
if (workExecutor instanceof ThreadPoolExecutor) {
defMaxConns = ((ThreadPoolExecutor) workExecutor).getCorePoolSize();
@@ -2723,6 +2733,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
defMaxConns = Math.min(1000, Utility.cpus() * 100);
}
this.maxConns = Math.max(1, Integer.decode(prop.getProperty(DATA_SOURCE_MAXCONNS, "" + defMaxConns)));
this.canNewSemaphore = new Semaphore(this.maxConns);
this.queue = new ArrayBlockingQueue<>(maxConns);
this.url = prop.getProperty(DATA_SOURCE_URL);
String username = prop.getProperty(DATA_SOURCE_USER, "");
@@ -2740,6 +2751,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
throw new SourceException(e);
}
clientInfo.put("version", String.valueOf(urlVersion));
resetMaxConnection();
}
@ResourceListener
@@ -2776,52 +2788,101 @@ public class DataJdbcSource extends AbstractDataSqlSource {
if (newMaxconns != this.maxConns) {
ArrayBlockingQueue<Connection> newQueue = new ArrayBlockingQueue<>(newMaxconns);
ArrayBlockingQueue<Connection> oldQueue = this.queue;
Semaphore oldSemaphore = this.canNewSemaphore;
this.queue = newQueue;
this.maxConns = newMaxconns;
this.canNewSemaphore = new Semaphore(this.maxConns);
Connection conn;
while ((conn = oldQueue.poll()) != null) {
offerConnection(conn, oldSemaphore);
}
}
}
private void resetMaxConnection() {
if ("mysql".equals(dbtype()) || "postgresql".equals(dbtype())) {
int newMaxconns = this.maxConns;
Connection conn = pollConnection();
try {
Statement stmt = conn.createStatement();
if ("mysql".equals(dbtype())) {
ResultSet rs = stmt.executeQuery("SHOW VARIABLES LIKE 'max_connections'");
if (rs.next()) {
newMaxconns = rs.getInt(2);
}
} else if ("postgresql".equals(dbtype())) {
ResultSet rs = stmt.executeQuery("SHOW max_connections");
if (rs.next()) {
newMaxconns = rs.getInt(1);
}
}
stmt.close();
} catch (SQLException e) {
} finally {
offerConnection(conn);
}
if (this.maxConns > newMaxconns) { //配置连接数过大
ArrayBlockingQueue<Connection> newQueue = new ArrayBlockingQueue<>(newMaxconns);
ArrayBlockingQueue<Connection> oldQueue = this.queue;
Semaphore oldSemaphore = this.canNewSemaphore;
this.queue = newQueue;
this.maxConns = newMaxconns;
this.canNewSemaphore = new Semaphore(this.maxConns);
Connection c;
while ((c = oldQueue.poll()) != null) {
offerConnection(c, oldSemaphore);
}
}
}
}
public Connection pollConnection() {
Connection conn = queue.poll();
if (conn == null) {
if (usingCounter.intValue() >= maxConns) {
try {
conn = queue.poll(connectTimeoutSeconds, TimeUnit.SECONDS);
} catch (InterruptedException t) {
logger.log(Level.WARNING, "take pooled connection error", t);
}
if (conn == null) {
throw new SourceException("create pooled connection timeout");
}
}
}
if (conn != null) {
usingCounter.increment();
waitingCounter.decrement();
if (checkValid(conn)) {
cycleCounter.increment();
return conn;
} else {
offerConnection(conn);
conn = null;
}
}
try {
conn = driver.connect(url, connectAttrs);
conn.setClientInfo(clientInfo);
} catch (SQLException ex) {
throw new SourceException(ex);
return newConnection();
}
usingCounter.increment();
creatCounter.increment();
return conn;
if (checkValid(conn)) {
cycleCounter.increment();
return conn;
} else {
offerConnection(conn);
conn = null;
}
return newConnection();
}
private Connection newConnection() {
Semaphore semaphore = this.canNewSemaphore;
Connection conn = null;
if (semaphore.tryAcquire()) {
try {
conn = driver.connect(url, connectAttrs);
conn.setClientInfo(clientInfo);
} catch (SQLException ex) {
throw new SourceException(ex);
}
usingCounter.increment();
creatCounter.increment();
return conn;
} else {
try {
conn = queue.poll(connectTimeoutSeconds, TimeUnit.SECONDS);
} catch (InterruptedException t) {
logger.log(Level.WARNING, "take pooled connection error", t);
}
if (conn == null) {
throw new SourceException("create pooled connection timeout");
}
return conn;
}
}
public <C> void offerConnection(final C connection) {
offerConnection(connection, this.canNewSemaphore);
}
private <C> void offerConnection(final C connection, Semaphore semaphore) {
Connection conn = (Connection) connection;
if (conn == null) {
return;
@@ -2829,10 +2890,10 @@ public class DataJdbcSource extends AbstractDataSqlSource {
try {
if (checkValid(conn) && queue.offer(conn)) {
usingCounter.decrement();
waitingCounter.increment();
} else {
usingCounter.decrement();
closeCounter.increment();
semaphore.release();
conn.close();
}
} catch (Exception e) {

View File

@@ -40,6 +40,16 @@ public class DataMemorySource extends AbstractDataSqlSource implements SearchSou
this.cacheForbidden = false;
}
@Override
protected int readMaxConns() {
return -1;
}
@Override
protected int writeMaxConns() {
return -1;
}
@Local
@Override
public String getType() {