DataJdbcSource优化
This commit is contained in:
@@ -371,6 +371,10 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
|
|||||||
return name;
|
return name;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getMaxConns() {
|
||||||
|
return connLimit;
|
||||||
|
}
|
||||||
|
|
||||||
public int getReadTimeoutSeconds() {
|
public int getReadTimeoutSeconds() {
|
||||||
return readTimeoutSeconds;
|
return readTimeoutSeconds;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2786,30 +2786,15 @@ public class DataJdbcSource extends AbstractDataSqlSource {
|
|||||||
this.connectAttrs.put("user", newUser);
|
this.connectAttrs.put("user", newUser);
|
||||||
this.connectAttrs.put("password", newPassword);
|
this.connectAttrs.put("password", newPassword);
|
||||||
if (newMaxconns != this.maxConns) {
|
if (newMaxconns != this.maxConns) {
|
||||||
ArrayBlockingQueue<Connection> newQueue = new ArrayBlockingQueue<>(newMaxconns);
|
changeMaxConns(newMaxconns);
|
||||||
ArrayBlockingQueue<Connection> oldQueue = this.queue;
|
|
||||||
Semaphore oldSemaphore = this.canNewSemaphore;
|
|
||||||
this.queue = newQueue;
|
|
||||||
this.maxConns = newMaxconns;
|
|
||||||
this.canNewSemaphore = new Semaphore(this.maxConns);
|
|
||||||
Connection c;
|
|
||||||
while ((c = oldQueue.poll()) != null) {
|
|
||||||
try {
|
|
||||||
if (c.getClientInfo() != null) {
|
|
||||||
c.getClientInfo().put("version", "-1");
|
|
||||||
}
|
|
||||||
} catch (SQLException e) {
|
|
||||||
}
|
|
||||||
offerConnection(c, oldSemaphore);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void resetMaxConnection() {
|
private void resetMaxConnection() {
|
||||||
if ("mysql".equals(dbtype()) || "postgresql".equals(dbtype())) {
|
if ("mysql".equals(dbtype()) || "postgresql".equals(dbtype())) {
|
||||||
int newMaxconns = this.maxConns;
|
int newMaxconns = this.maxConns;
|
||||||
Connection conn = pollConnection();
|
|
||||||
try {
|
try {
|
||||||
|
Connection conn = driver.connect(url, connectAttrs);
|
||||||
Statement stmt = conn.createStatement();
|
Statement stmt = conn.createStatement();
|
||||||
if ("mysql".equals(dbtype())) {
|
if ("mysql".equals(dbtype())) {
|
||||||
ResultSet rs = stmt.executeQuery("SHOW VARIABLES LIKE 'max_connections'");
|
ResultSet rs = stmt.executeQuery("SHOW VARIABLES LIKE 'max_connections'");
|
||||||
@@ -2823,31 +2808,34 @@ public class DataJdbcSource extends AbstractDataSqlSource {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
stmt.close();
|
stmt.close();
|
||||||
} catch (SQLException e) {
|
conn.close();
|
||||||
} finally {
|
} catch (Exception e) {
|
||||||
offerConnection(conn);
|
|
||||||
}
|
}
|
||||||
if (this.maxConns > newMaxconns) { //配置连接数过大
|
if (this.maxConns > newMaxconns) { //配置连接数过大
|
||||||
ArrayBlockingQueue<Connection> newQueue = new ArrayBlockingQueue<>(newMaxconns);
|
changeMaxConns(newMaxconns);
|
||||||
ArrayBlockingQueue<Connection> oldQueue = this.queue;
|
|
||||||
Semaphore oldSemaphore = this.canNewSemaphore;
|
|
||||||
this.queue = newQueue;
|
|
||||||
this.maxConns = newMaxconns;
|
|
||||||
this.canNewSemaphore = new Semaphore(this.maxConns);
|
|
||||||
Connection c;
|
|
||||||
while ((c = oldQueue.poll()) != null) {
|
|
||||||
try {
|
|
||||||
if (c.getClientInfo() != null) {
|
|
||||||
c.getClientInfo().put("version", "-1");
|
|
||||||
}
|
|
||||||
} catch (SQLException e) {
|
|
||||||
}
|
|
||||||
offerConnection(c, oldSemaphore);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void changeMaxConns(int newMaxconns) {
|
||||||
|
ArrayBlockingQueue<Connection> newQueue = new ArrayBlockingQueue<>(newMaxconns);
|
||||||
|
ArrayBlockingQueue<Connection> oldQueue = this.queue;
|
||||||
|
Semaphore oldSemaphore = this.canNewSemaphore;
|
||||||
|
this.queue = newQueue;
|
||||||
|
this.maxConns = newMaxconns;
|
||||||
|
this.canNewSemaphore = new Semaphore(this.maxConns);
|
||||||
|
Connection c;
|
||||||
|
while ((c = oldQueue.poll()) != null) {
|
||||||
|
try {
|
||||||
|
if (c.getClientInfo() != null) {
|
||||||
|
c.getClientInfo().put("version", "-1");
|
||||||
|
}
|
||||||
|
} catch (SQLException e) {
|
||||||
|
}
|
||||||
|
offerConnection(c, oldSemaphore);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public Connection pollConnection() {
|
public Connection pollConnection() {
|
||||||
Connection conn = queue.poll();
|
Connection conn = queue.poll();
|
||||||
if (conn == null) {
|
if (conn == null) {
|
||||||
|
|||||||
Reference in New Issue
Block a user