This commit is contained in:
@@ -542,7 +542,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
|
||||
join1 = multisplit('[', ']', ",", new StringBuilder(), joinstr, 0);
|
||||
join2 = multisplit('{', '}', " AND ", new StringBuilder(), joinstr, 0);
|
||||
}
|
||||
String sql = "DELETE " + (this.readPool.isMysql() ? "a" : "") + " FROM " + info.getTable(node) + " a" + (join1 == null ? "" : (", " + join1))
|
||||
String sql = "DELETE " + ("mysql".equals(this.readPool.getDbtype()) ? "a" : "") + " FROM " + info.getTable(node) + " a" + (join1 == null ? "" : (", " + join1))
|
||||
+ ((where == null || where.length() == 0) ? (join2 == null ? "" : (" WHERE " + join2))
|
||||
: (" WHERE " + where + (join2 == null ? "" : (" AND " + join2)))) + info.createSQLOrderby(flipper)
|
||||
+ ((flipper == null || flipper.getLimit() < 1) ? "" : (" LIMIT " + flipper.getLimit()));
|
||||
|
||||
@@ -33,6 +33,12 @@ public final class DataSources {
|
||||
|
||||
public static final String JDBC_TABLECOPY_SQLTEMPLATE = "javax.persistence.tablecopy.sqltemplate";
|
||||
|
||||
public static final String JDBC_CONNECTTIMEOUT_SECONDS = "javax.persistence.connecttimeout";
|
||||
|
||||
public static final String JDBC_READTIMEOUT_SECONDS = "javax.persistence.readtimeout";
|
||||
|
||||
public static final String JDBC_WRITETIMEOUT_SECONDS = "javax.persistence.writetimeout";
|
||||
|
||||
public static final String JDBC_URL = "javax.persistence.jdbc.url";
|
||||
|
||||
public static final String JDBC_USER = "javax.persistence.jdbc.user";
|
||||
|
||||
@@ -56,20 +56,6 @@ public class PoolJdbcSource extends PoolSource<Connection> {
|
||||
dataSource.logger.log(Level.WARNING, "connectionErronOccurred [" + event.getSQLException().getSQLState() + "]", event.getSQLException());
|
||||
}
|
||||
};
|
||||
if (this.isOracle()) {
|
||||
this.props.setProperty(JDBC_CONTAIN_SQLTEMPLATE, "INSTR(${keystr}, ${column}) > 0");
|
||||
this.props.setProperty(JDBC_NOTCONTAIN_SQLTEMPLATE, "INSTR(${keystr}, ${column}) = 0");
|
||||
if (!this.props.containsKey(JDBC_TABLENOTEXIST_SQLSTATES)) {
|
||||
this.props.setProperty(JDBC_TABLENOTEXIST_SQLSTATES, "42000;42S02");
|
||||
}
|
||||
if (!this.props.containsKey(JDBC_TABLECOPY_SQLTEMPLATE)) {
|
||||
//注意:此语句复制表结构会导致默认值和主键信息的丢失
|
||||
this.props.setProperty(JDBC_TABLECOPY_SQLTEMPLATE, "CREATE TABLE ${newtable} AS SELECT * FROM ${oldtable} WHERE 1=2");
|
||||
}
|
||||
} else if (this.isSqlserver()) {
|
||||
this.props.setProperty(JDBC_CONTAIN_SQLTEMPLATE, "CHARINDEX(${column}, ${keystr}) > 0");
|
||||
this.props.setProperty(JDBC_NOTCONTAIN_SQLTEMPLATE, "CHARINDEX(${column}, ${keystr}) = 0");
|
||||
}
|
||||
|
||||
try {
|
||||
this.watch();
|
||||
@@ -149,19 +135,24 @@ public class PoolJdbcSource extends PoolSource<Connection> {
|
||||
return (ConnectionPoolDataSource) pdsource;
|
||||
}
|
||||
|
||||
final boolean isMysql() {
|
||||
@Override
|
||||
protected int getDefaultPort() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
final boolean isMysql2() {
|
||||
return source != null && source.getClass().getName().contains(".mysql.");
|
||||
}
|
||||
|
||||
final boolean isOracle() {
|
||||
final boolean isOracle2() {
|
||||
return source != null && source.getClass().getName().contains("oracle.");
|
||||
}
|
||||
|
||||
final boolean isSqlserver() {
|
||||
final boolean isSqlserver2() {
|
||||
return source != null && source.getClass().getName().contains(".sqlserver.");
|
||||
}
|
||||
|
||||
final boolean isPostgresql() {
|
||||
final boolean isPostgresql2() {
|
||||
return source != null && source.getClass().getName().contains(".postgresql.");
|
||||
}
|
||||
|
||||
@@ -200,7 +191,7 @@ public class PoolJdbcSource extends PoolSource<Connection> {
|
||||
if (pool == null) continue;
|
||||
try {
|
||||
Properties property = m.get(pool.dataSource.name);
|
||||
if (property == null) property = m.get(pool.dataSource.name + "." + pool.stype);
|
||||
if (property == null) property = m.get(pool.dataSource.name + "." + pool.rwtype);
|
||||
if (property != null) pool.change(property);
|
||||
} catch (Exception ex) {
|
||||
dataSource.logger.log(Level.INFO, event.context() + " occur error", ex);
|
||||
@@ -245,7 +236,7 @@ public class PoolJdbcSource extends PoolSource<Connection> {
|
||||
this.url = newurl;
|
||||
this.user = newuser;
|
||||
this.password = newpassword;
|
||||
dataSource.logger.log(Level.INFO, DataSource.class.getSimpleName() + "(" + dataSource.name + "." + stype + ") change (" + property + ")");
|
||||
dataSource.logger.log(Level.INFO, DataSource.class.getSimpleName() + "(" + dataSource.name + "." + rwtype + ") change (" + property + ")");
|
||||
} catch (Exception e) {
|
||||
dataSource.logger.log(Level.SEVERE, DataSource.class.getSimpleName() + " dynamic change JDBC (url userName password) error", e);
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
*/
|
||||
package org.redkale.source;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
@@ -32,12 +33,22 @@ public abstract class PoolSource<T> {
|
||||
|
||||
protected final Logger logger;
|
||||
|
||||
protected final String stype; // "" 或 "read" 或 "write"
|
||||
protected final String rwtype; // "" 或 "read" 或 "write"
|
||||
|
||||
protected final int maxconns;
|
||||
|
||||
protected final String dbtype;
|
||||
|
||||
protected int connectTimeoutSeconds;
|
||||
|
||||
protected int readTimeoutSeconds;
|
||||
|
||||
protected int writeTimeoutSeconds;
|
||||
|
||||
protected String url;
|
||||
|
||||
protected InetSocketAddress addr;
|
||||
|
||||
protected String user;
|
||||
|
||||
protected String password;
|
||||
@@ -46,16 +57,65 @@ public abstract class PoolSource<T> {
|
||||
|
||||
protected Properties props;
|
||||
|
||||
@SuppressWarnings("OverridableMethodCallInConstructor")
|
||||
public PoolSource(String stype, Properties prop, Logger logger) {
|
||||
this.logger = logger;
|
||||
this.stype = stype;
|
||||
this.rwtype = stype;
|
||||
this.props = prop;
|
||||
this.url = prop.getProperty(JDBC_URL);
|
||||
this.user = prop.getProperty(JDBC_USER);
|
||||
this.password = prop.getProperty(JDBC_PWD);
|
||||
this.connectTimeoutSeconds = Integer.decode(prop.getProperty(JDBC_CONNECTTIMEOUT_SECONDS, "6"));
|
||||
this.readTimeoutSeconds = Integer.decode(prop.getProperty(JDBC_READTIMEOUT_SECONDS, "6"));
|
||||
this.writeTimeoutSeconds = Integer.decode(prop.getProperty(JDBC_WRITETIMEOUT_SECONDS, "6"));
|
||||
this.maxconns = Integer.decode(prop.getProperty(JDBC_CONNECTIONSMAX, "" + Runtime.getRuntime().availableProcessors() * 16));
|
||||
String dbtype0 = "";
|
||||
{ //jdbc:mysql:// jdbc:microsoft:sqlserver:// 取://之前的到最后一个:之间的字符串
|
||||
int pos = this.url.indexOf("://");
|
||||
if (pos > 0) {
|
||||
String url0 = this.url.substring(0, pos);
|
||||
pos = url0.lastIndexOf(':');
|
||||
if (pos > 0) dbtype0 = url0.substring(pos + 1);
|
||||
}
|
||||
}
|
||||
this.dbtype = dbtype0.toLowerCase();
|
||||
parseAddressAndDbname();
|
||||
|
||||
if ("oracle".equals(this.dbtype)) {
|
||||
this.props.setProperty(JDBC_CONTAIN_SQLTEMPLATE, "INSTR(${keystr}, ${column}) > 0");
|
||||
this.props.setProperty(JDBC_NOTCONTAIN_SQLTEMPLATE, "INSTR(${keystr}, ${column}) = 0");
|
||||
if (!this.props.containsKey(JDBC_TABLENOTEXIST_SQLSTATES)) {
|
||||
this.props.setProperty(JDBC_TABLENOTEXIST_SQLSTATES, "42000;42S02");
|
||||
}
|
||||
if (!this.props.containsKey(JDBC_TABLECOPY_SQLTEMPLATE)) {
|
||||
//注意:此语句复制表结构会导致默认值和主键信息的丢失
|
||||
this.props.setProperty(JDBC_TABLECOPY_SQLTEMPLATE, "CREATE TABLE ${newtable} AS SELECT * FROM ${oldtable} WHERE 1=2");
|
||||
}
|
||||
} else if ("sqlserver".equals(this.dbtype)) {
|
||||
this.props.setProperty(JDBC_CONTAIN_SQLTEMPLATE, "CHARINDEX(${column}, ${keystr}) > 0");
|
||||
this.props.setProperty(JDBC_NOTCONTAIN_SQLTEMPLATE, "CHARINDEX(${column}, ${keystr}) = 0");
|
||||
}
|
||||
}
|
||||
|
||||
protected void parseAddressAndDbname() {
|
||||
String url0 = this.url.substring(this.url.indexOf("://") + 3);
|
||||
int pos = url0.indexOf('?'); //127.0.0.1:5432/db?charset=utr8&xxx=yy
|
||||
if (pos > 0) url0 = url0.substring(0, pos);
|
||||
pos = url0.indexOf('/'); //127.0.0.1:5432/db
|
||||
if (pos > 0) {
|
||||
this.defdb = url0.substring(pos + 1);
|
||||
url0 = url0.substring(0, pos);
|
||||
}
|
||||
pos = url0.indexOf(':');
|
||||
if (pos > 0) {
|
||||
this.addr = new InetSocketAddress(url0.substring(0, pos), Integer.parseInt(url0.substring(pos + 1)));
|
||||
} else {
|
||||
this.addr = new InetSocketAddress(url0, getDefaultPort());
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract int getDefaultPort();
|
||||
|
||||
/**
|
||||
* 是否异步, 为true则只能调用pollAsync方法,为false则只能调用poll方法
|
||||
*
|
||||
@@ -71,6 +131,10 @@ public abstract class PoolSource<T> {
|
||||
|
||||
public abstract void close();
|
||||
|
||||
public final String getDbtype() {
|
||||
return dbtype;
|
||||
}
|
||||
|
||||
public final long getUsingCount() {
|
||||
return usingCounter.longValue();
|
||||
}
|
||||
|
||||
57
src/org/redkale/source/PoolTcpSource.java
Normal file
57
src/org/redkale/source/PoolTcpSource.java
Normal file
@@ -0,0 +1,57 @@
|
||||
/*
|
||||
* To change this license header, choose License Headers in Project Properties.
|
||||
* To change this template file, choose Tools | Templates
|
||||
* and open the template in the editor.
|
||||
*/
|
||||
package org.redkale.source;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.AsynchronousChannelGroup;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.logging.Logger;
|
||||
import org.redkale.net.AsyncConnection;
|
||||
import org.redkale.util.ObjectPool;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author zhangjx
|
||||
*/
|
||||
public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
|
||||
|
||||
//ByteBuffer池
|
||||
protected ObjectPool<ByteBuffer> bufferPool;
|
||||
|
||||
//线程池
|
||||
protected ThreadPoolExecutor executor;
|
||||
|
||||
//TCP Channel组
|
||||
protected AsynchronousChannelGroup group;
|
||||
|
||||
public PoolTcpSource(String stype, Properties prop, Logger logger, ObjectPool<ByteBuffer> bufferPool,ThreadPoolExecutor executor) {
|
||||
super(stype, prop, logger);
|
||||
this.bufferPool = bufferPool;
|
||||
this.executor = executor;
|
||||
try {
|
||||
this.group = AsynchronousChannelGroup.withThreadPool(executor);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public final boolean isAysnc() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final AsyncConnection poll() {
|
||||
return pollAsync().join();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<AsyncConnection> pollAsync() {
|
||||
return AsyncConnection.createTCP(group, this.addr, this.readTimeoutSeconds, this.writeTimeoutSeconds);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user