diff --git a/src/org/redkale/source/DataJdbcOldSource.java b/src/org/redkale/source/DataJdbcOldSource.java
deleted file mode 100644
index 746f8dbbd..000000000
--- a/src/org/redkale/source/DataJdbcOldSource.java
+++ /dev/null
@@ -1,2469 +0,0 @@
-/*
- * To change this license header, choose License Headers in Project Properties.
- * To change this template file, choose Tools | Templates
- * and open the template in the editor.
- */
-package org.redkale.source;
-
-import java.io.*;
-import java.net.URL;
-import java.sql.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-import java.util.function.*;
-import java.util.logging.*;
-import java.util.stream.Stream;
-import javax.annotation.Resource;
-import org.redkale.service.*;
-import static org.redkale.source.DataSources.*;
-import org.redkale.util.*;
-
-/**
- * DataSource的JDBC实现类
- *
- *
- * 详情见: https://redkale.org
- *
- * @author zhangjx
- */
-@Local
-@AutoLoad(false)
-@SuppressWarnings("unchecked")
-@ResourceType(DataSource.class)
-public class DataJdbcOldSource extends AbstractService implements DataSource, DataCacheListener, Function, AutoCloseable, Resourcable {
-
- protected static final Flipper FLIPPER_ONE = new Flipper(1);
-
- protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
-
- protected String name;
-
- protected URL persistxml;
-
- protected int threads;
-
- protected ExecutorService executor;
-
- protected boolean cacheForbidden;
-
- protected PoolJdbcSource readPool;
-
- protected PoolJdbcSource writePool;
-
- @Resource(name = "$")
- protected DataCacheListener cacheListener;
-
- protected final BiConsumer futureCompleteConsumer = (r, t) -> {
- if (t != null) logger.log(Level.SEVERE, "CompletableFuture complete error", (Throwable) t);
- };
-
- protected final BiFunction fullloader = (s, t) -> querySheet(false, false, t, null, null, (FilterNode) null).list(true);
-
- public DataJdbcOldSource(String unitName, URL persistxml, Properties readprop, Properties writeprop) {
- this.persistxml = persistxml;
- this.preConstruct(unitName, readprop, writeprop);
- this.initByProperties(unitName, readprop, writeprop);
- }
-
- public DataJdbcOldSource() {
- }
-
- @Override
- public void init(AnyValue config) { //通过空构造函数创建的对象需要调用init方法进行初始化
- String unitName = config.getValue("name");
- Properties readprop = new Properties();
- Properties writeprop = new Properties();
-
- for (AnyValue confs : config.getAnyValues("properties")) {
- boolean write = confs.getValue("name", "").contains("write");
- for (AnyValue conf : confs.getAnyValues("property")) {
- String pn = conf.getValue("name");
- String pv = conf.getValue("value");
- if (pn == null || pv == null) continue;
- (write ? writeprop : readprop).put(pn, pv);
- }
- }
-
- for (AnyValue conf : config.getAnyValues("property")) {
- String pn = conf.getValue("name");
- String pv = conf.getValue("value");
- if (pn == null || pv == null) continue;
- readprop.put(pn, pv);
- }
- if (writeprop.isEmpty()) writeprop = readprop;
- this.initByProperties(unitName, readprop, writeprop);
- }
-
- @Override
- protected ExecutorService getExecutor() {
- return executor;
- }
-
- @Override
- public void destroy(AnyValue config) {
- if (this.executor != null) this.executor.shutdownNow();
- }
-
- //构造前调用
- protected void preConstruct(String unitName, Properties readprop, Properties writeprop) {
- final AtomicInteger counter = new AtomicInteger();
- this.threads = Integer.decode(readprop.getProperty(JDBC_CONNECTIONSMAX, "" + Runtime.getRuntime().availableProcessors() * 16));
- final String cname = this.getClass().getSimpleName();
- final Thread.UncaughtExceptionHandler ueh = (t, e) -> {
- logger.log(Level.SEVERE, cname + " error", e);
- };
- this.executor = Executors.newFixedThreadPool(threads, (Runnable r) -> {
- Thread t = new Thread(r);
- t.setDaemon(true);
- String s = "" + counter.incrementAndGet();
- if (s.length() == 1) {
- s = "00" + s;
- } else if (s.length() == 2) {
- s = "0" + s;
- }
- t.setName(cname + "-Thread-" + s);
- t.setUncaughtExceptionHandler(ueh);
- return t;
- });
- }
-
- protected void initByProperties(String unitName, Properties readprop, Properties writeprop) {
- this.name = unitName;
- this.cacheForbidden = "NONE".equalsIgnoreCase(readprop.getProperty(JDBC_CACHE_MODE));
- this.readPool = new PoolJdbcSource(unitName, persistxml, "read", null, readprop, logger);
- this.writePool = new PoolJdbcSource(unitName, persistxml, "write", null, writeprop, logger);
- }
-
- @Local
- @Override
- public String getType() {
- return "jdbc";
- }
-
- @Override
- public final String resourceName() {
- return name;
- }
-
- @Override
- @Local
- public void close() throws Exception {
- readPool.close();
- writePool.close();
- }
-
- @Local
- public PoolJdbcSource getReadPoolJdbcSource() {
- return readPool;
- }
-
- @Local
- public PoolJdbcSource getWritePoolJdbcSource() {
- return writePool;
- }
-
- @Local
- public Connection createReadSQLConnection() {
- return readPool.poll();
- }
-
- @Local
- public Connection createWriteSQLConnection() {
- return writePool.poll();
- }
-
- @Local
- public void closeSQLConnection(final Connection sqlconn) {
- if (sqlconn == null) return;
- try {
- sqlconn.close();
- } catch (Exception e) {
- logger.log(Level.WARNING, "closeSQLConnection abort", e);
- }
- }
-
- @Override
- @Local
- public EntityInfo apply(Class t) {
- return loadEntityInfo(t);
- }
-
- protected EntityInfo loadEntityInfo(Class clazz) {
- return EntityInfo.load(clazz, this.cacheForbidden, this.readPool.props, this, fullloader);
- }
-
- /**
- * 将entity的对象全部加载到Cache中去,如果clazz没有被@javax.persistence.Cacheable注解则不做任何事
- *
- * @param Entity类泛型
- * @param clazz Entity类
- */
- public void refreshCache(Class clazz) {
- EntityInfo info = loadEntityInfo(clazz);
- EntityCache cache = info.getCache();
- if (cache == null) return;
- cache.fullLoad();
- }
-
- //----------------------insertCache-----------------------------
- /**
- * 新增对象, 必须是Entity对象
- *
- * @param Entity类泛型
- * @param values Entity对象
- *
- * @return 影响的记录条数
- */
- @Override
- public int insert(@RpcCall(DataCallArrayAttribute.class) T... values) {
- if (values.length == 0) return 0;
- if (values.length > 1) { //检查对象是否都是同一个Entity类
- Class clazz = null;
- for (T val : values) {
- if (clazz == null) {
- clazz = val.getClass();
- continue;
- }
- if (clazz != val.getClass()) {
- throw new RuntimeException("DataSource.insert must the same Class Entity, but diff is " + clazz + " and " + val.getClass());
- }
- }
- }
- final EntityInfo info = loadEntityInfo((Class) values[0].getClass());
- if (info.isVirtualEntity()) {
- return insert(null, info, values);
- }
- Connection conn = createWriteSQLConnection();
- try {
- return insert(conn, info, values);
- } finally {
- closeSQLConnection(conn);
- }
- }
-
- @Override
- public CompletableFuture insertAsync(@RpcCall(DataCallArrayAttribute.class) T... values) {
- return CompletableFuture.supplyAsync(() -> insert(values), getExecutor()).whenComplete(futureCompleteConsumer);
- }
-
- protected int insert(final Connection conn, final EntityInfo info, T... values) {
- if (values.length == 0) return 0;
- int c = -1;
- try {
- if (!info.isVirtualEntity()) {
- final String sql = info.getInsertPrepareSQL(values[0]);
- final Class primaryType = info.getPrimary().type();
- final Attribute primary = info.getPrimary();
- Attribute[] attrs = info.insertAttributes;
- conn.setReadOnly(false);
- PreparedStatement prestmt = createInsertPreparedStatement(conn, sql, info, values);
- try {
- prestmt.executeBatch();
- } catch (SQLException se) {
- if (info.tableStrategy == null || !info.isTableNotExist(se)) throw se;
- synchronized (info.tables) {
- final String oldTable = info.table;
- final String newTable = info.getTable(values[0]);
- if (!info.tables.contains(newTable)) {
- try {
- Statement st = conn.createStatement();
- st.execute(info.tablecopySQL.replace("${newtable}", newTable).replace("${oldtable}", oldTable));
- st.close();
- info.tables.add(newTable);
- } catch (SQLException sqle) { //多进程并发时可能会出现重复建表
- if (newTable.indexOf('.') > 0 && info.isTableNotExist(se)) {
- Statement st;
- try {
- st = conn.createStatement();
- st.execute("CREATE DATABASE " + newTable.substring(0, newTable.indexOf('.')));
- st.close();
- } catch (SQLException sqle1) {
- logger.log(Level.SEVERE, "create database(" + newTable.substring(0, newTable.indexOf('.')) + ") error", sqle1);
- }
- try {
- st = conn.createStatement();
- st.execute(info.tablecopySQL.replace("${newtable}", newTable).replace("${oldtable}", oldTable));
- st.close();
- info.tables.add(newTable);
- } catch (SQLException sqle2) {
- logger.log(Level.SEVERE, "create table2(" + info.tablecopySQL.replace("${newtable}", newTable).replace("${oldtable}", oldTable) + ") error", sqle2);
- }
- } else {
- logger.log(Level.SEVERE, "create table(" + info.tablecopySQL.replace("${newtable}", newTable).replace("${oldtable}", oldTable) + ") error", sqle);
- }
- }
- }
- }
- prestmt.close();
- prestmt = createInsertPreparedStatement(conn, sql, info, values);
- int[] cs = prestmt.executeBatch();
- int c1 = 0;
- for (int cc : cs) {
- c1 += cc;
- }
- c = c1;
- }
- if (info.autoGenerated) { //由数据库自动生成主键值
- ResultSet set = prestmt.getGeneratedKeys();
- int i = -1;
- while (set.next()) {
- if (primaryType == int.class) {
- primary.set(values[++i], set.getInt(1));
- } else if (primaryType == long.class) {
- primary.set(values[++i], set.getLong(1));
- } else {
- primary.set(values[++i], set.getObject(1));
- }
- }
- set.close();
- }
- prestmt.close();
- //------------------------------------------------------------
- if (info.isLoggable(logger, Level.FINEST)) { //打印调试信息
- char[] sqlchars = sql.toCharArray();
- for (final T value : values) {
- //-----------------------------
- StringBuilder sb = new StringBuilder(128);
- int i = 0;
- for (char ch : sqlchars) {
- if (ch == '?') {
- Object obj = attrs[i++].get(value);
- if (obj != null && obj.getClass().isArray()) {
- sb.append("'[length=").append(java.lang.reflect.Array.getLength(obj)).append("]'");
- } else {
- sb.append(FilterNode.formatToString(obj));
- }
- } else {
- sb.append(ch);
- }
- }
- logger.finest(info.getType().getSimpleName() + " insert sql=" + sb.toString().replaceAll("(\r|\n)", "\\n"));
- }
- } //打印结束
- }
- final EntityCache cache = info.getCache();
- int c2 = 0;
- if (cache != null) { //更新缓存
- for (final T value : values) {
- c2 += cache.insert(value);
- }
- if (cacheListener != null) cacheListener.insertCache(info.getType(), values);
- }
- return c >= 0 ? c : c2;
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
- }
-
- protected PreparedStatement createInsertPreparedStatement(final Connection conn, final String sql,
- final EntityInfo info, T... values) throws SQLException {
- Attribute[] attrs = info.insertAttributes;
- final PreparedStatement prestmt = info.autoGenerated ? conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS) : conn.prepareStatement(sql);
-
- for (final T value : values) {
- int i = 0;
- if (info.autouuid) info.createPrimaryValue(value);
- for (Attribute attr : attrs) {
- Serializable val = attr.get(value);
- if (val instanceof byte[]) {
- Blob blob = conn.createBlob();
- blob.setBytes(1, (byte[]) val);
- prestmt.setObject(++i, blob);
- } else if (val instanceof AtomicInteger) {
- prestmt.setObject(++i, ((AtomicInteger) val).get());
- } else if (val instanceof AtomicLong) {
- prestmt.setObject(++i, ((AtomicLong) val).get());
- } else {
- prestmt.setObject(++i, val);
- }
- }
- prestmt.addBatch();
- }
- return prestmt;
- }
-
- @Override
- public int insertCache(Class clazz, T... values) {
- if (values.length == 0) return 0;
- final EntityInfo info = loadEntityInfo(clazz);
- final EntityCache cache = info.getCache();
- if (cache == null) return -1;
- int c = 0;
- for (T value : values) {
- c += cache.insert(value);
- }
- return c;
- }
-
- //-------------------------deleteCache--------------------------
- /**
- * 删除对象, 必须是Entity对象
- *
- * @param Entity类泛型
- * @param values Entity对象
- *
- * @return 删除的数据条数
- */
- @Override
- public int delete(T... values) {
- if (values.length == 0) return -1;
- if (values.length > 1) { //检查对象是否都是同一个Entity类
- Class clazz = null;
- for (T val : values) {
- if (clazz == null) {
- clazz = val.getClass();
- continue;
- }
- if (clazz != val.getClass()) {
- throw new RuntimeException("DataSource.delete must the same Class Entity, but diff is " + clazz + " and " + val.getClass());
- }
- }
- }
- final EntityInfo info = loadEntityInfo((Class) values[0].getClass());
- if (info.isVirtualEntity()) { //虚拟表只更新缓存Cache
- return delete(null, info, values);
- }
- Connection conn = createWriteSQLConnection();
- try {
- return delete(conn, info, values);
- } finally {
- closeSQLConnection(conn);
- }
- }
-
- @Override
- public CompletableFuture deleteAsync(final T... values) {
- return CompletableFuture.supplyAsync(() -> delete(values), getExecutor()).whenComplete(futureCompleteConsumer);
- }
-
- protected int delete(final Connection conn, final EntityInfo info, T... values) {
- if (values.length == 0) return -1;
- final Attribute primary = info.getPrimary();
- Serializable[] ids = new Serializable[values.length];
- int i = 0;
- for (final T value : values) {
- ids[i++] = (Serializable) primary.get(value);
- }
- return delete(conn, info, ids);
- }
-
- @Override
- public int delete(Class clazz, Serializable... ids) {
- final EntityInfo info = loadEntityInfo(clazz);
- if (info.isVirtualEntity()) { //虚拟表只更新缓存Cache
- return delete(null, info, ids);
- }
- Connection conn = createWriteSQLConnection();
- try {
- return delete(conn, info, ids);
- } finally {
- closeSQLConnection(conn);
- }
- }
-
- @Override
- public CompletableFuture deleteAsync(final Class clazz, final Serializable... ids) {
- return CompletableFuture.supplyAsync(() -> delete(clazz, ids), getExecutor()).whenComplete(futureCompleteConsumer);
- }
-
- protected int delete(final Connection conn, final EntityInfo info, Serializable... keys) {
- if (keys.length == 0) return -1;
- int c = -1;
- int c2 = 0;
- try {
- if (!info.isVirtualEntity()) {
- conn.setReadOnly(false);
- final Statement stmt = conn.createStatement();
- for (Serializable key : keys) {
- String sql = "DELETE FROM " + info.getTable(key) + " WHERE " + info.getPrimarySQLColumn() + " = " + FilterNode.formatToString(key);
- if (info.isLoggable(logger, Level.FINEST)) logger.finest(info.getType().getSimpleName() + " delete sql=" + sql);
- stmt.addBatch(sql);
- }
- int[] pc = stmt.executeBatch();
- c = 0;
- for (int p : pc) {
- if (p >= 0) c += p;
- }
- stmt.close();
- }
- //------------------------------------
- final EntityCache cache = info.getCache();
- if (cache == null) return c;
- for (Serializable key : keys) {
- c2 += cache.delete(key);
- }
- if (cacheListener != null) cacheListener.deleteCache(info.getType(), keys);
- return c >= 0 ? c : c2;
- } catch (SQLException e) {
- if (info.tableStrategy != null && info.isTableNotExist(e)) return c >= 0 ? c : c2;
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public int delete(Class clazz, FilterNode node) {
- final EntityInfo info = loadEntityInfo(clazz);
- if (info.isVirtualEntity()) {
- return delete(null, info, null, node);
- }
- Connection conn = createWriteSQLConnection();
- try {
- return delete(conn, info, null, node);
- } finally {
- closeSQLConnection(conn);
- }
- }
-
- @Override
- public CompletableFuture deleteAsync(final Class clazz, final FilterNode node) {
- return CompletableFuture.supplyAsync(() -> delete(clazz, node), getExecutor()).whenComplete(futureCompleteConsumer);
- }
-
- @Override
- public int delete(Class clazz, final Flipper flipper, FilterNode node) {
- final EntityInfo info = loadEntityInfo(clazz);
- if (info.isVirtualEntity()) {
- return delete(null, info, flipper, node);
- }
- Connection conn = createWriteSQLConnection();
- try {
- return delete(conn, info, flipper, node);
- } finally {
- closeSQLConnection(conn);
- }
- }
-
- @Override
- public CompletableFuture deleteAsync(final Class clazz, final Flipper flipper, FilterNode node) {
- return CompletableFuture.supplyAsync(() -> delete(clazz, flipper, node), getExecutor()).whenComplete(futureCompleteConsumer);
- }
-
- protected int delete(final Connection conn, final EntityInfo info, final Flipper flipper, final FilterNode node) {
- int c = -1;
- try {
- if (!info.isVirtualEntity()) {
- Map joinTabalis = node.getJoinTabalis();
- CharSequence join = node.createSQLJoin(this, true, joinTabalis, new HashSet<>(), info);
- CharSequence where = node.createSQLExpress(info, joinTabalis);
-
- StringBuilder join1 = null;
- StringBuilder join2 = null;
- if (join != null) {
- String joinstr = join.toString();
- join1 = multisplit('[', ']', ",", new StringBuilder(), joinstr, 0);
- join2 = multisplit('{', '}', " AND ", new StringBuilder(), joinstr, 0);
- }
- String sql = "DELETE " + ("mysql".equals(this.readPool.getDbtype()) ? "a" : "") + " FROM " + info.getTable(node) + " a" + (join1 == null ? "" : (", " + join1))
- + ((where == null || where.length() == 0) ? (join2 == null ? "" : (" WHERE " + join2))
- : (" WHERE " + where + (join2 == null ? "" : (" AND " + join2)))) + info.createSQLOrderby(flipper)
- + ((flipper == null || flipper.getLimit() < 1) ? "" : (" LIMIT " + flipper.getLimit()));
- if (info.isLoggable(logger, Level.FINEST)) logger.finest(info.getType().getSimpleName() + " delete sql=" + sql);
- conn.setReadOnly(false);
- final Statement stmt = conn.createStatement();
- c = stmt.executeUpdate(sql);
- stmt.close();
- }
- //------------------------------------
- final EntityCache cache = info.getCache();
- if (cache == null) return c;
- Serializable[] ids = cache.delete(flipper, node);
- if (cacheListener != null) cacheListener.deleteCache(info.getType(), ids);
- return c >= 0 ? c : (ids == null ? 0 : ids.length);
- } catch (SQLException e) {
- if (info.tableStrategy != null && info.isTableNotExist(e)) return c;
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public int deleteCache(Class clazz, Serializable... ids) {
- if (ids.length == 0) return 0;
- final EntityInfo info = loadEntityInfo(clazz);
- final EntityCache cache = info.getCache();
- if (cache == null) return -1;
- int c = 0;
- for (Serializable id : ids) {
- c += cache.delete(id);
- }
- return c;
- }
-
- //------------------------updateAsync---------------------------
- /**
- * 更新对象, 必须是Entity对象
- *
- * @param Entity类泛型
- * @param values Entity对象
- *
- * @return 更新的数据条数
- */
- @Override
- public int update(T... values) {
- if (values.length == 0) return 0;
- if (values.length > 1) { //检查对象是否都是同一个Entity类
- Class clazz = null;
- for (T val : values) {
- if (clazz == null) {
- clazz = val.getClass();
- continue;
- }
- if (clazz != val.getClass()) {
- throw new RuntimeException("DataSource.update must the same Class Entity, but diff is " + clazz + " and " + val.getClass());
- }
- }
- }
- final EntityInfo info = loadEntityInfo((Class) values[0].getClass());
- if (info.isVirtualEntity()) {
- return update(null, info, values);
- }
- Connection conn = createWriteSQLConnection();
- try {
- return update(conn, info, values);
- } finally {
- closeSQLConnection(conn);
- }
- }
-
- @Override
- public CompletableFuture updateAsync(final T... values) {
- return CompletableFuture.supplyAsync(() -> update(values), getExecutor()).whenComplete(futureCompleteConsumer);
- }
-
- protected int update(final Connection conn, final EntityInfo info, T... values) {
- try {
- Class clazz = info.getType();
- int c = -1;
- if (!info.isVirtualEntity()) {
- final String updateSQL = info.getUpdatePrepareSQL(values[0]);
- final Attribute primary = info.getPrimary();
- conn.setReadOnly(false);
- final PreparedStatement prestmt = conn.prepareStatement(updateSQL);
- Attribute[] attrs = info.updateAttributes;
- final boolean debugfinest = info.isLoggable(logger, Level.FINEST);
- char[] sqlchars = debugfinest ? updateSQL.toCharArray() : null;
- for (final T value : values) {
- int k = 0;
- for (Attribute attr : attrs) {
- Serializable val = attr.get(value);
- if (val instanceof byte[]) {
- Blob blob = conn.createBlob();
- blob.setBytes(1, (byte[]) val);
- prestmt.setObject(++k, blob);
- } else if (val instanceof AtomicInteger) {
- prestmt.setObject(++k, ((AtomicInteger) val).get());
- } else if (val instanceof AtomicLong) {
- prestmt.setObject(++k, ((AtomicLong) val).get());
- } else {
- prestmt.setObject(++k, val);
- }
- }
- prestmt.setObject(++k, primary.get(value));
- prestmt.addBatch();//------------------------------------------------------------
- if (debugfinest) { //打印调试信息
- //-----------------------------
- int i = 0;
- StringBuilder sb = new StringBuilder(128);
- for (char ch : sqlchars) {
- if (ch == '?') {
- Object obj = i == attrs.length ? primary.get(value) : attrs[i++].get(value);
- if (obj != null && obj.getClass().isArray()) {
- sb.append("'[length=").append(java.lang.reflect.Array.getLength(obj)).append("]'");
- } else {
- sb.append(FilterNode.formatToString(obj));
- }
- } else {
- sb.append(ch);
- }
- }
- logger.finest(info.getType().getSimpleName() + " update sql=" + sb.toString().replaceAll("(\r|\n)", "\\n"));
- } //打印结束
- }
- int[] pc = prestmt.executeBatch();
- c = 0;
- for (int p : pc) {
- if (p >= 0) c += p;
- }
- prestmt.close();
- }
- //---------------------------------------------------
- final EntityCache cache = info.getCache();
- if (cache == null) return c;
- int c2 = 0;
- for (final T value : values) {
- c2 += cache.update(value);
- }
- if (cacheListener != null) cacheListener.updateCache(clazz, values);
- return c >= 0 ? c : c2;
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * 根据主键值更新对象的column对应的值, 必须是Entity Class
- *
- * @param Entity类的泛型
- * @param clazz Entity类
- * @param id 主键值
- * @param column 过滤字段名
- * @param value 过滤字段值
- *
- * @return 更新的数据条数
- */
- @Override
- public int updateColumn(Class clazz, Serializable id, String column, Serializable value) {
- final EntityInfo info = loadEntityInfo(clazz);
- if (info.isVirtualEntity()) {
- return updateColumn(null, info, id, column, value);
- }
- Connection conn = createWriteSQLConnection();
- try {
- return updateColumn(conn, info, id, column, value);
- } finally {
- closeSQLConnection(conn);
- }
- }
-
- @Override
- public CompletableFuture updateColumnAsync(final Class clazz, final Serializable id, final String column, final Serializable value) {
- return CompletableFuture.supplyAsync(() -> updateColumn(clazz, id, column, value), getExecutor()).whenComplete(futureCompleteConsumer);
- }
-
- protected int updateColumn(Connection conn, final EntityInfo info, Serializable id, String column, final Serializable value) {
- try {
- int c = -1;
- if (!info.isVirtualEntity()) {
- if (value instanceof byte[]) {
- String sql = "UPDATE " + info.getTable(id) + " SET " + info.getSQLColumn(null, column) + " = ? WHERE " + info.getPrimarySQLColumn() + " = " + FilterNode.formatToString(id);
- if (info.isLoggable(logger, Level.FINEST)) logger.finest(info.getType().getSimpleName() + " update sql=" + sql);
- conn.setReadOnly(false);
- final PreparedStatement stmt = conn.prepareStatement(sql);
- Blob blob = conn.createBlob();
- blob.setBytes(1, (byte[]) value);
- stmt.setBlob(1, blob);
- c = stmt.executeUpdate(sql);
- stmt.close();
- } else {
- String sql = "UPDATE " + info.getTable(id) + " SET " + info.getSQLColumn(null, column) + " = "
- + info.formatToString(value) + " WHERE " + info.getPrimarySQLColumn() + " = " + FilterNode.formatToString(id);
- if (info.isLoggable(logger, Level.FINEST)) logger.finest(info.getType().getSimpleName() + " update sql=" + sql);
- conn.setReadOnly(false);
- final Statement stmt = conn.createStatement();
- c = stmt.executeUpdate(sql);
- stmt.close();
- }
- }
- //---------------------------------------------------
- final EntityCache cache = info.getCache();
- if (cache == null) return c;
- T rs = cache.update(id, info.getAttribute(column), value);
- if (cacheListener != null) cacheListener.updateCache(info.getType(), rs);
- return c >= 0 ? c : (rs == null ? 0 : 1);
- } catch (SQLException e) {
- throw new RuntimeException(e);
- } finally {
- if (conn != null) closeSQLConnection(conn);
- }
- }
-
- /**
- * 根据主键值更新对象的column对应的值, 必须是Entity Class
- *
- * @param Entity类的泛型
- * @param clazz Entity类
- * @param column 过滤字段名
- * @param value 过滤字段值
- * @param node 过滤node 不能为null
- *
- * @return 更新的数据条数
- */
- @Override
- public int updateColumn(Class clazz, String column, Serializable value, FilterNode node) {
- final EntityInfo info = loadEntityInfo(clazz);
- if (info.isVirtualEntity()) {
- return updateColumn(null, info, column, value, node);
- }
- Connection conn = createWriteSQLConnection();
- try {
- return updateColumn(conn, info, column, value, node);
- } finally {
- closeSQLConnection(conn);
- }
- }
-
- @Override
- public CompletableFuture updateColumnAsync(final Class clazz, final String column, final Serializable value, final FilterNode node) {
- return CompletableFuture.supplyAsync(() -> updateColumn(clazz, column, value, node), getExecutor()).whenComplete(futureCompleteConsumer);
- }
-
- protected int updateColumn(Connection conn, final EntityInfo info, String column, final Serializable value, FilterNode node) {
- try {
- int c = -1;
- if (!info.isVirtualEntity()) {
- Map joinTabalis = node.getJoinTabalis();
- CharSequence join = node.createSQLJoin(this, true, joinTabalis, new HashSet<>(), info);
- CharSequence where = node.createSQLExpress(info, joinTabalis);
-
- StringBuilder join1 = null;
- StringBuilder join2 = null;
- if (join != null) {
- String joinstr = join.toString();
- join1 = multisplit('[', ']', ",", new StringBuilder(), joinstr, 0);
- join2 = multisplit('{', '}', " AND ", new StringBuilder(), joinstr, 0);
- }
- if (value instanceof byte[]) {
- String sql = "UPDATE " + info.getTable(node) + " a " + (join1 == null ? "" : (", " + join1))
- + " SET " + info.getSQLColumn("a", column) + " = ?"
- + ((where == null || where.length() == 0) ? (join2 == null ? "" : (" WHERE " + join2))
- : (" WHERE " + where + (join2 == null ? "" : (" AND " + join2))));
- if (info.isLoggable(logger, Level.FINEST)) logger.finest(info.getType().getSimpleName() + " update sql=" + sql);
- conn.setReadOnly(false);
- Blob blob = conn.createBlob();
- blob.setBytes(1, (byte[]) value);
- final PreparedStatement stmt = conn.prepareStatement(sql);
- stmt.setBlob(1, blob);
- c = stmt.executeUpdate(sql);
- stmt.close();
- } else {
- String sql = "UPDATE " + info.getTable(node) + " a " + (join1 == null ? "" : (", " + join1))
- + " SET " + info.getSQLColumn("a", column) + " = " + info.formatToString(value)
- + ((where == null || where.length() == 0) ? (join2 == null ? "" : (" WHERE " + join2))
- : (" WHERE " + where + (join2 == null ? "" : (" AND " + join2))));
- if (info.isLoggable(logger, Level.FINEST)) logger.finest(info.getType().getSimpleName() + " update sql=" + sql);
- conn.setReadOnly(false);
- final Statement stmt = conn.createStatement();
- c = stmt.executeUpdate(sql);
- stmt.close();
- }
- }
- //---------------------------------------------------
- final EntityCache cache = info.getCache();
- if (cache == null) return c;
- T[] rs = cache.update(info.getAttribute(column), value, node);
- if (cacheListener != null) cacheListener.updateCache(info.getType(), rs);
- return c >= 0 ? c : (rs == null ? 0 : rs.length);
- } catch (SQLException e) {
- throw new RuntimeException(e);
- } finally {
- if (conn != null) closeSQLConnection(conn);
- }
- }
-
- /**
- * 根据主键值更新对象的多个column对应的值, 必须是Entity Class
- *
- * @param Entity类的泛型
- * @param clazz Entity类
- * @param id 主键值
- * @param values 字段值
- *
- * @return 更新的数据条数
- */
- @Override
- public int updateColumn(final Class clazz, final Serializable id, final ColumnValue... values) {
- final EntityInfo info = loadEntityInfo(clazz);
- if (info.isVirtualEntity()) {
- return updateColumn(null, info, id, values);
- }
- Connection conn = createWriteSQLConnection();
- try {
- return updateColumn(conn, info, id, values);
- } finally {
- closeSQLConnection(conn);
- }
- }
-
- @Override
- public CompletableFuture updateColumnAsync(final Class clazz, final Serializable id, final ColumnValue... values) {
- return CompletableFuture.supplyAsync(() -> updateColumn(clazz, id, values), getExecutor()).whenComplete(futureCompleteConsumer);
- }
-
- protected int updateColumn(final Connection conn, final EntityInfo info, final Serializable id, final ColumnValue... values) {
- if (values == null || values.length < 1) return -1;
- try {
- StringBuilder setsql = new StringBuilder();
- final List> attrs = new ArrayList<>();
- final List cols = new ArrayList<>();
- final boolean virtual = info.isVirtualEntity();
- List blobs = null;
- for (ColumnValue col : values) {
- Attribute attr = info.getUpdateAttribute(col.getColumn());
- if (attr == null) throw new RuntimeException(info.getType() + " cannot found column " + col.getColumn());
- attrs.add(attr);
- cols.add(col);
- if (!virtual) {
- if (setsql.length() > 0) setsql.append(", ");
- String c = info.getSQLColumn(null, col.getColumn());
- if (col.getValue() instanceof byte[]) {
- if (blobs == null) blobs = new ArrayList<>();
- blobs.add((byte[]) col.getValue());
- setsql.append(c).append(" = ?");
- } else {
- setsql.append(c).append(" = ").append(info.formatSQLValue(c, col));
- }
- }
- }
- int c = -1;
- if (!virtual) {
- String sql = "UPDATE " + info.getTable(id) + " SET " + setsql + " WHERE " + info.getPrimarySQLColumn() + " = " + FilterNode.formatToString(id);
- if (info.isLoggable(logger, Level.FINEST)) logger.finest(info.getType().getSimpleName() + ": " + sql);
- conn.setReadOnly(false);
- if (blobs != null) {
- final PreparedStatement stmt = conn.prepareStatement(sql);
- int idx = 0;
- for (byte[] bs : blobs) {
- Blob blob = conn.createBlob();
- blob.setBytes(1, bs);
- stmt.setBlob(++idx, blob);
- }
- c = stmt.executeUpdate();
- stmt.close();
- } else {
- final Statement stmt = conn.createStatement();
- c = stmt.executeUpdate(sql);
- stmt.close();
- }
- }
- //---------------------------------------------------
- final EntityCache cache = info.getCache();
- if (cache == null) return c;
- T rs = cache.updateColumn(id, attrs, cols);
- if (cacheListener != null) cacheListener.updateCache(info.getType(), rs);
- return c >= 0 ? c : (rs == null ? 0 : 1);
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * 根据主键值更新对象的多个column对应的值, 必须是Entity Class
- *
- * @param Entity类的泛型
- * @param clazz Entity类
- * @param node 过滤条件
- * @param values 字段值
- *
- * @return 更新的数据条数
- */
- @Override
- public int updateColumn(final Class clazz, final FilterNode node, final ColumnValue... values) {
- final EntityInfo info = loadEntityInfo(clazz);
- if (info.isVirtualEntity()) {
- return updateColumn(null, info, node, null, values);
- }
- Connection conn = createWriteSQLConnection();
- try {
- return updateColumn(conn, info, node, null, values);
- } finally {
- closeSQLConnection(conn);
- }
- }
-
- @Override
- public CompletableFuture updateColumnAsync(final Class clazz, final FilterNode node, final ColumnValue... values) {
- return CompletableFuture.supplyAsync(() -> updateColumn(clazz, node, values), getExecutor()).whenComplete(futureCompleteConsumer);
- }
-
- /**
- * 根据主键值更新对象的多个column对应的值, 必须是Entity Class
- *
- * @param Entity类的泛型
- * @param clazz Entity类
- * @param node 过滤条件
- * @param flipper 翻页对象
- * @param values 字段值
- *
- * @return 更新的数据条数
- */
- @Override
- public int updateColumn(final Class clazz, final FilterNode node, final Flipper flipper, final ColumnValue... values) {
- final EntityInfo info = loadEntityInfo(clazz);
- if (info.isVirtualEntity()) {
- return updateColumn(null, info, node, flipper, values);
- }
- Connection conn = createWriteSQLConnection();
- try {
- return updateColumn(conn, info, node, flipper, values);
- } finally {
- closeSQLConnection(conn);
- }
- }
-
- @Override
- public CompletableFuture updateColumnAsync(final Class clazz, final FilterNode node, final Flipper flipper, final ColumnValue... values) {
- return CompletableFuture.supplyAsync(() -> updateColumn(clazz, node, flipper, values), getExecutor()).whenComplete(futureCompleteConsumer);
- }
-
- protected int updateColumn(final Connection conn, final EntityInfo info, final FilterNode node, final Flipper flipper, final ColumnValue... values) {
- if (values == null || values.length < 1) return -1;
- try {
- StringBuilder setsql = new StringBuilder();
- final List> attrs = new ArrayList<>();
- final List cols = new ArrayList<>();
- final boolean virtual = info.isVirtualEntity();
- List blobs = null;
- for (ColumnValue col : values) {
- Attribute attr = info.getUpdateAttribute(col.getColumn());
- if (attr == null) continue;
- attrs.add(attr);
- cols.add(col);
- if (!virtual) {
- if (setsql.length() > 0) setsql.append(", ");
- String c = info.getSQLColumn("a", col.getColumn());
- if (col.getValue() instanceof byte[]) {
- if (blobs == null) blobs = new ArrayList<>();
- blobs.add((byte[]) col.getValue());
- setsql.append(c).append(" = ?");
- } else {
- setsql.append(c).append(" = ").append(info.formatSQLValue(c, col));
- }
- }
- }
- int c = -1;
- if (!virtual) {
- Map joinTabalis = node.getJoinTabalis();
- CharSequence join = node.createSQLJoin(this, true, joinTabalis, new HashSet<>(), info);
- CharSequence where = node.createSQLExpress(info, joinTabalis);
- StringBuilder join1 = null;
- StringBuilder join2 = null;
- if (join != null) {
- String joinstr = join.toString();
- join1 = multisplit('[', ']', ",", new StringBuilder(), joinstr, 0);
- join2 = multisplit('{', '}', " AND ", new StringBuilder(), joinstr, 0);
- }
- String sql = "UPDATE " + info.getTable(node) + " a " + (join1 == null ? "" : (", " + join1)) + " SET " + setsql
- + ((where == null || where.length() == 0) ? (join2 == null ? "" : (" WHERE " + join2))
- : (" WHERE " + where + (join2 == null ? "" : (" AND " + join2))))
- + info.createSQLOrderby(flipper);
- if (info.isLoggable(logger, Level.FINEST)) logger.finest(info.getType().getSimpleName() + " update sql=" + sql);
- conn.setReadOnly(false);
- if (blobs != null) {
- final PreparedStatement stmt = conn.prepareStatement(sql);
- int idx = 0;
- for (byte[] bs : blobs) {
- Blob blob = conn.createBlob();
- blob.setBytes(1, bs);
- stmt.setBlob(++idx, blob);
- }
- c = stmt.executeUpdate();
- stmt.close();
- } else {
- final Statement stmt = conn.createStatement();
- c = stmt.executeUpdate(sql);
- stmt.close();
- }
- }
- //---------------------------------------------------
- final EntityCache cache = info.getCache();
- if (cache == null) return c;
- T[] rs = cache.updateColumn(node, flipper, attrs, cols);
- if (cacheListener != null) cacheListener.updateCache(info.getType(), rs);
- return c >= 0 ? c : (rs == null ? 0 : rs.length);
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public int updateColumn(final T bean, final String... columns) {
- return updateColumn(bean, SelectColumn.createIncludes(columns));
- }
-
- @Override
- public CompletableFuture updateColumnAsync(final T bean, final String... columns) {
- return CompletableFuture.supplyAsync(() -> updateColumn(bean, columns), getExecutor()).whenComplete(futureCompleteConsumer);
- }
-
- @Override
- public int updateColumn(final T bean, final FilterNode node, final String... columns) {
- return updateColumn(bean, node, SelectColumn.createIncludes(columns));
- }
-
- @Override
- public CompletableFuture updateColumnAsync(final T bean, final FilterNode node, final String... columns) {
- return CompletableFuture.supplyAsync(() -> updateColumn(bean, node, columns), getExecutor()).whenComplete(futureCompleteConsumer);
- }
-
- @Override
- public int updateColumn(final T bean, final SelectColumn selects) {
- final EntityInfo info = loadEntityInfo((Class) bean.getClass());
- if (info.isVirtualEntity()) {
- return updateColumns(null, info, bean, selects);
- }
- Connection conn = createWriteSQLConnection();
- try {
- return updateColumns(conn, info, bean, selects);
- } finally {
- closeSQLConnection(conn);
- }
- }
-
- @Override
- public CompletableFuture updateColumnAsync(final T bean, final SelectColumn selects) {
- return CompletableFuture.supplyAsync(() -> updateColumn(bean, selects), getExecutor()).whenComplete(futureCompleteConsumer);
- }
-
- protected int updateColumns(final Connection conn, final EntityInfo info, final T bean, final SelectColumn selects) {
- if (bean == null || selects == null) return -1;
- try {
- final Class clazz = (Class) bean.getClass();
- StringBuilder setsql = new StringBuilder();
- final Serializable id = info.getPrimary().get(bean);
- final List> attrs = new ArrayList<>();
- List blobs = null;
- final boolean virtual = info.isVirtualEntity();
- for (Attribute attr : info.updateAttributes) {
- if (!selects.test(attr.field())) continue;
- attrs.add(attr);
- if (!virtual) {
- if (setsql.length() > 0) setsql.append(", ");
- setsql.append(info.getSQLColumn(null, attr.field()));
- Serializable val = attr.get(bean);
- if (val instanceof byte[]) {
- if (blobs == null) blobs = new ArrayList<>();
- blobs.add((byte[]) val);
- setsql.append(" = ?");
- } else {
- setsql.append(" = ").append(info.formatToString(val));
- }
- }
- }
- int c = -1;
- if (!virtual) {
- String sql = "UPDATE " + info.getTable(id) + " SET " + setsql + " WHERE " + info.getPrimarySQLColumn() + " = " + FilterNode.formatToString(id);
- if (info.isLoggable(logger, Level.FINEST)) logger.finest(bean.getClass().getSimpleName() + ": " + sql);
- conn.setReadOnly(false);
- if (blobs != null) {
- final PreparedStatement stmt = conn.prepareStatement(sql);
- int idx = 0;
- for (byte[] bs : blobs) {
- Blob blob = conn.createBlob();
- blob.setBytes(1, bs);
- stmt.setBlob(++idx, blob);
- }
- c = stmt.executeUpdate();
- stmt.close();
- } else {
- final Statement stmt = conn.createStatement();
- c = stmt.executeUpdate(sql);
- stmt.close();
- }
- }
- //---------------------------------------------------
- final EntityCache cache = info.getCache();
- if (cache == null) return c;
- T rs = cache.update(bean, attrs);
- if (cacheListener != null) cacheListener.updateCache(clazz, rs);
- return c >= 0 ? c : (rs == null ? 0 : 1);
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public int updateColumn(final T bean, final FilterNode node, final SelectColumn selects) {
- final EntityInfo info = loadEntityInfo((Class) bean.getClass());
- if (info.isVirtualEntity()) {
- return updateColumns(null, info, bean, node, selects);
- }
- Connection conn = createWriteSQLConnection();
- try {
- return updateColumns(conn, info, bean, node, selects);
- } finally {
- closeSQLConnection(conn);
- }
- }
-
- @Override
- public CompletableFuture updateColumnAsync(final T bean, final FilterNode node, final SelectColumn selects) {
- return CompletableFuture.supplyAsync(() -> updateColumn(bean, node, selects), getExecutor()).whenComplete(futureCompleteConsumer);
- }
-
- protected int updateColumns(final Connection conn, final EntityInfo info, final T bean, final FilterNode node, final SelectColumn selects) {
- if (bean == null || node == null || selects == null) return -1;
- try {
- final Class clazz = (Class) bean.getClass();
- StringBuilder setsql = new StringBuilder();
- final Serializable id = info.getPrimary().get(bean);
- final List> attrs = new ArrayList<>();
- List blobs = null;
- final boolean virtual = info.isVirtualEntity();
- for (Attribute attr : info.updateAttributes) {
- if (!selects.test(attr.field())) continue;
- attrs.add(attr);
- if (!virtual) {
- if (setsql.length() > 0) setsql.append(", ");
- setsql.append(info.getSQLColumn("a", attr.field()));
- Serializable val = attr.get(bean);
- if (val instanceof byte[]) {
- if (blobs == null) blobs = new ArrayList<>();
- blobs.add((byte[]) val);
- setsql.append(" = ?");
- } else {
- setsql.append(" = ").append(info.formatToString(val));
- }
- }
- }
- int c = -1;
- if (!virtual) {
- Map joinTabalis = node.getJoinTabalis();
- CharSequence join = node.createSQLJoin(this, true, joinTabalis, new HashSet<>(), info);
- CharSequence where = node.createSQLExpress(info, joinTabalis);
- StringBuilder join1 = null;
- StringBuilder join2 = null;
- if (join != null) {
- String joinstr = join.toString();
- join1 = multisplit('[', ']', ",", new StringBuilder(), joinstr, 0);
- join2 = multisplit('{', '}', " AND ", new StringBuilder(), joinstr, 0);
- }
- String sql = "UPDATE " + info.getTable(node) + " a " + (join1 == null ? "" : (", " + join1)) + " SET " + setsql
- + ((where == null || where.length() == 0) ? (join2 == null ? "" : (" WHERE " + join2))
- : (" WHERE " + where + (join2 == null ? "" : (" AND " + join2))));
- if (info.isLoggable(logger, Level.FINEST)) logger.finest(info.getType().getSimpleName() + " update sql=" + sql);
- conn.setReadOnly(false);
- if (blobs != null) {
- final PreparedStatement stmt = conn.prepareStatement(sql);
- int idx = 0;
- for (byte[] bs : blobs) {
- Blob blob = conn.createBlob();
- blob.setBytes(1, bs);
- stmt.setBlob(++idx, blob);
- }
- c = stmt.executeUpdate();
- stmt.close();
- } else {
- final Statement stmt = conn.createStatement();
- c = stmt.executeUpdate(sql);
- stmt.close();
- }
- }
- //---------------------------------------------------
- final EntityCache cache = info.getCache();
- if (cache == null) return c;
- T[] rs = cache.update(bean, attrs, node);
- if (cacheListener != null) cacheListener.updateCache(clazz, rs);
- return c >= 0 ? c : (rs == null ? 0 : rs.length);
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public int updateCache(Class clazz, T... values) {
- if (values.length == 0) return 0;
- final EntityInfo info = loadEntityInfo(clazz);
- final EntityCache cache = info.getCache();
- if (cache == null) return -1;
- int c = 0;
- for (T value : values) {
- c += cache.update(value);
- }
- return c;
- }
-
- public int reloadCache(Class clazz, Serializable... ids) {
- final EntityInfo info = loadEntityInfo(clazz);
- final EntityCache cache = info.getCache();
- if (cache == null) return -1;
- String column = info.getPrimary().field();
- int c = 0;
- for (Serializable id : ids) {
- Sheet sheet = querySheet(false, true, clazz, null, FLIPPER_ONE, FilterNode.create(column, id));
- T value = sheet.isEmpty() ? null : sheet.list().get(0);
- if (value != null) c += cache.update(value);
- }
- return c;
- }
-
- //-----------------------getNumberResultAsync-----------------------------
- @Override
- public Number getNumberResult(final Class entityClass, final FilterFunc func, final String column) {
- return getNumberResult(entityClass, func, null, column, (FilterNode) null);
- }
-
- @Override
- public CompletableFuture getNumberResultAsync(final Class entityClass, final FilterFunc func, final String column) {
- return CompletableFuture.supplyAsync(() -> getNumberResult(entityClass, func, column), getExecutor());
- }
-
- @Override
- public Number getNumberResult(final Class entityClass, final FilterFunc func, final String column, FilterBean bean) {
- return getNumberResult(entityClass, func, null, column, FilterNodeBean.createFilterNode(bean));
- }
-
- @Override
- public CompletableFuture getNumberResultAsync(final Class entityClass, final FilterFunc func, final String column, final FilterBean bean) {
- return CompletableFuture.supplyAsync(() -> getNumberResult(entityClass, func, column, bean), getExecutor());
- }
-
- @Override
- public Number getNumberResult(final Class entityClass, final FilterFunc func, final String column, final FilterNode node) {
- return getNumberResult(entityClass, func, null, column, node);
- }
-
- @Override
- public CompletableFuture getNumberResultAsync(final Class entityClass, final FilterFunc func, final String column, final FilterNode node) {
- return CompletableFuture.supplyAsync(() -> getNumberResult(entityClass, func, column, node), getExecutor());
- }
-
- @Override
- public Number getNumberResult(final Class entityClass, final FilterFunc func, final Number defVal, final String column) {
- return getNumberResult(entityClass, func, defVal, column, (FilterNode) null);
- }
-
- @Override
- public CompletableFuture getNumberResultAsync(final Class entityClass, final FilterFunc func, final Number defVal, final String column) {
- return CompletableFuture.supplyAsync(() -> getNumberResult(entityClass, func, defVal, column), getExecutor());
- }
-
- @Override
- public Number getNumberResult(final Class entityClass, final FilterFunc func, final Number defVal, final String column, FilterBean bean) {
- return getNumberResult(entityClass, func, defVal, column, FilterNodeBean.createFilterNode(bean));
- }
-
- @Override
- public CompletableFuture getNumberResultAsync(final Class entityClass, final FilterFunc func, final Number defVal, final String column, final FilterBean bean) {
- return CompletableFuture.supplyAsync(() -> getNumberResult(entityClass, func, defVal, column, bean), getExecutor());
- }
-
- @Override
- public CompletableFuture getNumberResultAsync(final Class entityClass, final FilterFunc func, final Number defVal, final String column, final FilterNode node) {
- return CompletableFuture.supplyAsync(() -> getNumberResult(entityClass, func, defVal, column, node), getExecutor());
- }
-
- @Override
- public Map getNumberMap(final Class entityClass, final FilterFuncColumn... columns) {
- return getNumberMap(entityClass, (FilterNode) null, columns);
- }
-
- @Override
- public CompletableFuture