增加DataJdbcConnection功能

This commit is contained in:
redkale
2023-11-17 23:22:28 +08:00
parent 1ee5d79866
commit c82f454e42
3 changed files with 64 additions and 7 deletions

View File

@@ -1348,6 +1348,9 @@ public final class Application {
if (source instanceof DataSqlSource) { if (source instanceof DataSqlSource) {
resourceFactory.register(sourceName, DataSqlSource.class, source); resourceFactory.register(sourceName, DataSqlSource.class, source);
} }
if (source instanceof DataJdbcSource) {
resourceFactory.register(sourceName, DataJdbcSource.class, source);
}
} }
} }
return source; return source;
@@ -1366,6 +1369,9 @@ public final class Application {
if (source instanceof DataSqlSource) { if (source instanceof DataSqlSource) {
resourceFactory.register(sourceName, DataSqlSource.class, source); resourceFactory.register(sourceName, DataSqlSource.class, source);
} }
if (source instanceof DataJdbcSource) {
resourceFactory.register(sourceName, DataJdbcSource.class, source);
}
} }
logger.info("Load DataSource resourceName = '" + sourceName + "', source = " + source); logger.info("Load DataSource resourceName = '" + sourceName + "', source = " + source);
return source; return source;

View File

@@ -0,0 +1,26 @@
/*
*
*/
package org.redkale.source;
import java.sql.Connection;
/**
* 用于获取jdbc的物理连接对象
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
* @since 2.8.0
*/
public abstract class DataJdbcConnection {
final boolean readFlag;
public abstract Connection getConnection();
DataJdbcConnection(boolean readFlag) {
this.readFlag = readFlag;
}
}

View File

@@ -46,11 +46,11 @@ public class DataJdbcSource extends AbstractDataSqlSource {
@Override @Override
public void init(AnyValue conf) { public void init(AnyValue conf) {
super.init(conf); super.init(conf);
this.readPool = new ConnectionPool(readConfProps); this.readPool = new ConnectionPool(true, readConfProps);
if (readConfProps == writeConfProps) { if (readConfProps == writeConfProps) {
this.writePool = readPool; this.writePool = readPool;
} else { } else {
this.writePool = new ConnectionPool(writeConfProps); this.writePool = new ConnectionPool(false, writeConfProps);
} }
} }
@@ -121,6 +121,22 @@ public class DataJdbcSource extends AbstractDataSqlSource {
return writePool; return writePool;
} }
@Local
public DataJdbcConnection getReadJdbcConnection() {
return readPool().pollConnection();
}
@Local
public DataJdbcConnection getWriteJdbcConnection() {
return writePool().pollConnection();
}
@Local
public void offerJdbcConnection(DataJdbcConnection conn) {
ConnectionPool pool = conn.readFlag ? readPool() : writePool();
pool.offerConnection(conn);
}
@Override @Override
protected final String prepareParamSign(int index) { protected final String prepareParamSign(int index) {
return "?"; return "?";
@@ -2913,7 +2929,10 @@ public class DataJdbcSource extends AbstractDataSqlSource {
protected final AtomicInteger urlVersion = new AtomicInteger(); protected final AtomicInteger urlVersion = new AtomicInteger();
public ConnectionPool(Properties prop) { protected final boolean readFlag;
public ConnectionPool(boolean readFlag, Properties prop) {
this.readFlag = readFlag;
this.connectTimeoutSeconds = Integer.decode(prop.getProperty(DATA_SOURCE_CONNECTTIMEOUT_SECONDS, "30")); this.connectTimeoutSeconds = Integer.decode(prop.getProperty(DATA_SOURCE_CONNECTTIMEOUT_SECONDS, "30"));
int defMaxConns = Utility.cpus() * 4; int defMaxConns = Utility.cpus() * 4;
if (workExecutor instanceof ThreadPoolExecutor) { if (workExecutor instanceof ThreadPoolExecutor) {
@@ -3055,7 +3074,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
SourceConnection conn = null; SourceConnection conn = null;
if (semaphore.tryAcquire()) { if (semaphore.tryAcquire()) {
try { try {
conn = new SourceConnection(driver.connect(url, connectAttrs), this.urlVersion.get()); conn = new SourceConnection(driver.connect(url, connectAttrs), readFlag, this.urlVersion.get());
} catch (SQLException ex) { } catch (SQLException ex) {
semaphore.release(); semaphore.release();
throw new SourceException(ex); throw new SourceException(ex);
@@ -3126,7 +3145,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
} }
} }
protected class SourceConnection { protected class SourceConnection extends DataJdbcConnection {
public int version; public int version;
@@ -3134,12 +3153,18 @@ public class DataJdbcSource extends AbstractDataSqlSource {
boolean commiting; boolean commiting;
public SourceConnection(Connection conn, int version) { public SourceConnection(Connection conn, boolean readFlag, int version) {
super(readFlag);
Objects.requireNonNull(conn); Objects.requireNonNull(conn);
this.conn = conn; this.conn = conn;
this.version = version; this.version = version;
} }
@Override
public Connection getConnection() {
return conn;
}
public Statement createStreamStatement() throws SQLException { public Statement createStreamStatement() throws SQLException {
Statement statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); Statement statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
statement.setFetchSize(Integer.MIN_VALUE); statement.setFetchSize(Integer.MIN_VALUE);