切换DataJdbcSource

This commit is contained in:
Redkale
2018-05-10 19:39:47 +08:00
parent db462ec134
commit 5aa10e9d74
3 changed files with 2754 additions and 2754 deletions

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,512 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.source;
import java.io.Serializable;
import java.net.URL;
import java.sql.*;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.*;
import java.util.logging.Level;
import org.redkale.service.Local;
import org.redkale.util.*;
/**
* DataSource的JDBC实现类
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
@Local
@AutoLoad(false)
@SuppressWarnings("unchecked")
@ResourceType(DataSource.class)
public class DataSqlJdbcSource extends DataSqlSource<Connection> {
public DataSqlJdbcSource(String unitName, URL persistxml, Properties readprop, Properties writeprop) {
super(unitName, persistxml, readprop, writeprop);
}
@Override
protected final String prepareParamSign(int index) {
return "?";
}
@Override
protected final boolean isAsync() {
return false;
}
@Override
protected PoolSource<Connection> createPoolSource(DataSource source, String rwtype, Properties prop) {
return new PoolJdbcSource(this.name, this.persistxml, rwtype, prop, this.logger);
}
@Override
protected <T> CompletableFuture<Integer> insertDB(EntityInfo<T> info, T... values) {
Connection conn = null;
try {
int c = 0;
conn = writePool.poll();
final String sql = info.getInsertPrepareSQL(values[0]);
final Class primaryType = info.getPrimary().type();
final Attribute primary = info.getPrimary();
Attribute<T, Serializable>[] attrs = info.insertAttributes;
conn.setReadOnly(false);
conn.setAutoCommit(true);
PreparedStatement prestmt = createInsertPreparedStatement(conn, sql, info, values);
try {
prestmt.executeBatch();
} catch (SQLException se) {
if (info.tableStrategy == null || !info.tablenotexistSqlstates.contains(';' + se.getSQLState() + ';')) throw se;
synchronized (info.tables) {
final String oldTable = info.table;
final String newTable = info.getTable(values[0]);
if (!info.tables.contains(newTable)) {
try {
Statement st = conn.createStatement();
st.execute(info.tablecopySQL.replace("${newtable}", newTable).replace("${oldtable}", oldTable));
st.close();
info.tables.add(newTable);
} catch (SQLException sqle) { //多进程并发时可能会出现重复建表
if (newTable.indexOf('.') > 0 && info.tablenotexistSqlstates.contains(';' + se.getSQLState() + ';')) {
Statement st;
try {
st = conn.createStatement();
st.execute("CREATE DATABASE " + newTable.substring(0, newTable.indexOf('.')));
st.close();
} catch (SQLException sqle1) {
logger.log(Level.SEVERE, "create database(" + newTable.substring(0, newTable.indexOf('.')) + ") error", sqle1);
}
try {
st = conn.createStatement();
st.execute(info.tablecopySQL.replace("${newtable}", newTable).replace("${oldtable}", oldTable));
st.close();
info.tables.add(newTable);
} catch (SQLException sqle2) {
logger.log(Level.SEVERE, "create table2(" + info.tablecopySQL.replace("${newtable}", newTable).replace("${oldtable}", oldTable) + ") error", sqle2);
}
} else {
logger.log(Level.SEVERE, "create table(" + info.tablecopySQL.replace("${newtable}", newTable).replace("${oldtable}", oldTable) + ") error", sqle);
}
}
}
}
prestmt.close();
prestmt = createInsertPreparedStatement(conn, sql, info, values);
int[] cs = prestmt.executeBatch();
int c1 = 0;
for (int cc : cs) {
c1 += cc;
}
c = c1;
}
if (info.autoGenerated) { //由数据库自动生成主键值
ResultSet set = prestmt.getGeneratedKeys();
int i = -1;
while (set.next()) {
if (primaryType == int.class) {
primary.set(values[++i], set.getInt(1));
} else if (primaryType == long.class) {
primary.set(values[++i], set.getLong(1));
} else {
primary.set(values[++i], set.getObject(1));
}
}
set.close();
}
prestmt.close();
//------------------------------------------------------------
if (info.isLoggable(logger, Level.FINEST)) { //打印调试信息
char[] sqlchars = sql.toCharArray();
for (final T value : values) {
//-----------------------------
StringBuilder sb = new StringBuilder(128);
int i = 0;
for (char ch : sqlchars) {
if (ch == '?') {
Object obj = attrs[i++].get(value);
if (obj != null && obj.getClass().isArray()) {
sb.append("'[length=").append(java.lang.reflect.Array.getLength(obj)).append("]'");
} else {
sb.append(FilterNode.formatToString(obj));
}
} else {
sb.append(ch);
}
}
logger.finest(info.getType().getSimpleName() + " insert sql=" + sb.toString().replaceAll("(\r|\n)", "\\n"));
}
} //打印结束
return CompletableFuture.completedFuture(c);
} catch (SQLException e) {
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
} finally {
if (conn != null) writePool.offerConnection(conn);
}
}
protected <T> PreparedStatement createInsertPreparedStatement(final Connection conn, final String sql,
final EntityInfo<T> info, T... values) throws SQLException {
Attribute<T, Serializable>[] attrs = info.insertAttributes;
final PreparedStatement prestmt = info.autoGenerated ? conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS) : conn.prepareStatement(sql);
for (final T value : values) {
int i = 0;
if (info.autouuid) info.createPrimaryValue(value);
for (Attribute<T, Serializable> attr : attrs) {
Serializable val = attr.get(value);
if (val instanceof byte[]) {
Blob blob = conn.createBlob();
blob.setBytes(1, (byte[]) val);
prestmt.setObject(++i, blob);
} else if (val instanceof AtomicInteger) {
prestmt.setObject(++i, ((AtomicInteger) val).get());
} else if (val instanceof AtomicLong) {
prestmt.setObject(++i, ((AtomicLong) val).get());
} else {
prestmt.setObject(++i, val);
}
}
prestmt.addBatch();
}
return prestmt;
}
@Override
protected <T> CompletableFuture<Integer> deleteDB(EntityInfo<T> info, Flipper flipper, String sql) {
Connection conn = null;
try {
conn = writePool.poll();
conn.setReadOnly(false);
conn.setAutoCommit(true);
sql += ((flipper == null || flipper.getLimit() < 1) ? "" : (" LIMIT " + flipper.getLimit()));
if (info.isLoggable(logger, Level.FINEST)) logger.finest(info.getType().getSimpleName() + " delete sql=" + sql);
final Statement stmt = conn.createStatement();
int c = stmt.executeUpdate(sql);
stmt.close();
return CompletableFuture.completedFuture(c);
} catch (SQLException e) {
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
} finally {
if (conn != null) writePool.offerConnection(conn);
}
}
@Override
protected <T> CompletableFuture<Integer> updateDB(EntityInfo<T> info, T... values) {
Connection conn = null;
try {
conn = writePool.poll();
conn.setReadOnly(false);
conn.setAutoCommit(true);
final String updateSQL = info.getUpdatePrepareSQL(values[0]);
final PreparedStatement prestmt = conn.prepareStatement(updateSQL);
Attribute<T, Serializable>[] attrs = info.updateAttributes;
final boolean debugfinest = info.isLoggable(logger, Level.FINEST);
char[] sqlchars = debugfinest ? updateSQL.toCharArray() : null;
final Attribute<T, Serializable> primary = info.getPrimary();
for (final T value : values) {
int k = 0;
for (Attribute<T, Serializable> attr : attrs) {
Serializable val = attr.get(value);
if (val instanceof byte[]) {
Blob blob = conn.createBlob();
blob.setBytes(1, (byte[]) val);
prestmt.setObject(++k, blob);
} else if (val instanceof AtomicInteger) {
prestmt.setObject(++k, ((AtomicInteger) val).get());
} else if (val instanceof AtomicLong) {
prestmt.setObject(++k, ((AtomicLong) val).get());
} else {
prestmt.setObject(++k, val);
}
}
prestmt.setObject(++k, primary.get(value));
prestmt.addBatch();//------------------------------------------------------------
if (debugfinest) { //打印调试信息
//-----------------------------
int i = 0;
StringBuilder sb = new StringBuilder(128);
for (char ch : sqlchars) {
if (ch == '?') {
Object obj = i == attrs.length ? primary.get(value) : attrs[i++].get(value);
if (obj != null && obj.getClass().isArray()) {
sb.append("'[length=").append(java.lang.reflect.Array.getLength(obj)).append("]'");
} else {
sb.append(FilterNode.formatToString(obj));
}
} else {
sb.append(ch);
}
}
logger.finest(info.getType().getSimpleName() + " update sql=" + sb.toString().replaceAll("(\r|\n)", "\\n"));
} //打印结束
}
int[] pc = prestmt.executeBatch();
int c = 0;
for (int p : pc) {
if (p >= 0) c += p;
}
prestmt.close();
return CompletableFuture.completedFuture(c);
} catch (SQLException e) {
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
} finally {
if (conn != null) writePool.offerConnection(conn);
}
}
@Override
protected <T> CompletableFuture<Integer> updateDB(EntityInfo<T> info, Flipper flipper, String sql, boolean prepared, Object... params) {
Connection conn = null;
try {
conn = writePool.poll();
conn.setReadOnly(false);
conn.setAutoCommit(true);
if (prepared) {
final PreparedStatement prestmt = conn.prepareStatement(sql);
int index = 0;
for (Object param : params) {
Blob blob = conn.createBlob();
blob.setBytes(1, (byte[]) param);
prestmt.setBlob(++index, blob);
}
int c = prestmt.executeUpdate();
prestmt.close();
return CompletableFuture.completedFuture(c);
} else {
if (info.isLoggable(logger, Level.FINEST)) logger.finest(info.getType().getSimpleName() + " update sql=" + sql);
final Statement stmt = conn.createStatement();
int c = stmt.executeUpdate(sql);
stmt.close();
return CompletableFuture.completedFuture(c);
}
} catch (SQLException e) {
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
} finally {
if (conn != null) writePool.offerConnection(conn);
}
}
@Override
protected <T, N extends Number> CompletableFuture<Map<String, N>> getNumberMapDB(EntityInfo<T> info, String sql, FilterFuncColumn... columns) {
Connection conn = null;
try {
conn = readPool.poll();
conn.setReadOnly(true);
final Statement stmt = conn.createStatement();
ResultSet set = stmt.executeQuery(sql);
final Map map = new HashMap<>();
if (set.next()) {
int index = 0;
for (FilterFuncColumn ffc : columns) {
for (String col : ffc.cols()) {
Object o = set.getObject(++index);
Number rs = ffc.defvalue;
if (o != null) rs = (Number) o;
map.put(ffc.col(col), rs);
}
}
}
set.close();
stmt.close();
return CompletableFuture.completedFuture(map);
} catch (SQLException e) {
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
} finally {
if (conn != null) readPool.offerConnection(conn);
}
}
@Override
protected <T> CompletableFuture<Number> getNumberResultDB(EntityInfo<T> info, String sql, Number defVal, String column) {
Connection conn = null;
try {
conn = readPool.poll();
conn.setReadOnly(true);
final Statement stmt = conn.createStatement();
Number rs = defVal;
ResultSet set = stmt.executeQuery(sql);
if (set.next()) {
Object o = set.getObject(1);
if (o != null) rs = (Number) o;
}
set.close();
stmt.close();
return CompletableFuture.completedFuture(rs);
} catch (SQLException e) {
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
} finally {
if (conn != null) readPool.offerConnection(conn);
}
}
@Override
protected <T, K extends Serializable, N extends Number> CompletableFuture<Map<K, N>> queryColumnMapDB(EntityInfo<T> info, String sql, String keyColumn) {
Connection conn = null;
try {
conn = readPool.poll();
conn.setReadOnly(true);
final Statement stmt = conn.createStatement();
Map<K, N> rs = new LinkedHashMap<>();
ResultSet set = stmt.executeQuery(sql);
ResultSetMetaData rsd = set.getMetaData();
boolean smallint = rsd.getColumnType(1) == Types.SMALLINT;
while (set.next()) {
rs.put((K) (smallint ? set.getShort(1) : set.getObject(1)), (N) set.getObject(2));
}
set.close();
stmt.close();
return CompletableFuture.completedFuture(rs);
} catch (SQLException e) {
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
} finally {
if (conn != null) readPool.offerConnection(conn);
}
}
@Override
protected <T> CompletableFuture<T> findDB(EntityInfo<T> info, String sql, boolean onlypk, SelectColumn selects) {
Connection conn = null;
try {
conn = readPool.poll();
conn.setReadOnly(true);
final PreparedStatement ps = conn.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
ps.setFetchSize(1);
final ResultSet set = ps.executeQuery();
T rs = set.next() ? info.getValue(selects, set) : null;
set.close();
ps.close();
return CompletableFuture.completedFuture(rs);
} catch (SQLException e) {
if (info.tableStrategy != null && info.tablenotexistSqlstates.contains(';' + e.getSQLState() + ';')) return CompletableFuture.completedFuture(null);
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
} finally {
if (conn != null) readPool.offerConnection(conn);
}
}
@Override
protected <T> CompletableFuture<Serializable> findColumnDB(EntityInfo<T> info, String sql, boolean onlypk, String column, Serializable defValue) {
Connection conn = null;
try {
conn = readPool.poll();
conn.setReadOnly(true);
final Attribute<T, Serializable> attr = info.getAttribute(column);
final PreparedStatement ps = conn.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
ps.setFetchSize(1);
final ResultSet set = ps.executeQuery();
Serializable val = defValue;
if (set.next()) {
if (attr.type() == byte[].class) {
Blob blob = set.getBlob(1);
if (blob != null) val = blob.getBytes(1, (int) blob.length());
} else {
val = (Serializable) set.getObject(1);
}
}
set.close();
ps.close();
return CompletableFuture.completedFuture(val == null ? defValue : val);
} catch (SQLException e) {
if (info.tableStrategy != null && info.tablenotexistSqlstates.contains(';' + e.getSQLState() + ';')) return CompletableFuture.completedFuture(defValue);
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
} finally {
if (conn != null) readPool.offerConnection(conn);
}
}
@Override
protected <T> CompletableFuture<Boolean> existsDB(EntityInfo<T> info, String sql, boolean onlypk) {
Connection conn = null;
try {
conn = readPool.poll();
conn.setReadOnly(true);
final PreparedStatement ps = conn.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
final ResultSet set = ps.executeQuery();
boolean rs = set.next() ? (set.getInt(1) > 0) : false;
set.close();
ps.close();
if (info.isLoggable(logger, Level.FINEST)) logger.finest(info.getType().getSimpleName() + " exists (" + rs + ") sql=" + sql);
return CompletableFuture.completedFuture(rs);
} catch (SQLException e) {
if (info.tableStrategy != null && info.tablenotexistSqlstates.contains(';' + e.getSQLState() + ';')) return CompletableFuture.completedFuture(false);
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
} finally {
if (conn != null) readPool.offerConnection(conn);
}
}
@Override
protected <T> CompletableFuture<Sheet<T>> querySheetDB(EntityInfo<T> info, boolean needtotal, SelectColumn selects, Flipper flipper, FilterNode node) {
Connection conn = null;
try {
conn = readPool.poll();
conn.setReadOnly(true);
final SelectColumn sels = selects;
final List<T> list = new ArrayList();
final Map<Class, String> joinTabalis = node == null ? null : node.getJoinTabalis();
final CharSequence join = node == null ? null : node.createSQLJoin(this, false, joinTabalis, new HashSet<>(), info);
final CharSequence where = node == null ? null : node.createSQLExpress(info, joinTabalis);
final String sql = "SELECT " + info.getQueryColumns("a", selects) + " FROM " + info.getTable(node) + " a" + (join == null ? "" : join)
+ ((where == null || where.length() == 0) ? "" : (" WHERE " + where)) + info.createSQLOrderby(flipper);
if (info.isLoggable(logger, Level.FINEST)) {
logger.finest(info.getType().getSimpleName() + " query sql=" + sql + (flipper == null || flipper.getLimit() < 1 ? "" : (" LIMIT " + flipper.getOffset() + "," + flipper.getLimit())));
}
conn.setReadOnly(true);
final PreparedStatement ps = conn.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
if (flipper != null && flipper.getLimit() > 0) ps.setFetchSize(flipper.getLimit());
final ResultSet set = ps.executeQuery();
if (flipper != null && flipper.getOffset() > 0) set.absolute(flipper.getOffset());
final int limit = flipper == null || flipper.getLimit() < 1 ? Integer.MAX_VALUE : flipper.getLimit();
int i = 0;
while (set.next()) {
i++;
list.add(info.getValue(sels, set));
if (limit <= i) break;
}
long total = list.size();
if (needtotal && flipper != null) {
set.last();
total = set.getRow();
}
set.close();
ps.close();
return CompletableFuture.completedFuture(new Sheet<>(total, list));
} catch (SQLException e) {
if (info.tableStrategy != null && info.tablenotexistSqlstates.contains(';' + e.getSQLState() + ';')) return CompletableFuture.completedFuture(new Sheet<>());
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
} finally {
if (conn != null) readPool.offerConnection(conn);
}
}
}