This commit is contained in:
Redkale
2018-05-04 15:17:59 +08:00
parent 126dab08b2
commit 5d77139965

View File

@@ -21,7 +21,8 @@ import static org.redkale.source.DataSources.*;
import org.redkale.util.*;
/**
* DataSource的SQL抽象实现类
* DataSource的SQL抽象实现类 <br>
* 注意: 所有的操作只能作用在一张表上,不能同时变更多张表
*
* <p>
* 详情见: https://redkale.org
@@ -103,7 +104,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
}
//是否异步, 为true则只能调用pollAsync方法为false则只能调用poll方法
protected abstract boolean isAysnc();
protected abstract boolean isAsync();
//index从1开始
protected abstract String getPrepareParamSign(int index);
@@ -111,6 +112,12 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
//创建连接池
protected abstract PoolSource<DBChannel> createPoolSource(DataSource source, String rwtype, Properties prop);
//插入新纪录
protected abstract <T> CompletableFuture<Void> insertDB(final EntityInfo<T> info, T... values);
//删除记录
protected abstract <T> CompletableFuture<Integer> deleteDB(final EntityInfo<T> info, Flipper flipper, final String sql);
//查询单条记录
protected abstract <T> CompletableFuture<T> findDB(final EntityInfo<T> info, final String sql, final boolean onlypk, final SelectColumn selects);
@@ -185,7 +192,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
}
protected CompletableFuture<Void> completeVoidFuture() {
return isAysnc() ? CompletableFuture.completedFuture(null) : null;
return isAsync() ? CompletableFuture.completedFuture(null) : null;
}
/**
@@ -200,6 +207,27 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (cache == null) return;
cache.fullLoad();
}
////检查对象是否都是同一个Entity类
protected <T> CompletableFuture checkEntity(String action, boolean async, T... values) {
if (values.length < 1) return null;
Class clazz = null;
for (T val : values) {
if (clazz == null) {
clazz = val.getClass();
continue;
}
if (clazz != val.getClass()) {
if (async) {
CompletableFuture future = new CompletableFuture<>();
future.completeExceptionally(new SQLException("DataSource." + action + " must the same Class Entity, but diff is " + clazz + " and " + val.getClass()));
return future;
}
throw new RuntimeException("DataSource." + action + " must the same Class Entity, but diff is " + clazz + " and " + val.getClass());
}
}
return null;
}
//----------------------------- insert -----------------------------
/**
@@ -212,72 +240,52 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
public <T> void insert(@RpcCall(DataCallArrayAttribute.class) T... values) {
if (values.length == 0) return;
final EntityInfo<T> info = loadEntityInfo((Class<T>) values[0].getClass());
checkEntity("insert", false, values);
if (info.isVirtualEntity()) {
if (isAysnc()) {
insert(null, info, values).join();
} else {
insert(null, info, values);
}
} else {
if (isAysnc()) {
writePool.pollAsync().thenApply(conn -> insert(conn, info, values)).join();
} else {
insert(writePool.poll(), info, values);
}
insertCache(info, values);
return;
}
insertDB(info, values).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
insertCache(info, values);
}
}).join();
}
@Override
public <T> CompletableFuture<Void> insertAsync(@RpcCall(DataCallArrayAttribute.class) T... values) {
if (values.length == 0) return completeVoidFuture();
CompletableFuture future = checkEntity("insert", true, values);
if (future != null) return future;
final EntityInfo<T> info = loadEntityInfo((Class<T>) values[0].getClass());
if (info.isVirtualEntity()) {
if (isAysnc()) {
return insert(null, info, values).whenComplete(futureCompleteConsumer);
} else {
return CompletableFuture.runAsync(() -> insert(null, info, values), getExecutor()).whenComplete(futureCompleteConsumer);
}
} else {
if (isAysnc()) {
return writePool.pollAsync().thenApply(conn -> insert(conn, info, values)).whenComplete(futureCompleteConsumer);
} else {
return CompletableFuture.runAsync(() -> insert(writePool.poll(), info, values), getExecutor()).whenComplete(futureCompleteConsumer);
}
return CompletableFuture.runAsync(() -> insertCache(info, values), getExecutor());
}
if (isAsync()) return insertDB(info, values).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
insertCache(info, values);
}
});
return CompletableFuture.runAsync(() -> insertDB(info, values).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
insertCache(info, values);
}
}), getExecutor());
}
protected <T> CompletableFuture<Void> insert(final DBChannel conn, final EntityInfo<T> info, T... values) {
if (values.length == 0) return completeVoidFuture();
if (values.length > 1) { //检查对象是否都是同一个Entity类
Class clazz = null;
for (T val : values) {
if (clazz == null) {
clazz = val.getClass();
continue;
}
if (clazz != val.getClass()) {
CompletableFuture<Void> future = new CompletableFuture<>();
future.completeExceptionally(new SQLException("DataSource.insert must the same Class Entity, but diff is " + clazz + " and " + val.getClass()));
return future;
}
}
protected <T> void insertCache(final EntityInfo<T> info, T... values) {
final EntityCache<T> cache = info.getCache();
if (cache == null) return;
for (final T value : values) {
cache.insert(value);
}
if (info.isVirtualEntity()) {
final EntityCache<T> cache = info.getCache();
if (cache != null) { //更新缓存
for (final T value : values) {
cache.insert(value);
}
if (cacheListener != null) cacheListener.insertCache(info.getType(), values);
}
return completeVoidFuture();
}
if (isAysnc()) { //异步模式
} else {
}
return completeVoidFuture();
if (cacheListener != null) cacheListener.insertCache(info.getType(), values);
}
@Override
@@ -305,18 +313,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
@Override
public <T> int delete(T... values) {
if (values.length == 0) return -1;
if (values.length > 1) { //检查对象是否都是同一个Entity类
Class clazz = null;
for (T val : values) {
if (clazz == null) {
clazz = val.getClass();
continue;
}
if (clazz != val.getClass()) {
throw new RuntimeException("DataSource.delete must the same Class Entity, but diff is " + clazz + " and " + val.getClass());
}
}
}
checkEntity("delete", false, values);
final Class<T> clazz = (Class<T>) values[0].getClass();
final EntityInfo<T> info = loadEntityInfo(clazz);
final Attribute primary = info.getPrimary();
@@ -331,20 +328,8 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
@Override
public <T> CompletableFuture<Integer> deleteAsync(final T... values) {
if (values.length == 0) return CompletableFuture.completedFuture(-1);
if (values.length > 1) { //检查对象是否都是同一个Entity类
Class clazz = null;
for (T val : values) {
if (clazz == null) {
clazz = val.getClass();
continue;
}
if (clazz != val.getClass()) {
CompletableFuture<Integer> future = new CompletableFuture<>();
future.completeExceptionally(new SQLException("DataSource.delete must the same Class Entity, but diff is " + clazz + " and " + val.getClass()));
return future;
}
}
}
CompletableFuture future = checkEntity("delete", true, values);
if (future != null) return future;
final Class<T> clazz = (Class<T>) values[0].getClass();
final EntityInfo<T> info = loadEntityInfo(clazz);
final Attribute primary = info.getPrimary();
@@ -360,54 +345,37 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
public <T> int delete(Class<T> clazz, Serializable... ids) {
if (ids.length == 0) return -1;
final EntityInfo<T> info = loadEntityInfo(clazz);
if (info.isVirtualEntity()) {
return delete(null, info, ids).join();
} else {
if (isAysnc()) {
return writePool.pollAsync().thenCompose(conn -> delete(conn, info, ids)).join();
if (info.isVirtualEntity()) return deleteCache(info, -1, ids);
return delete(info, ids).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
return delete(writePool.poll(), info, ids).join();
deleteCache(info, rs, ids);
}
}
}).join();
}
@Override
public <T> CompletableFuture<Integer> deleteAsync(final Class<T> clazz, final Serializable... ids) {
if (ids.length == 0) return CompletableFuture.completedFuture(-1);
final EntityInfo<T> info = loadEntityInfo(clazz);
if (info.isVirtualEntity()) {
if (isAysnc()) {
return delete(null, info, ids);
return CompletableFuture.supplyAsync(() -> deleteCache(info, -1, ids), getExecutor());
}
if (isAsync()) return delete(info, ids).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
deleteCache(info, rs, ids);
}
});
return CompletableFuture.supplyAsync(() -> delete(info, ids).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
return CompletableFuture.supplyAsync(() -> delete(null, info, ids).join(), getExecutor());
deleteCache(info, rs, ids);
}
} else {
if (isAysnc()) {
return writePool.pollAsync().thenCompose(conn -> delete(conn, info, ids));
} else {
return CompletableFuture.supplyAsync(() -> delete(writePool.poll(), info, ids).join(), getExecutor());
}
}
}
protected <T> CompletableFuture<Integer> delete(final DBChannel conn, final EntityInfo<T> info, Serializable... keys) {
if (keys.length == 0) return CompletableFuture.completedFuture(-1);
if (info.isVirtualEntity()) {
final EntityCache<T> cache = info.getCache();
if (cache == null) return CompletableFuture.completedFuture(-1);
int c = 0;
for (Serializable key : keys) {
c += cache.delete(key);
}
if (cacheListener != null) cacheListener.deleteCache(info.getType(), keys);
return CompletableFuture.completedFuture(c);
}
//待实现
if (isAysnc()) { //异步模式
} else {
}
return CompletableFuture.completedFuture(-1);
});
}
@Override
@@ -423,46 +391,89 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
@Override
public <T> int delete(Class<T> clazz, final Flipper flipper, FilterNode node) {
final EntityInfo<T> info = loadEntityInfo(clazz);
if (info.isVirtualEntity()) {
return delete(null, info, flipper, node).join();
} else {
if (isAysnc()) {
return writePool.pollAsync().thenCompose(conn -> delete(conn, info, flipper, node)).join();
if (info.isVirtualEntity()) return deleteCache(info, -1, flipper, node);
return delete(info, flipper, node).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
return delete(writePool.poll(), info, flipper, node).join();
deleteCache(info, rs, flipper, node);
}
}
}).join();
}
@Override
public <T> CompletableFuture<Integer> deleteAsync(final Class<T> clazz, final Flipper flipper, FilterNode node) {
final EntityInfo<T> info = loadEntityInfo(clazz);
if (info.isVirtualEntity()) {
if (isAysnc()) {
return delete(null, info, flipper, node);
} else {
return CompletableFuture.supplyAsync(() -> delete(null, info, flipper, node).join(), getExecutor());
}
} else {
if (isAysnc()) {
return writePool.pollAsync().thenCompose(conn -> delete(conn, info, flipper, node));
} else {
return CompletableFuture.supplyAsync(() -> delete(writePool.poll(), info, flipper, node).join(), getExecutor());
}
return CompletableFuture.supplyAsync(() -> deleteCache(info, -1, flipper, node), getExecutor());
}
if (isAsync()) return delete(info, flipper, node).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
deleteCache(info, rs, flipper, node);
}
});
return CompletableFuture.supplyAsync(() -> delete(info, flipper, node).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
deleteCache(info, rs, flipper, node);
}
});
}
protected <T> CompletableFuture<Integer> delete(final DBChannel conn, final EntityInfo<T> info, final Flipper flipper, FilterNode node) {
if (info.isVirtualEntity()) {
return CompletableFuture.completedFuture(-1);
protected <T> CompletableFuture<Integer> delete(final EntityInfo<T> info, final Serializable... ids) {
if (ids.length == 1) {
String sql = "DELETE FROM " + info.getTable(ids[0]) + " WHERE " + info.getPrimarySQLColumn() + " = " + FilterNode.formatToString(ids[0]);
return deleteDB(info, null, sql);
}
//待实现
if (isAysnc()) { //异步模式
} else {
String sql = "DELETE FROM " + info.getTable(ids[0]) + " WHERE " + info.getPrimarySQLColumn() + " IN (";
for (int i = 0; i < ids.length; i++) {
if (i > 0) sql += ',';
sql += FilterNode.formatToString(ids[i]);
}
return CompletableFuture.completedFuture(-1);
sql += ")";
return deleteDB(info, null, sql);
}
protected <T> CompletableFuture<Integer> delete(final EntityInfo<T> info, final Flipper flipper, final FilterNode node) {
Map<Class, String> joinTabalis = node.getJoinTabalis();
CharSequence join = node.createSQLJoin(this, true, joinTabalis, new HashSet<>(), info);
CharSequence where = node.createSQLExpress(info, joinTabalis);
StringBuilder join1 = null;
StringBuilder join2 = null;
if (join != null) {
String joinstr = join.toString();
join1 = multisplit('[', ']', ",", new StringBuilder(), joinstr, 0);
join2 = multisplit('{', '}', " AND ", new StringBuilder(), joinstr, 0);
}
String sql = "DELETE " + ("mysql".equals(this.readPool.getDbtype()) ? "a" : "") + " FROM " + info.getTable(node) + " a" + (join1 == null ? "" : (", " + join1))
+ ((where == null || where.length() == 0) ? (join2 == null ? "" : (" WHERE " + join2))
: (" WHERE " + where + (join2 == null ? "" : (" AND " + join2)))) + info.createSQLOrderby(flipper);
if (info.isLoggable(logger, Level.FINEST)) logger.finest(info.getType().getSimpleName() + " delete sql="
+ (sql + ((flipper == null || flipper.getLimit() < 1) ? "" : (" LIMIT " + flipper.getLimit()))));
return deleteDB(info, flipper, sql);
}
protected <T> int deleteCache(final EntityInfo<T> info, int count, Flipper flipper, FilterNode node) {
final EntityCache<T> cache = info.getCache();
if (cache == null) return -1;
Serializable[] ids = cache.delete(flipper, node);
if (cacheListener != null) cacheListener.deleteCache(info.getType(), ids);
return count >= 0 ? count : (ids == null ? 0 : ids.length);
}
protected <T> int deleteCache(final EntityInfo<T> info, int count, Serializable... keys) {
final EntityCache<T> cache = info.getCache();
if (cache == null) return -1;
int c = 0;
for (Serializable key : keys) {
c += cache.delete(key);
}
if (cacheListener != null) cacheListener.deleteCache(info.getType(), keys);
return count >= 0 ? count : c;
}
@Override
@@ -478,6 +489,17 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
return c;
}
protected static StringBuilder multisplit(char ch1, char ch2, String split, StringBuilder sb, String str, int from) {
if (str == null) return sb;
int pos1 = str.indexOf(ch1, from);
if (pos1 < 0) return sb;
int pos2 = str.indexOf(ch2, from);
if (pos2 < 0) return sb;
if (sb.length() > 0) sb.append(split);
sb.append(str.substring(pos1 + 1, pos2));
return multisplit(ch1, ch2, split, sb, str, pos2 + 1);
}
//---------------------------- update ----------------------------
//------------------------- getNumberMap -------------------------
@Override
@@ -533,7 +555,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
return CompletableFuture.completedFuture(map);
}
}
if (isAysnc()) return getNumberMap(info, node, columns);
if (isAsync()) return getNumberMap(info, node, columns);
return CompletableFuture.supplyAsync(() -> (Map) getNumberMap(info, node, columns).join(), getExecutor());
}
@@ -627,7 +649,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
return CompletableFuture.completedFuture(cache.getNumberResult(func, defVal, column, node));
}
}
if (isAysnc()) return getNumberResult(info, entityClass, func, defVal, column, node);
if (isAsync()) return getNumberResult(info, entityClass, func, defVal, column, node);
return CompletableFuture.supplyAsync(() -> getNumberResult(info, entityClass, func, defVal, column, node).join(), getExecutor());
}
@@ -684,7 +706,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
return CompletableFuture.completedFuture(cache.queryColumnMap(keyColumn, func, funcColumn, node));
}
}
if (isAysnc()) return queryColumnMap(info, keyColumn, func, funcColumn, node);
if (isAsync()) return queryColumnMap(info, keyColumn, func, funcColumn, node);
return CompletableFuture.supplyAsync(() -> (Map) queryColumnMap(info, keyColumn, func, funcColumn, node).join(), getExecutor());
}
@@ -739,7 +761,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
T rs = cache.find(selects, pk);
if (cache.isFullLoaded() || rs != null) return CompletableFuture.completedFuture(rs);
}
if (isAysnc()) return find(info, selects, pk);
if (isAsync()) return find(info, selects, pk);
return CompletableFuture.supplyAsync(() -> find(info, selects, pk).join(), getExecutor());
}
@@ -804,7 +826,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (cache != null && cache.isFullLoaded() && (node == null || node.isCacheUseable(this))) {
return CompletableFuture.completedFuture(cache.find(selects, node));
}
if (isAysnc()) return find(info, selects, node);
if (isAsync()) return find(info, selects, node);
return CompletableFuture.supplyAsync(() -> find(info, selects, node).join(), getExecutor());
}
@@ -876,7 +898,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
Serializable val = cache.findColumn(column, defValue, pk);
if (cache.isFullLoaded() || val != null) return CompletableFuture.completedFuture(val);
}
if (isAysnc()) return findColumn(info, column, defValue, pk);
if (isAsync()) return findColumn(info, column, defValue, pk);
return CompletableFuture.supplyAsync(() -> findColumn(info, column, defValue, pk).join(), getExecutor());
}
@@ -905,7 +927,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
Serializable val = cache.findColumn(column, defValue, node);
if (cache.isFullLoaded() || val != null) return CompletableFuture.completedFuture(val);
}
if (isAysnc()) return findColumn(info, column, defValue, node);
if (isAsync()) return findColumn(info, column, defValue, node);
return CompletableFuture.supplyAsync(() -> findColumn(info, column, defValue, node).join(), getExecutor());
}
@@ -938,7 +960,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
boolean rs = cache.exists(pk);
if (rs || cache.isFullLoaded()) return CompletableFuture.completedFuture(rs);
}
if (isAysnc()) return exists(info, pk);
if (isAsync()) return exists(info, pk);
return CompletableFuture.supplyAsync(() -> exists(info, pk).join(), getExecutor());
}
@@ -977,7 +999,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
boolean rs = cache.exists(node);
if (rs || cache.isFullLoaded()) return CompletableFuture.completedFuture(rs);
}
if (isAysnc()) return exists(info, node);
if (isAsync()) return exists(info, node);
return CompletableFuture.supplyAsync(() -> exists(info, node).join(), getExecutor());
}
@@ -1510,7 +1532,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
@Override
public <T> CompletableFuture<Sheet<T>> querySheetAsync(final Class<T> clazz, final SelectColumn selects, final Flipper flipper, final FilterNode node) {
if (isAysnc()) return querySheet(true, true, clazz, selects, flipper, node);
if (isAsync()) return querySheet(true, true, clazz, selects, flipper, node);
return CompletableFuture.supplyAsync(() -> querySheet(true, true, clazz, selects, flipper, node).join(), getExecutor());
}