This commit is contained in:
Redkale
2018-05-06 16:38:41 +08:00
parent 7116f33a3a
commit 62783d0765
4 changed files with 50 additions and 36 deletions

View File

@@ -212,10 +212,12 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
*
* @param <T> Entity类泛型
* @param values Entity对象
*
* @return 影响的记录条数
*/
@Override
public <T> void insert(@RpcCall(DataCallArrayAttribute.class) T... values) {
if (values.length == 0) return;
public <T> int insert(@RpcCall(DataCallArrayAttribute.class) T... values) {
if (values.length == 0) return 0;
if (values.length > 1) { //检查对象是否都是同一个Entity类
Class clazz = null;
for (T val : values) {
@@ -230,24 +232,24 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
}
final EntityInfo<T> info = loadEntityInfo((Class<T>) values[0].getClass());
if (info.isVirtualEntity()) {
insert(null, info, values);
return;
return insert(null, info, values);
}
Connection conn = createWriteSQLConnection();
try {
insert(conn, info, values);
return insert(conn, info, values);
} finally {
closeSQLConnection(conn);
}
}
@Override
public <T> CompletableFuture<Void> insertAsync(@RpcCall(DataCallArrayAttribute.class) T... values) {
return CompletableFuture.runAsync(() -> insert(values), getExecutor()).whenComplete(futureCompleteConsumer);
public <T> CompletableFuture<Integer> insertAsync(@RpcCall(DataCallArrayAttribute.class) T... values) {
return CompletableFuture.supplyAsync(() -> insert(values), getExecutor()).whenComplete(futureCompleteConsumer);
}
protected <T> void insert(final Connection conn, final EntityInfo<T> info, T... values) {
if (values.length == 0) return;
protected <T> int insert(final Connection conn, final EntityInfo<T> info, T... values) {
if (values.length == 0) return 0;
int c = -1;
try {
if (!info.isVirtualEntity()) {
final String sql = info.getInsertPrepareSQL(values[0]);
@@ -295,7 +297,12 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
}
prestmt.close();
prestmt = createInsertPreparedStatement(conn, sql, info, values);
prestmt.executeBatch();
int[] cs = prestmt.executeBatch();
int c1 = 0;
for (int cc : cs) {
c1 += cc;
}
c = c1;
}
if (info.autoGenerated) { //由数据库自动生成主键值
ResultSet set = prestmt.getGeneratedKeys();
@@ -336,12 +343,14 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
} //打印结束
}
final EntityCache<T> cache = info.getCache();
int c2 = 0;
if (cache != null) { //更新缓存
for (final T value : values) {
cache.insert(value);
c2 += cache.insert(value);
}
if (cacheListener != null) cacheListener.insertCache(info.getType(), values);
}
return c >= 0 ? c : c2;
} catch (SQLException e) {
throw new RuntimeException(e);
}

View File

@@ -37,8 +37,10 @@ public interface DataSource {
*
* @param <T> 泛型
* @param values Entity对象
*
* @return 影响的记录条数
*/
public <T> void insert(final T... values);
public <T> int insert(final T... values);
/**
* 新增记录, 多对象必须是同一个Entity类且必须在同一张表中 <br>
@@ -48,7 +50,7 @@ public interface DataSource {
*
* @return CompletableFuture
*/
public <T> CompletableFuture<Void> insertAsync(final T... values);
public <T> CompletableFuture<Integer> insertAsync(final T... values);
//-------------------------deleteAsync--------------------------
/**

View File

@@ -49,9 +49,10 @@ public class DataSqlJdbcSource extends DataSqlSource<Connection> {
}
@Override
protected <T> CompletableFuture<Void> insertDB(EntityInfo<T> info, T... values) {
protected <T> CompletableFuture<Integer> insertDB(EntityInfo<T> info, T... values) {
Connection conn = null;
try {
int c = 0;
conn = writePool.poll();
final String sql = info.getInsertPrepareSQL(values[0]);
final Class primaryType = info.getPrimary().type();
@@ -98,7 +99,12 @@ public class DataSqlJdbcSource extends DataSqlSource<Connection> {
}
prestmt.close();
prestmt = createInsertPreparedStatement(conn, sql, info, values);
prestmt.executeBatch();
int[] cs = prestmt.executeBatch();
int c1 = 0;
for (int cc : cs) {
c1 += cc;
}
c = c1;
}
if (info.autoGenerated) { //由数据库自动生成主键值
ResultSet set = prestmt.getGeneratedKeys();
@@ -137,7 +143,7 @@ public class DataSqlJdbcSource extends DataSqlSource<Connection> {
logger.finest(info.getType().getSimpleName() + " insert sql=" + sb.toString().replaceAll("(\r|\n)", "\\n"));
}
} //打印结束
return CompletableFuture.completedFuture(null);
return CompletableFuture.completedFuture(c);
} catch (SQLException e) {
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);

View File

@@ -113,7 +113,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
protected abstract PoolSource<DBChannel> createPoolSource(DataSource source, String rwtype, Properties prop);
//插入纪录
protected abstract <T> CompletableFuture<Void> insertDB(final EntityInfo<T> info, T... values);
protected abstract <T> CompletableFuture<Integer> insertDB(final EntityInfo<T> info, T... values);
//删除记录
protected abstract <T> CompletableFuture<Integer> deleteDB(final EntityInfo<T> info, Flipper flipper, final String sql);
@@ -197,10 +197,6 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
return EntityInfo.load(clazz, this.cacheForbidden, this.readPool.props, this, fullloader);
}
protected CompletableFuture<Void> completeVoidFuture() {
return isAsync() ? CompletableFuture.completedFuture(null) : null;
}
/**
* 将entity的对象全部加载到Cache中去如果clazz没有被@javax.persistence.Cacheable注解则不做任何事
*
@@ -241,17 +237,16 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
*
* @param <T> Entity类泛型
* @param values Entity对象
*
* @return 影响的记录条数
*/
@Override
public <T> void insert(@RpcCall(DataCallArrayAttribute.class) T... values) {
if (values.length == 0) return;
public <T> int insert(@RpcCall(DataCallArrayAttribute.class) T... values) {
if (values.length == 0) return 0;
final EntityInfo<T> info = loadEntityInfo((Class<T>) values[0].getClass());
checkEntity("insert", false, values);
if (info.isVirtualEntity()) {
insertCache(info, values);
return;
}
insertDB(info, values).whenComplete((rs, t) -> {
if (info.isVirtualEntity()) return insertCache(info, values);
return insertDB(info, values).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
@@ -261,13 +256,13 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
}
@Override
public <T> CompletableFuture<Void> insertAsync(@RpcCall(DataCallArrayAttribute.class) T... values) {
if (values.length == 0) return completeVoidFuture();
public <T> CompletableFuture<Integer> insertAsync(@RpcCall(DataCallArrayAttribute.class) T... values) {
if (values.length == 0) return CompletableFuture.completedFuture(0);
CompletableFuture future = checkEntity("insert", true, values);
if (future != null) return future;
final EntityInfo<T> info = loadEntityInfo((Class<T>) values[0].getClass());
if (info.isVirtualEntity()) {
return CompletableFuture.runAsync(() -> insertCache(info, values), getExecutor());
return CompletableFuture.supplyAsync(() -> insertCache(info, values), getExecutor());
}
if (isAsync()) return insertDB(info, values).whenComplete((rs, t) -> {
if (t != null) {
@@ -276,22 +271,24 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
insertCache(info, values);
}
});
return CompletableFuture.runAsync(() -> insertDB(info, values).whenComplete((rs, t) -> {
return CompletableFuture.supplyAsync(() -> insertDB(info, values).join(), getExecutor()).whenComplete((rs, t) -> {
if (t != null) {
futureCompleteConsumer.accept(rs, t);
} else {
insertCache(info, values);
}
}), getExecutor());
});
}
protected <T> void insertCache(final EntityInfo<T> info, T... values) {
protected <T> int insertCache(final EntityInfo<T> info, T... values) {
final EntityCache<T> cache = info.getCache();
if (cache == null) return;
if (cache == null) return 0;
int c = 0;
for (final T value : values) {
cache.insert(value);
c += cache.insert(value);
}
if (cacheListener != null) cacheListener.insertCache(info.getType(), values);
return c;
}
@Override