This commit is contained in:
Redkale
2018-05-02 20:28:03 +08:00
parent 13bd467152
commit fb3dd6049d

View File

@@ -8,6 +8,7 @@ package org.redkale.source;
import java.io.Serializable;
import java.net.URL;
import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
@@ -276,7 +277,88 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
@Override
public <T> int delete(T... values) {
if (values.length == 0) return -1;
return -1;
if (values.length > 1) { //检查对象是否都是同一个Entity类
Class clazz = null;
for (T val : values) {
if (clazz == null) {
clazz = val.getClass();
continue;
}
if (clazz != val.getClass()) {
throw new RuntimeException("DataSource.delete must the same Class Entity, but diff is " + clazz + " and " + val.getClass());
}
}
}
final Class<T> clazz = (Class<T>) values[0].getClass();
final EntityInfo<T> info = loadEntityInfo(clazz);
final Attribute primary = info.getPrimary();
Serializable[] ids = new Serializable[values.length];
int i = 0;
for (final T value : values) {
ids[i++] = (Serializable) primary.get(value);
}
return delete(clazz, ids);
}
@Override
public <T> CompletableFuture<Integer> deleteAsync(final T... values) {
if (values.length == 0) return CompletableFuture.completedFuture(-1);
if (values.length > 1) { //检查对象是否都是同一个Entity类
Class clazz = null;
CompletableFuture<Integer> future = new CompletableFuture<>();
for (T val : values) {
if (clazz == null) {
clazz = val.getClass();
continue;
}
if (clazz != val.getClass()) {
future.completeExceptionally(new SQLException("DataSource.delete must the same Class Entity, but diff is " + clazz + " and " + val.getClass()));
return future;
}
}
}
final Class<T> clazz = (Class<T>) values[0].getClass();
final EntityInfo<T> info = loadEntityInfo(clazz);
final Attribute primary = info.getPrimary();
Serializable[] ids = new Serializable[values.length];
int i = 0;
for (final T value : values) {
ids[i++] = (Serializable) primary.get(value);
}
return deleteAsync(clazz, ids);
}
@Override
public <T> int delete(Class<T> clazz, Serializable... ids) {
if (ids.length == 0) return -1;
final EntityInfo<T> info = loadEntityInfo(clazz);
if (info.isVirtualEntity()) {
return delete(null, info, ids).join();
} else {
if (isAysnc()) {
return writePool.pollAsync().thenCompose(conn -> delete(conn, info, ids)).join();
} else {
return delete(writePool.poll(), info, ids).join();
}
}
}
@Override
public <T> CompletableFuture<Integer> deleteAsync(final Class<T> clazz, final Serializable... ids) {
final EntityInfo<T> info = loadEntityInfo(clazz);
if (info.isVirtualEntity()) {
if (isAysnc()) {
return delete(null, info, ids);
} else {
return CompletableFuture.supplyAsync(() -> delete(null, info, ids).join(), getExecutor());
}
} else {
if (isAysnc()) {
return writePool.pollAsync().thenCompose(conn -> delete(conn, info, ids));
} else {
return CompletableFuture.supplyAsync(() -> delete(writePool.poll(), info, ids).join(), getExecutor());
}
}
}
protected <T> CompletableFuture<Integer> delete(final DBChannel conn, final EntityInfo<T> info, Serializable... keys) {
@@ -300,6 +382,75 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
return CompletableFuture.completedFuture(-1);
}
@Override
public <T> int delete(Class<T> clazz, FilterNode node) {
return delete(clazz, (Flipper) null, node);
}
@Override
public <T> CompletableFuture<Integer> deleteAsync(final Class<T> clazz, final FilterNode node) {
return deleteAsync(clazz, (Flipper) null, node);
}
@Override
public <T> int delete(Class<T> clazz, final Flipper flipper, FilterNode node) {
final EntityInfo<T> info = loadEntityInfo(clazz);
if (info.isVirtualEntity()) {
return delete(null, info, flipper, node).join();
} else {
if (isAysnc()) {
return writePool.pollAsync().thenCompose(conn -> delete(conn, info, flipper, node)).join();
} else {
return delete(writePool.poll(), info, flipper, node).join();
}
}
}
@Override
public <T> CompletableFuture<Integer> deleteAsync(final Class<T> clazz, final Flipper flipper, FilterNode node) {
final EntityInfo<T> info = loadEntityInfo(clazz);
if (info.isVirtualEntity()) {
if (isAysnc()) {
return delete(null, info, flipper, node);
} else {
return CompletableFuture.supplyAsync(() -> delete(null, info, flipper, node).join(), getExecutor());
}
} else {
if (isAysnc()) {
return writePool.pollAsync().thenCompose(conn -> delete(conn, info, flipper, node));
} else {
return CompletableFuture.supplyAsync(() -> delete(writePool.poll(), info, flipper, node).join(), getExecutor());
}
}
}
protected <T> CompletableFuture<Integer> delete(final DBChannel conn, final EntityInfo<T> info, final Flipper flipper, FilterNode node) {
if (info.isVirtualEntity()) {
return CompletableFuture.completedFuture(-1);
}
//待实现
if (isAysnc()) { //异步模式
} else {
}
return CompletableFuture.completedFuture(-1);
}
@Override
public <T> int deleteCache(Class<T> clazz, Serializable... ids) {
if (ids.length == 0) return 0;
final EntityInfo<T> info = loadEntityInfo(clazz);
final EntityCache<T> cache = info.getCache();
if (cache == null) return -1;
int c = 0;
for (Serializable id : ids) {
c += cache.delete(id);
}
return c;
}
//----------------------------- update -----------------------------
//----------------------------- find -----------------------------
/**
* 根据主键获取对象