This commit is contained in:
Redkale
2019-11-07 09:55:16 +08:00
parent 0e14b60f12
commit 52eb7dbc0c

View File

@@ -466,7 +466,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
public <T> int delete(Class<T> clazz, final Flipper flipper, FilterNode node) {
final EntityInfo<T> info = loadEntityInfo(clazz);
if (isOnlyCache(info)) return deleteCache(info, -1, flipper, node);
return DataSqlSource.this.deleteCompose(info, flipper, node).whenComplete((rs, t) -> {
return this.deleteCompose(info, flipper, node).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
@@ -481,14 +481,14 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (isOnlyCache(info)) {
return CompletableFuture.supplyAsync(() -> deleteCache(info, -1, flipper, node), getExecutor());
}
if (isAsync()) return DataSqlSource.this.deleteCompose(info, flipper, node).whenComplete((rs, t) -> {
if (isAsync()) return this.deleteCompose(info, flipper, node).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
deleteCache(info, rs, flipper, node);
}
});
return CompletableFuture.supplyAsync(() -> DataSqlSource.this.deleteCompose(info, flipper, node).join(), getExecutor()).whenComplete((rs, t) -> {
return CompletableFuture.supplyAsync(() -> this.deleteCompose(info, flipper, node).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
@@ -542,7 +542,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
public <T> int clearTable(Class<T> clazz, FilterNode node) {
final EntityInfo<T> info = loadEntityInfo(clazz);
if (isOnlyCache(info)) return clearTableCache(info, node);
return DataSqlSource.this.clearTableCompose(info, node).whenComplete((rs, t) -> {
return this.clearTableCompose(info, node).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
@@ -562,14 +562,14 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (isOnlyCache(info)) {
return CompletableFuture.supplyAsync(() -> clearTableCache(info, node), getExecutor());
}
if (isAsync()) return DataSqlSource.this.clearTableCompose(info, node).whenComplete((rs, t) -> {
if (isAsync()) return this.clearTableCompose(info, node).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
clearTableCache(info, node);
}
});
return CompletableFuture.supplyAsync(() -> DataSqlSource.this.clearTableCompose(info, node).join(), getExecutor()).whenComplete((rs, t) -> {
return CompletableFuture.supplyAsync(() -> this.clearTableCompose(info, node).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
@@ -594,7 +594,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
public <T> int dropTable(Class<T> clazz, FilterNode node) {
final EntityInfo<T> info = loadEntityInfo(clazz);
if (isOnlyCache(info)) return dropTableCache(info, node);
return DataSqlSource.this.dropTableCompose(info, node).whenComplete((rs, t) -> {
return this.dropTableCompose(info, node).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
@@ -614,14 +614,14 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (isOnlyCache(info)) {
return CompletableFuture.supplyAsync(() -> dropTableCache(info, node), getExecutor());
}
if (isAsync()) return DataSqlSource.this.dropTableCompose(info, node).whenComplete((rs, t) -> {
if (isAsync()) return this.dropTableCompose(info, node).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
dropTableCache(info, node);
}
});
return CompletableFuture.supplyAsync(() -> DataSqlSource.this.dropTableCompose(info, node).join(), getExecutor()).whenComplete((rs, t) -> {
return CompletableFuture.supplyAsync(() -> this.dropTableCompose(info, node).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
@@ -812,7 +812,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
public <T> int updateColumn(Class<T> clazz, String column, Serializable colval, FilterNode node) {
final EntityInfo<T> info = loadEntityInfo(clazz);
if (isOnlyCache(info)) return updateCache(info, -1, column, colval, node);
return DataSqlSource.this.updateColumnCompose(info, column, colval, node).whenComplete((rs, t) -> {
return this.updateColumnCompose(info, column, colval, node).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
@@ -827,14 +827,14 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (isOnlyCache(info)) {
return CompletableFuture.supplyAsync(() -> updateCache(info, -1, column, colval, node), getExecutor());
}
if (isAsync()) return DataSqlSource.this.updateColumnCompose(info, column, colval, node).whenComplete((rs, t) -> {
if (isAsync()) return this.updateColumnCompose(info, column, colval, node).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
updateCache(info, rs, column, colval, node);
}
});
return CompletableFuture.supplyAsync(() -> DataSqlSource.this.updateColumnCompose(info, column, colval, node).join(), getExecutor()).whenComplete((rs, t) -> {
return CompletableFuture.supplyAsync(() -> this.updateColumnCompose(info, column, colval, node).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
@@ -886,7 +886,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (values == null || values.length < 1) return -1;
final EntityInfo<T> info = loadEntityInfo(clazz);
if (isOnlyCache(info)) return updateCache(info, -1, pk, values);
return DataSqlSource.this.updateColumnCompose(info, pk, values).whenComplete((rs, t) -> {
return this.updateColumnCompose(info, pk, values).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
@@ -902,14 +902,14 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (isOnlyCache(info)) {
return CompletableFuture.supplyAsync(() -> updateCache(info, -1, pk, values), getExecutor());
}
if (isAsync()) return DataSqlSource.this.updateColumnCompose(info, pk, values).whenComplete((rs, t) -> {
if (isAsync()) return this.updateColumnCompose(info, pk, values).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
updateCache(info, rs, pk, values);
}
});
return CompletableFuture.supplyAsync(() -> DataSqlSource.this.updateColumnCompose(info, pk, values).join(), getExecutor()).whenComplete((rs, t) -> {
return CompletableFuture.supplyAsync(() -> this.updateColumnCompose(info, pk, values).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
@@ -967,7 +967,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (values == null || values.length < 1) return -1;
final EntityInfo<T> info = loadEntityInfo(clazz);
if (isOnlyCache(info)) return updateCache(info, -1, node, flipper, values);
return DataSqlSource.this.updateColumnCompose(info, node, flipper, values).whenComplete((rs, t) -> {
return this.updateColumnCompose(info, node, flipper, values).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
@@ -983,14 +983,14 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (isOnlyCache(info)) {
return CompletableFuture.supplyAsync(() -> updateCache(info, -1, node, flipper, values), getExecutor());
}
if (isAsync()) return DataSqlSource.this.updateColumnCompose(info, node, flipper, values).whenComplete((rs, t) -> {
if (isAsync()) return this.updateColumnCompose(info, node, flipper, values).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
updateCache(info, rs, node, flipper, values);
}
});
return CompletableFuture.supplyAsync(() -> DataSqlSource.this.updateColumnCompose(info, node, flipper, values).join(), getExecutor()).whenComplete((rs, t) -> {
return CompletableFuture.supplyAsync(() -> this.updateColumnCompose(info, node, flipper, values).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
@@ -1063,7 +1063,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
Class<T> clazz = (Class) entity.getClass();
final EntityInfo<T> info = loadEntityInfo(clazz);
if (isOnlyCache(info)) return updateCache(info, -1, false, entity, null, selects);
return DataSqlSource.this.updateColumnCompose(info, false, entity, null, selects).whenComplete((rs, t) -> {
return this.updateColumnCompose(info, false, entity, null, selects).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
@@ -1080,14 +1080,14 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (isOnlyCache(info)) {
return CompletableFuture.supplyAsync(() -> updateCache(info, -1, false, entity, null, selects), getExecutor());
}
if (isAsync()) return DataSqlSource.this.updateColumnCompose(info, false, entity, null, selects).whenComplete((rs, t) -> {
if (isAsync()) return this.updateColumnCompose(info, false, entity, null, selects).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
updateCache(info, rs, false, entity, null, selects);
}
});
return CompletableFuture.supplyAsync(() -> DataSqlSource.this.updateColumnCompose(info, false, entity, null, selects).join(), getExecutor()).whenComplete((rs, t) -> {
return CompletableFuture.supplyAsync(() -> this.updateColumnCompose(info, false, entity, null, selects).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
@@ -1102,7 +1102,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
Class<T> clazz = (Class) entity.getClass();
final EntityInfo<T> info = loadEntityInfo(clazz);
if (isOnlyCache(info)) return updateCache(info, -1, true, entity, node, selects);
return DataSqlSource.this.updateColumnCompose(info, true, entity, node, selects).whenComplete((rs, t) -> {
return this.updateColumnCompose(info, true, entity, node, selects).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
@@ -1119,14 +1119,14 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (isOnlyCache(info)) {
return CompletableFuture.supplyAsync(() -> updateCache(info, -1, true, entity, node, selects), getExecutor());
}
if (isAsync()) return DataSqlSource.this.updateColumnCompose(info, true, entity, node, selects).whenComplete((rs, t) -> {
if (isAsync()) return this.updateColumnCompose(info, true, entity, node, selects).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
updateCache(info, rs, true, entity, node, selects);
}
});
return CompletableFuture.supplyAsync(() -> DataSqlSource.this.updateColumnCompose(info, true, entity, node, selects).join(), getExecutor()).whenComplete((rs, t) -> {
return CompletableFuture.supplyAsync(() -> this.updateColumnCompose(info, true, entity, node, selects).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
@@ -1601,7 +1601,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
final EntityInfo<T> info = loadEntityInfo(clazz);
final EntityCache<T> cache = info.getCache();
if (cache != null && cache.isFullLoaded() && (node == null || node.isCacheUseable(this))) return cache.find(selects, node);
return DataSqlSource.this.findCompose(info, selects, node).join();
return this.findCompose(info, selects, node).join();
}
@Override
@@ -1611,8 +1611,8 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (cache != null && cache.isFullLoaded() && (node == null || node.isCacheUseable(this))) {
return CompletableFuture.completedFuture(cache.find(selects, node));
}
if (isAsync()) return DataSqlSource.this.findCompose(info, selects, node);
return CompletableFuture.supplyAsync(() -> DataSqlSource.this.findCompose(info, selects, node).join(), getExecutor());
if (isAsync()) return this.findCompose(info, selects, node);
return CompletableFuture.supplyAsync(() -> this.findCompose(info, selects, node).join(), getExecutor());
}
protected <T> CompletableFuture<T> findCompose(final EntityInfo<T> info, final SelectColumn selects, final FilterNode node) {
@@ -1701,7 +1701,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
Serializable val = cache.findColumn(column, defValue, node);
if (cache.isFullLoaded() || val != null) return val;
}
return DataSqlSource.this.findColumnCompose(info, column, defValue, node).join();
return this.findColumnCompose(info, column, defValue, node).join();
}
@Override
@@ -1712,8 +1712,8 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
Serializable val = cache.findColumn(column, defValue, node);
if (cache.isFullLoaded() || val != null) return CompletableFuture.completedFuture(val);
}
if (isAsync()) return DataSqlSource.this.findColumnCompose(info, column, defValue, node);
return CompletableFuture.supplyAsync(() -> DataSqlSource.this.findColumnCompose(info, column, defValue, node).join(), getExecutor());
if (isAsync()) return this.findColumnCompose(info, column, defValue, node);
return CompletableFuture.supplyAsync(() -> this.findColumnCompose(info, column, defValue, node).join(), getExecutor());
}
protected <T> CompletableFuture<Serializable> findColumnCompose(final EntityInfo<T> info, String column, final Serializable defValue, final FilterNode node) {
@@ -1773,7 +1773,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
boolean rs = cache.exists(node);
if (rs || cache.isFullLoaded()) return rs;
}
return DataSqlSource.this.existsCompose(info, node).join();
return this.existsCompose(info, node).join();
}
@Override
@@ -1784,8 +1784,8 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
boolean rs = cache.exists(node);
if (rs || cache.isFullLoaded()) return CompletableFuture.completedFuture(rs);
}
if (isAsync()) return DataSqlSource.this.existsCompose(info, node);
return CompletableFuture.supplyAsync(() -> DataSqlSource.this.existsCompose(info, node).join(), getExecutor());
if (isAsync()) return this.existsCompose(info, node);
return CompletableFuture.supplyAsync(() -> this.existsCompose(info, node).join(), getExecutor());
}
protected <T> CompletableFuture<Boolean> existsCompose(final EntityInfo<T> info, FilterNode node) {