diff --git a/src/main/java/org/redkale/boot/Application.java b/src/main/java/org/redkale/boot/Application.java index 19f22feeb..fd3e21684 100644 --- a/src/main/java/org/redkale/boot/Application.java +++ b/src/main/java/org/redkale/boot/Application.java @@ -372,7 +372,7 @@ public final class Application { in.close(); } } catch (Exception e) { - //do nothing + //do nothing } } if (cacheClasses == null) { @@ -1348,6 +1348,9 @@ public final class Application { if (source instanceof DataSqlSource) { resourceFactory.register(sourceName, DataSqlSource.class, source); } + if (source instanceof DataJdbcSource) { + resourceFactory.register(sourceName, DataJdbcSource.class, source); + } } } return source; @@ -1366,6 +1369,9 @@ public final class Application { if (source instanceof DataSqlSource) { resourceFactory.register(sourceName, DataSqlSource.class, source); } + if (source instanceof DataJdbcSource) { + resourceFactory.register(sourceName, DataJdbcSource.class, source); + } } logger.info("Load DataSource resourceName = '" + sourceName + "', source = " + source); return source; diff --git a/src/main/java/org/redkale/source/DataJdbcConnection.java b/src/main/java/org/redkale/source/DataJdbcConnection.java new file mode 100644 index 000000000..d7f2d5060 --- /dev/null +++ b/src/main/java/org/redkale/source/DataJdbcConnection.java @@ -0,0 +1,26 @@ +/* + * + */ +package org.redkale.source; + +import java.sql.Connection; + +/** + * 用于获取jdbc的物理连接对象 + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * @since 2.8.0 + */ +public abstract class DataJdbcConnection { + + final boolean readFlag; + + public abstract Connection getConnection(); + + DataJdbcConnection(boolean readFlag) { + this.readFlag = readFlag; + } +} diff --git a/src/main/java/org/redkale/source/DataJdbcSource.java b/src/main/java/org/redkale/source/DataJdbcSource.java index 57818b554..3175e387b 100644 --- a/src/main/java/org/redkale/source/DataJdbcSource.java +++ b/src/main/java/org/redkale/source/DataJdbcSource.java @@ -46,11 +46,11 @@ public class DataJdbcSource extends AbstractDataSqlSource { @Override public void init(AnyValue conf) { super.init(conf); - this.readPool = new ConnectionPool(readConfProps); + this.readPool = new ConnectionPool(true, readConfProps); if (readConfProps == writeConfProps) { this.writePool = readPool; } else { - this.writePool = new ConnectionPool(writeConfProps); + this.writePool = new ConnectionPool(false, writeConfProps); } } @@ -121,6 +121,22 @@ public class DataJdbcSource extends AbstractDataSqlSource { return writePool; } + @Local + public DataJdbcConnection getReadJdbcConnection() { + return readPool().pollConnection(); + } + + @Local + public DataJdbcConnection getWriteJdbcConnection() { + return writePool().pollConnection(); + } + + @Local + public void offerJdbcConnection(DataJdbcConnection conn) { + ConnectionPool pool = conn.readFlag ? readPool() : writePool(); + pool.offerConnection(conn); + } + @Override protected final String prepareParamSign(int index) { return "?"; @@ -2913,7 +2929,10 @@ public class DataJdbcSource extends AbstractDataSqlSource { protected final AtomicInteger urlVersion = new AtomicInteger(); - public ConnectionPool(Properties prop) { + protected final boolean readFlag; + + public ConnectionPool(boolean readFlag, Properties prop) { + this.readFlag = readFlag; this.connectTimeoutSeconds = Integer.decode(prop.getProperty(DATA_SOURCE_CONNECTTIMEOUT_SECONDS, "30")); int defMaxConns = Utility.cpus() * 4; if (workExecutor instanceof ThreadPoolExecutor) { @@ -3055,7 +3074,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { SourceConnection conn = null; if (semaphore.tryAcquire()) { try { - conn = new SourceConnection(driver.connect(url, connectAttrs), this.urlVersion.get()); + conn = new SourceConnection(driver.connect(url, connectAttrs), readFlag, this.urlVersion.get()); } catch (SQLException ex) { semaphore.release(); throw new SourceException(ex); @@ -3126,7 +3145,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } } - protected class SourceConnection { + protected class SourceConnection extends DataJdbcConnection { public int version; @@ -3134,12 +3153,18 @@ public class DataJdbcSource extends AbstractDataSqlSource { boolean commiting; - public SourceConnection(Connection conn, int version) { + public SourceConnection(Connection conn, boolean readFlag, int version) { + super(readFlag); Objects.requireNonNull(conn); this.conn = conn; this.version = version; } + @Override + public Connection getConnection() { + return conn; + } + public Statement createStreamStatement() throws SQLException { Statement statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); statement.setFetchSize(Integer.MIN_VALUE);