From 5d77139965f8b366c996b6bced2a7868f9c79044 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Fri, 4 May 2018 15:17:59 +0800 Subject: [PATCH] --- src/org/redkale/source/DataSqlSource.java | 336 ++++++++++++---------- 1 file changed, 179 insertions(+), 157 deletions(-) diff --git a/src/org/redkale/source/DataSqlSource.java b/src/org/redkale/source/DataSqlSource.java index a2237ddc9..e03024389 100644 --- a/src/org/redkale/source/DataSqlSource.java +++ b/src/org/redkale/source/DataSqlSource.java @@ -21,7 +21,8 @@ import static org.redkale.source.DataSources.*; import org.redkale.util.*; /** - * DataSource的SQL抽象实现类 + * DataSource的SQL抽象实现类
+ * 注意: 所有的操作只能作用在一张表上,不能同时变更多张表 * *

* 详情见: https://redkale.org @@ -103,7 +104,7 @@ public abstract class DataSqlSource extends AbstractService implement } //是否异步, 为true则只能调用pollAsync方法,为false则只能调用poll方法 - protected abstract boolean isAysnc(); + protected abstract boolean isAsync(); //index从1开始 protected abstract String getPrepareParamSign(int index); @@ -111,6 +112,12 @@ public abstract class DataSqlSource extends AbstractService implement //创建连接池 protected abstract PoolSource createPoolSource(DataSource source, String rwtype, Properties prop); + //插入新纪录 + protected abstract CompletableFuture insertDB(final EntityInfo info, T... values); + + //删除记录 + protected abstract CompletableFuture deleteDB(final EntityInfo info, Flipper flipper, final String sql); + //查询单条记录 protected abstract CompletableFuture findDB(final EntityInfo info, final String sql, final boolean onlypk, final SelectColumn selects); @@ -185,7 +192,7 @@ public abstract class DataSqlSource extends AbstractService implement } protected CompletableFuture completeVoidFuture() { - return isAysnc() ? CompletableFuture.completedFuture(null) : null; + return isAsync() ? CompletableFuture.completedFuture(null) : null; } /** @@ -200,6 +207,27 @@ public abstract class DataSqlSource extends AbstractService implement if (cache == null) return; cache.fullLoad(); } + ////检查对象是否都是同一个Entity类 + + protected CompletableFuture checkEntity(String action, boolean async, T... values) { + if (values.length < 1) return null; + Class clazz = null; + for (T val : values) { + if (clazz == null) { + clazz = val.getClass(); + continue; + } + if (clazz != val.getClass()) { + if (async) { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new SQLException("DataSource." + action + " must the same Class Entity, but diff is " + clazz + " and " + val.getClass())); + return future; + } + throw new RuntimeException("DataSource." + action + " must the same Class Entity, but diff is " + clazz + " and " + val.getClass()); + } + } + return null; + } //----------------------------- insert ----------------------------- /** @@ -212,72 +240,52 @@ public abstract class DataSqlSource extends AbstractService implement public void insert(@RpcCall(DataCallArrayAttribute.class) T... values) { if (values.length == 0) return; final EntityInfo info = loadEntityInfo((Class) values[0].getClass()); + checkEntity("insert", false, values); if (info.isVirtualEntity()) { - if (isAysnc()) { - insert(null, info, values).join(); - } else { - insert(null, info, values); - } - } else { - if (isAysnc()) { - writePool.pollAsync().thenApply(conn -> insert(conn, info, values)).join(); - } else { - insert(writePool.poll(), info, values); - } + insertCache(info, values); + return; } + insertDB(info, values).whenComplete((rs, t) -> { + if (t != null) { + futureCompleteConsumer.accept(rs, t); + } else { + insertCache(info, values); + } + }).join(); } @Override public CompletableFuture insertAsync(@RpcCall(DataCallArrayAttribute.class) T... values) { if (values.length == 0) return completeVoidFuture(); + CompletableFuture future = checkEntity("insert", true, values); + if (future != null) return future; final EntityInfo info = loadEntityInfo((Class) values[0].getClass()); if (info.isVirtualEntity()) { - if (isAysnc()) { - return insert(null, info, values).whenComplete(futureCompleteConsumer); - } else { - return CompletableFuture.runAsync(() -> insert(null, info, values), getExecutor()).whenComplete(futureCompleteConsumer); - } - } else { - if (isAysnc()) { - return writePool.pollAsync().thenApply(conn -> insert(conn, info, values)).whenComplete(futureCompleteConsumer); - } else { - return CompletableFuture.runAsync(() -> insert(writePool.poll(), info, values), getExecutor()).whenComplete(futureCompleteConsumer); - } + return CompletableFuture.runAsync(() -> insertCache(info, values), getExecutor()); } + if (isAsync()) return insertDB(info, values).whenComplete((rs, t) -> { + if (t != null) { + futureCompleteConsumer.accept(rs, t); + } else { + insertCache(info, values); + } + }); + return CompletableFuture.runAsync(() -> insertDB(info, values).whenComplete((rs, t) -> { + if (t != null) { + futureCompleteConsumer.accept(rs, t); + } else { + insertCache(info, values); + } + }), getExecutor()); } - protected CompletableFuture insert(final DBChannel conn, final EntityInfo info, T... values) { - if (values.length == 0) return completeVoidFuture(); - if (values.length > 1) { //检查对象是否都是同一个Entity类 - Class clazz = null; - for (T val : values) { - if (clazz == null) { - clazz = val.getClass(); - continue; - } - if (clazz != val.getClass()) { - CompletableFuture future = new CompletableFuture<>(); - future.completeExceptionally(new SQLException("DataSource.insert must the same Class Entity, but diff is " + clazz + " and " + val.getClass())); - return future; - } - } + protected void insertCache(final EntityInfo info, T... values) { + final EntityCache cache = info.getCache(); + if (cache == null) return; + for (final T value : values) { + cache.insert(value); } - if (info.isVirtualEntity()) { - final EntityCache cache = info.getCache(); - if (cache != null) { //更新缓存 - for (final T value : values) { - cache.insert(value); - } - if (cacheListener != null) cacheListener.insertCache(info.getType(), values); - } - return completeVoidFuture(); - } - if (isAysnc()) { //异步模式 - - } else { - - } - return completeVoidFuture(); + if (cacheListener != null) cacheListener.insertCache(info.getType(), values); } @Override @@ -305,18 +313,7 @@ public abstract class DataSqlSource extends AbstractService implement @Override public int delete(T... values) { if (values.length == 0) return -1; - if (values.length > 1) { //检查对象是否都是同一个Entity类 - Class clazz = null; - for (T val : values) { - if (clazz == null) { - clazz = val.getClass(); - continue; - } - if (clazz != val.getClass()) { - throw new RuntimeException("DataSource.delete must the same Class Entity, but diff is " + clazz + " and " + val.getClass()); - } - } - } + checkEntity("delete", false, values); final Class clazz = (Class) values[0].getClass(); final EntityInfo info = loadEntityInfo(clazz); final Attribute primary = info.getPrimary(); @@ -331,20 +328,8 @@ public abstract class DataSqlSource extends AbstractService implement @Override public CompletableFuture deleteAsync(final T... values) { if (values.length == 0) return CompletableFuture.completedFuture(-1); - if (values.length > 1) { //检查对象是否都是同一个Entity类 - Class clazz = null; - for (T val : values) { - if (clazz == null) { - clazz = val.getClass(); - continue; - } - if (clazz != val.getClass()) { - CompletableFuture future = new CompletableFuture<>(); - future.completeExceptionally(new SQLException("DataSource.delete must the same Class Entity, but diff is " + clazz + " and " + val.getClass())); - return future; - } - } - } + CompletableFuture future = checkEntity("delete", true, values); + if (future != null) return future; final Class clazz = (Class) values[0].getClass(); final EntityInfo info = loadEntityInfo(clazz); final Attribute primary = info.getPrimary(); @@ -360,54 +345,37 @@ public abstract class DataSqlSource extends AbstractService implement public int delete(Class clazz, Serializable... ids) { if (ids.length == 0) return -1; final EntityInfo info = loadEntityInfo(clazz); - if (info.isVirtualEntity()) { - return delete(null, info, ids).join(); - } else { - if (isAysnc()) { - return writePool.pollAsync().thenCompose(conn -> delete(conn, info, ids)).join(); + if (info.isVirtualEntity()) return deleteCache(info, -1, ids); + return delete(info, ids).whenComplete((rs, t) -> { + if (t != null) { + futureCompleteConsumer.accept(rs, t); } else { - return delete(writePool.poll(), info, ids).join(); + deleteCache(info, rs, ids); } - } + }).join(); } @Override public CompletableFuture deleteAsync(final Class clazz, final Serializable... ids) { + if (ids.length == 0) return CompletableFuture.completedFuture(-1); final EntityInfo info = loadEntityInfo(clazz); if (info.isVirtualEntity()) { - if (isAysnc()) { - return delete(null, info, ids); + return CompletableFuture.supplyAsync(() -> deleteCache(info, -1, ids), getExecutor()); + } + if (isAsync()) return delete(info, ids).whenComplete((rs, t) -> { + if (t != null) { + futureCompleteConsumer.accept(rs, t); + } else { + deleteCache(info, rs, ids); + } + }); + return CompletableFuture.supplyAsync(() -> delete(info, ids).join(), getExecutor()).whenComplete((rs, t) -> { + if (t != null) { + futureCompleteConsumer.accept(rs, t); } else { - return CompletableFuture.supplyAsync(() -> delete(null, info, ids).join(), getExecutor()); + deleteCache(info, rs, ids); } - } else { - if (isAysnc()) { - return writePool.pollAsync().thenCompose(conn -> delete(conn, info, ids)); - } else { - return CompletableFuture.supplyAsync(() -> delete(writePool.poll(), info, ids).join(), getExecutor()); - } - } - } - - protected CompletableFuture delete(final DBChannel conn, final EntityInfo info, Serializable... keys) { - if (keys.length == 0) return CompletableFuture.completedFuture(-1); - if (info.isVirtualEntity()) { - final EntityCache cache = info.getCache(); - if (cache == null) return CompletableFuture.completedFuture(-1); - int c = 0; - for (Serializable key : keys) { - c += cache.delete(key); - } - if (cacheListener != null) cacheListener.deleteCache(info.getType(), keys); - return CompletableFuture.completedFuture(c); - } - //待实现 - if (isAysnc()) { //异步模式 - - } else { - - } - return CompletableFuture.completedFuture(-1); + }); } @Override @@ -423,46 +391,89 @@ public abstract class DataSqlSource extends AbstractService implement @Override public int delete(Class clazz, final Flipper flipper, FilterNode node) { final EntityInfo info = loadEntityInfo(clazz); - if (info.isVirtualEntity()) { - return delete(null, info, flipper, node).join(); - } else { - if (isAysnc()) { - return writePool.pollAsync().thenCompose(conn -> delete(conn, info, flipper, node)).join(); + if (info.isVirtualEntity()) return deleteCache(info, -1, flipper, node); + return delete(info, flipper, node).whenComplete((rs, t) -> { + if (t != null) { + futureCompleteConsumer.accept(rs, t); } else { - return delete(writePool.poll(), info, flipper, node).join(); + deleteCache(info, rs, flipper, node); } - } + }).join(); } @Override public CompletableFuture deleteAsync(final Class clazz, final Flipper flipper, FilterNode node) { final EntityInfo info = loadEntityInfo(clazz); if (info.isVirtualEntity()) { - if (isAysnc()) { - return delete(null, info, flipper, node); - } else { - return CompletableFuture.supplyAsync(() -> delete(null, info, flipper, node).join(), getExecutor()); - } - } else { - if (isAysnc()) { - return writePool.pollAsync().thenCompose(conn -> delete(conn, info, flipper, node)); - } else { - return CompletableFuture.supplyAsync(() -> delete(writePool.poll(), info, flipper, node).join(), getExecutor()); - } + return CompletableFuture.supplyAsync(() -> deleteCache(info, -1, flipper, node), getExecutor()); } + if (isAsync()) return delete(info, flipper, node).whenComplete((rs, t) -> { + if (t != null) { + futureCompleteConsumer.accept(rs, t); + } else { + deleteCache(info, rs, flipper, node); + } + }); + return CompletableFuture.supplyAsync(() -> delete(info, flipper, node).join(), getExecutor()).whenComplete((rs, t) -> { + if (t != null) { + futureCompleteConsumer.accept(rs, t); + } else { + deleteCache(info, rs, flipper, node); + } + }); } - protected CompletableFuture delete(final DBChannel conn, final EntityInfo info, final Flipper flipper, FilterNode node) { - if (info.isVirtualEntity()) { - return CompletableFuture.completedFuture(-1); + protected CompletableFuture delete(final EntityInfo info, final Serializable... ids) { + if (ids.length == 1) { + String sql = "DELETE FROM " + info.getTable(ids[0]) + " WHERE " + info.getPrimarySQLColumn() + " = " + FilterNode.formatToString(ids[0]); + return deleteDB(info, null, sql); } - //待实现 - if (isAysnc()) { //异步模式 - - } else { - + String sql = "DELETE FROM " + info.getTable(ids[0]) + " WHERE " + info.getPrimarySQLColumn() + " IN ("; + for (int i = 0; i < ids.length; i++) { + if (i > 0) sql += ','; + sql += FilterNode.formatToString(ids[i]); } - return CompletableFuture.completedFuture(-1); + sql += ")"; + return deleteDB(info, null, sql); + } + + protected CompletableFuture delete(final EntityInfo info, final Flipper flipper, final FilterNode node) { + Map joinTabalis = node.getJoinTabalis(); + CharSequence join = node.createSQLJoin(this, true, joinTabalis, new HashSet<>(), info); + CharSequence where = node.createSQLExpress(info, joinTabalis); + + StringBuilder join1 = null; + StringBuilder join2 = null; + if (join != null) { + String joinstr = join.toString(); + join1 = multisplit('[', ']', ",", new StringBuilder(), joinstr, 0); + join2 = multisplit('{', '}', " AND ", new StringBuilder(), joinstr, 0); + } + String sql = "DELETE " + ("mysql".equals(this.readPool.getDbtype()) ? "a" : "") + " FROM " + info.getTable(node) + " a" + (join1 == null ? "" : (", " + join1)) + + ((where == null || where.length() == 0) ? (join2 == null ? "" : (" WHERE " + join2)) + : (" WHERE " + where + (join2 == null ? "" : (" AND " + join2)))) + info.createSQLOrderby(flipper); + if (info.isLoggable(logger, Level.FINEST)) logger.finest(info.getType().getSimpleName() + " delete sql=" + + (sql + ((flipper == null || flipper.getLimit() < 1) ? "" : (" LIMIT " + flipper.getLimit())))); + return deleteDB(info, flipper, sql); + } + + protected int deleteCache(final EntityInfo info, int count, Flipper flipper, FilterNode node) { + final EntityCache cache = info.getCache(); + if (cache == null) return -1; + Serializable[] ids = cache.delete(flipper, node); + if (cacheListener != null) cacheListener.deleteCache(info.getType(), ids); + return count >= 0 ? count : (ids == null ? 0 : ids.length); + } + + protected int deleteCache(final EntityInfo info, int count, Serializable... keys) { + final EntityCache cache = info.getCache(); + if (cache == null) return -1; + int c = 0; + for (Serializable key : keys) { + c += cache.delete(key); + } + if (cacheListener != null) cacheListener.deleteCache(info.getType(), keys); + return count >= 0 ? count : c; } @Override @@ -478,6 +489,17 @@ public abstract class DataSqlSource extends AbstractService implement return c; } + protected static StringBuilder multisplit(char ch1, char ch2, String split, StringBuilder sb, String str, int from) { + if (str == null) return sb; + int pos1 = str.indexOf(ch1, from); + if (pos1 < 0) return sb; + int pos2 = str.indexOf(ch2, from); + if (pos2 < 0) return sb; + if (sb.length() > 0) sb.append(split); + sb.append(str.substring(pos1 + 1, pos2)); + return multisplit(ch1, ch2, split, sb, str, pos2 + 1); + } + //---------------------------- update ---------------------------- //------------------------- getNumberMap ------------------------- @Override @@ -533,7 +555,7 @@ public abstract class DataSqlSource extends AbstractService implement return CompletableFuture.completedFuture(map); } } - if (isAysnc()) return getNumberMap(info, node, columns); + if (isAsync()) return getNumberMap(info, node, columns); return CompletableFuture.supplyAsync(() -> (Map) getNumberMap(info, node, columns).join(), getExecutor()); } @@ -627,7 +649,7 @@ public abstract class DataSqlSource extends AbstractService implement return CompletableFuture.completedFuture(cache.getNumberResult(func, defVal, column, node)); } } - if (isAysnc()) return getNumberResult(info, entityClass, func, defVal, column, node); + if (isAsync()) return getNumberResult(info, entityClass, func, defVal, column, node); return CompletableFuture.supplyAsync(() -> getNumberResult(info, entityClass, func, defVal, column, node).join(), getExecutor()); } @@ -684,7 +706,7 @@ public abstract class DataSqlSource extends AbstractService implement return CompletableFuture.completedFuture(cache.queryColumnMap(keyColumn, func, funcColumn, node)); } } - if (isAysnc()) return queryColumnMap(info, keyColumn, func, funcColumn, node); + if (isAsync()) return queryColumnMap(info, keyColumn, func, funcColumn, node); return CompletableFuture.supplyAsync(() -> (Map) queryColumnMap(info, keyColumn, func, funcColumn, node).join(), getExecutor()); } @@ -739,7 +761,7 @@ public abstract class DataSqlSource extends AbstractService implement T rs = cache.find(selects, pk); if (cache.isFullLoaded() || rs != null) return CompletableFuture.completedFuture(rs); } - if (isAysnc()) return find(info, selects, pk); + if (isAsync()) return find(info, selects, pk); return CompletableFuture.supplyAsync(() -> find(info, selects, pk).join(), getExecutor()); } @@ -804,7 +826,7 @@ public abstract class DataSqlSource extends AbstractService implement if (cache != null && cache.isFullLoaded() && (node == null || node.isCacheUseable(this))) { return CompletableFuture.completedFuture(cache.find(selects, node)); } - if (isAysnc()) return find(info, selects, node); + if (isAsync()) return find(info, selects, node); return CompletableFuture.supplyAsync(() -> find(info, selects, node).join(), getExecutor()); } @@ -876,7 +898,7 @@ public abstract class DataSqlSource extends AbstractService implement Serializable val = cache.findColumn(column, defValue, pk); if (cache.isFullLoaded() || val != null) return CompletableFuture.completedFuture(val); } - if (isAysnc()) return findColumn(info, column, defValue, pk); + if (isAsync()) return findColumn(info, column, defValue, pk); return CompletableFuture.supplyAsync(() -> findColumn(info, column, defValue, pk).join(), getExecutor()); } @@ -905,7 +927,7 @@ public abstract class DataSqlSource extends AbstractService implement Serializable val = cache.findColumn(column, defValue, node); if (cache.isFullLoaded() || val != null) return CompletableFuture.completedFuture(val); } - if (isAysnc()) return findColumn(info, column, defValue, node); + if (isAsync()) return findColumn(info, column, defValue, node); return CompletableFuture.supplyAsync(() -> findColumn(info, column, defValue, node).join(), getExecutor()); } @@ -938,7 +960,7 @@ public abstract class DataSqlSource extends AbstractService implement boolean rs = cache.exists(pk); if (rs || cache.isFullLoaded()) return CompletableFuture.completedFuture(rs); } - if (isAysnc()) return exists(info, pk); + if (isAsync()) return exists(info, pk); return CompletableFuture.supplyAsync(() -> exists(info, pk).join(), getExecutor()); } @@ -977,7 +999,7 @@ public abstract class DataSqlSource extends AbstractService implement boolean rs = cache.exists(node); if (rs || cache.isFullLoaded()) return CompletableFuture.completedFuture(rs); } - if (isAysnc()) return exists(info, node); + if (isAsync()) return exists(info, node); return CompletableFuture.supplyAsync(() -> exists(info, node).join(), getExecutor()); } @@ -1510,7 +1532,7 @@ public abstract class DataSqlSource extends AbstractService implement @Override public CompletableFuture> querySheetAsync(final Class clazz, final SelectColumn selects, final Flipper flipper, final FilterNode node) { - if (isAysnc()) return querySheet(true, true, clazz, selects, flipper, node); + if (isAsync()) return querySheet(true, true, clazz, selects, flipper, node); return CompletableFuture.supplyAsync(() -> querySheet(true, true, clazz, selects, flipper, node).join(), getExecutor()); }