From 60bdf4d1378bef954647ce6a0a8c0281a5ed3e3b Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Sat, 14 Apr 2018 11:22:32 +0800 Subject: [PATCH] --- src/org/redkale/source/DataJdbcSource.java | 2 +- src/org/redkale/source/DataSources.java | 6 ++ src/org/redkale/source/PoolJdbcSource.java | 31 ++++------ src/org/redkale/source/PoolSource.java | 68 +++++++++++++++++++++- src/org/redkale/source/PoolTcpSource.java | 57 ++++++++++++++++++ 5 files changed, 141 insertions(+), 23 deletions(-) create mode 100644 src/org/redkale/source/PoolTcpSource.java diff --git a/src/org/redkale/source/DataJdbcSource.java b/src/org/redkale/source/DataJdbcSource.java index bc419e668..75b244adc 100644 --- a/src/org/redkale/source/DataJdbcSource.java +++ b/src/org/redkale/source/DataJdbcSource.java @@ -542,7 +542,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC join1 = multisplit('[', ']', ",", new StringBuilder(), joinstr, 0); join2 = multisplit('{', '}', " AND ", new StringBuilder(), joinstr, 0); } - String sql = "DELETE " + (this.readPool.isMysql() ? "a" : "") + " FROM " + info.getTable(node) + " a" + (join1 == null ? "" : (", " + join1)) + String sql = "DELETE " + ("mysql".equals(this.readPool.getDbtype()) ? "a" : "") + " FROM " + info.getTable(node) + " a" + (join1 == null ? "" : (", " + join1)) + ((where == null || where.length() == 0) ? (join2 == null ? "" : (" WHERE " + join2)) : (" WHERE " + where + (join2 == null ? "" : (" AND " + join2)))) + info.createSQLOrderby(flipper) + ((flipper == null || flipper.getLimit() < 1) ? "" : (" LIMIT " + flipper.getLimit())); diff --git a/src/org/redkale/source/DataSources.java b/src/org/redkale/source/DataSources.java index 16d6f1d6a..08782bb10 100644 --- a/src/org/redkale/source/DataSources.java +++ b/src/org/redkale/source/DataSources.java @@ -33,6 +33,12 @@ public final class DataSources { public static final String JDBC_TABLECOPY_SQLTEMPLATE = "javax.persistence.tablecopy.sqltemplate"; + public static final String JDBC_CONNECTTIMEOUT_SECONDS = "javax.persistence.connecttimeout"; + + public static final String JDBC_READTIMEOUT_SECONDS = "javax.persistence.readtimeout"; + + public static final String JDBC_WRITETIMEOUT_SECONDS = "javax.persistence.writetimeout"; + public static final String JDBC_URL = "javax.persistence.jdbc.url"; public static final String JDBC_USER = "javax.persistence.jdbc.user"; diff --git a/src/org/redkale/source/PoolJdbcSource.java b/src/org/redkale/source/PoolJdbcSource.java index 056544f7c..e61202449 100644 --- a/src/org/redkale/source/PoolJdbcSource.java +++ b/src/org/redkale/source/PoolJdbcSource.java @@ -56,20 +56,6 @@ public class PoolJdbcSource extends PoolSource { dataSource.logger.log(Level.WARNING, "connectionErronOccurred [" + event.getSQLException().getSQLState() + "]", event.getSQLException()); } }; - if (this.isOracle()) { - this.props.setProperty(JDBC_CONTAIN_SQLTEMPLATE, "INSTR(${keystr}, ${column}) > 0"); - this.props.setProperty(JDBC_NOTCONTAIN_SQLTEMPLATE, "INSTR(${keystr}, ${column}) = 0"); - if (!this.props.containsKey(JDBC_TABLENOTEXIST_SQLSTATES)) { - this.props.setProperty(JDBC_TABLENOTEXIST_SQLSTATES, "42000;42S02"); - } - if (!this.props.containsKey(JDBC_TABLECOPY_SQLTEMPLATE)) { - //注意:此语句复制表结构会导致默认值和主键信息的丢失 - this.props.setProperty(JDBC_TABLECOPY_SQLTEMPLATE, "CREATE TABLE ${newtable} AS SELECT * FROM ${oldtable} WHERE 1=2"); - } - } else if (this.isSqlserver()) { - this.props.setProperty(JDBC_CONTAIN_SQLTEMPLATE, "CHARINDEX(${column}, ${keystr}) > 0"); - this.props.setProperty(JDBC_NOTCONTAIN_SQLTEMPLATE, "CHARINDEX(${column}, ${keystr}) = 0"); - } try { this.watch(); @@ -149,19 +135,24 @@ public class PoolJdbcSource extends PoolSource { return (ConnectionPoolDataSource) pdsource; } - final boolean isMysql() { + @Override + protected int getDefaultPort() { + return 0; + } + + final boolean isMysql2() { return source != null && source.getClass().getName().contains(".mysql."); } - final boolean isOracle() { + final boolean isOracle2() { return source != null && source.getClass().getName().contains("oracle."); } - final boolean isSqlserver() { + final boolean isSqlserver2() { return source != null && source.getClass().getName().contains(".sqlserver."); } - final boolean isPostgresql() { + final boolean isPostgresql2() { return source != null && source.getClass().getName().contains(".postgresql."); } @@ -200,7 +191,7 @@ public class PoolJdbcSource extends PoolSource { if (pool == null) continue; try { Properties property = m.get(pool.dataSource.name); - if (property == null) property = m.get(pool.dataSource.name + "." + pool.stype); + if (property == null) property = m.get(pool.dataSource.name + "." + pool.rwtype); if (property != null) pool.change(property); } catch (Exception ex) { dataSource.logger.log(Level.INFO, event.context() + " occur error", ex); @@ -245,7 +236,7 @@ public class PoolJdbcSource extends PoolSource { this.url = newurl; this.user = newuser; this.password = newpassword; - dataSource.logger.log(Level.INFO, DataSource.class.getSimpleName() + "(" + dataSource.name + "." + stype + ") change (" + property + ")"); + dataSource.logger.log(Level.INFO, DataSource.class.getSimpleName() + "(" + dataSource.name + "." + rwtype + ") change (" + property + ")"); } catch (Exception e) { dataSource.logger.log(Level.SEVERE, DataSource.class.getSimpleName() + " dynamic change JDBC (url userName password) error", e); } diff --git a/src/org/redkale/source/PoolSource.java b/src/org/redkale/source/PoolSource.java index 8272a984f..39ec03d2b 100644 --- a/src/org/redkale/source/PoolSource.java +++ b/src/org/redkale/source/PoolSource.java @@ -5,6 +5,7 @@ */ package org.redkale.source; +import java.net.InetSocketAddress; import java.util.Properties; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; @@ -32,12 +33,22 @@ public abstract class PoolSource { protected final Logger logger; - protected final String stype; // "" 或 "read" 或 "write" + protected final String rwtype; // "" 或 "read" 或 "write" protected final int maxconns; + protected final String dbtype; + + protected int connectTimeoutSeconds; + + protected int readTimeoutSeconds; + + protected int writeTimeoutSeconds; + protected String url; + protected InetSocketAddress addr; + protected String user; protected String password; @@ -46,16 +57,65 @@ public abstract class PoolSource { protected Properties props; + @SuppressWarnings("OverridableMethodCallInConstructor") public PoolSource(String stype, Properties prop, Logger logger) { this.logger = logger; - this.stype = stype; + this.rwtype = stype; this.props = prop; this.url = prop.getProperty(JDBC_URL); this.user = prop.getProperty(JDBC_USER); this.password = prop.getProperty(JDBC_PWD); + this.connectTimeoutSeconds = Integer.decode(prop.getProperty(JDBC_CONNECTTIMEOUT_SECONDS, "6")); + this.readTimeoutSeconds = Integer.decode(prop.getProperty(JDBC_READTIMEOUT_SECONDS, "6")); + this.writeTimeoutSeconds = Integer.decode(prop.getProperty(JDBC_WRITETIMEOUT_SECONDS, "6")); this.maxconns = Integer.decode(prop.getProperty(JDBC_CONNECTIONSMAX, "" + Runtime.getRuntime().availableProcessors() * 16)); + String dbtype0 = ""; + { //jdbc:mysql:// jdbc:microsoft:sqlserver:// 取://之前的到最后一个:之间的字符串 + int pos = this.url.indexOf("://"); + if (pos > 0) { + String url0 = this.url.substring(0, pos); + pos = url0.lastIndexOf(':'); + if (pos > 0) dbtype0 = url0.substring(pos + 1); + } + } + this.dbtype = dbtype0.toLowerCase(); + parseAddressAndDbname(); + + if ("oracle".equals(this.dbtype)) { + this.props.setProperty(JDBC_CONTAIN_SQLTEMPLATE, "INSTR(${keystr}, ${column}) > 0"); + this.props.setProperty(JDBC_NOTCONTAIN_SQLTEMPLATE, "INSTR(${keystr}, ${column}) = 0"); + if (!this.props.containsKey(JDBC_TABLENOTEXIST_SQLSTATES)) { + this.props.setProperty(JDBC_TABLENOTEXIST_SQLSTATES, "42000;42S02"); + } + if (!this.props.containsKey(JDBC_TABLECOPY_SQLTEMPLATE)) { + //注意:此语句复制表结构会导致默认值和主键信息的丢失 + this.props.setProperty(JDBC_TABLECOPY_SQLTEMPLATE, "CREATE TABLE ${newtable} AS SELECT * FROM ${oldtable} WHERE 1=2"); + } + } else if ("sqlserver".equals(this.dbtype)) { + this.props.setProperty(JDBC_CONTAIN_SQLTEMPLATE, "CHARINDEX(${column}, ${keystr}) > 0"); + this.props.setProperty(JDBC_NOTCONTAIN_SQLTEMPLATE, "CHARINDEX(${column}, ${keystr}) = 0"); + } } + protected void parseAddressAndDbname() { + String url0 = this.url.substring(this.url.indexOf("://") + 3); + int pos = url0.indexOf('?'); //127.0.0.1:5432/db?charset=utr8&xxx=yy + if (pos > 0) url0 = url0.substring(0, pos); + pos = url0.indexOf('/'); //127.0.0.1:5432/db + if (pos > 0) { + this.defdb = url0.substring(pos + 1); + url0 = url0.substring(0, pos); + } + pos = url0.indexOf(':'); + if (pos > 0) { + this.addr = new InetSocketAddress(url0.substring(0, pos), Integer.parseInt(url0.substring(pos + 1))); + } else { + this.addr = new InetSocketAddress(url0, getDefaultPort()); + } + } + + protected abstract int getDefaultPort(); + /** * 是否异步, 为true则只能调用pollAsync方法,为false则只能调用poll方法 * @@ -71,6 +131,10 @@ public abstract class PoolSource { public abstract void close(); + public final String getDbtype() { + return dbtype; + } + public final long getUsingCount() { return usingCounter.longValue(); } diff --git a/src/org/redkale/source/PoolTcpSource.java b/src/org/redkale/source/PoolTcpSource.java new file mode 100644 index 000000000..490e51fab --- /dev/null +++ b/src/org/redkale/source/PoolTcpSource.java @@ -0,0 +1,57 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.source; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousChannelGroup; +import java.util.Properties; +import java.util.concurrent.*; +import java.util.logging.Logger; +import org.redkale.net.AsyncConnection; +import org.redkale.util.ObjectPool; + +/** + * + * @author zhangjx + */ +public abstract class PoolTcpSource extends PoolSource { + + //ByteBuffer池 + protected ObjectPool bufferPool; + + //线程池 + protected ThreadPoolExecutor executor; + + //TCP Channel组 + protected AsynchronousChannelGroup group; + + public PoolTcpSource(String stype, Properties prop, Logger logger, ObjectPool bufferPool,ThreadPoolExecutor executor) { + super(stype, prop, logger); + this.bufferPool = bufferPool; + this.executor = executor; + try { + this.group = AsynchronousChannelGroup.withThreadPool(executor); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public final boolean isAysnc() { + return true; + } + + @Override + public final AsyncConnection poll() { + return pollAsync().join(); + } + + @Override + public CompletableFuture pollAsync() { + return AsyncConnection.createTCP(group, this.addr, this.readTimeoutSeconds, this.writeTimeoutSeconds); + } +}