From 62783d07651eb50ae9e3472046764259da420743 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Sun, 6 May 2018 16:38:41 +0800 Subject: [PATCH] --- src/org/redkale/source/DataJdbcSource.java | 31 ++++++++++------ src/org/redkale/source/DataSource.java | 6 ++- src/org/redkale/source/DataSqlJdbcSource.java | 12 ++++-- src/org/redkale/source/DataSqlSource.java | 37 +++++++++---------- 4 files changed, 50 insertions(+), 36 deletions(-) diff --git a/src/org/redkale/source/DataJdbcSource.java b/src/org/redkale/source/DataJdbcSource.java index cc24a1180..05ee9630a 100644 --- a/src/org/redkale/source/DataJdbcSource.java +++ b/src/org/redkale/source/DataJdbcSource.java @@ -212,10 +212,12 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC * * @param Entity类泛型 * @param values Entity对象 + * + * @return 影响的记录条数 */ @Override - public void insert(@RpcCall(DataCallArrayAttribute.class) T... values) { - if (values.length == 0) return; + public int insert(@RpcCall(DataCallArrayAttribute.class) T... values) { + if (values.length == 0) return 0; if (values.length > 1) { //检查对象是否都是同一个Entity类 Class clazz = null; for (T val : values) { @@ -230,24 +232,24 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC } final EntityInfo info = loadEntityInfo((Class) values[0].getClass()); if (info.isVirtualEntity()) { - insert(null, info, values); - return; + return insert(null, info, values); } Connection conn = createWriteSQLConnection(); try { - insert(conn, info, values); + return insert(conn, info, values); } finally { closeSQLConnection(conn); } } @Override - public CompletableFuture insertAsync(@RpcCall(DataCallArrayAttribute.class) T... values) { - return CompletableFuture.runAsync(() -> insert(values), getExecutor()).whenComplete(futureCompleteConsumer); + public CompletableFuture insertAsync(@RpcCall(DataCallArrayAttribute.class) T... values) { + return CompletableFuture.supplyAsync(() -> insert(values), getExecutor()).whenComplete(futureCompleteConsumer); } - protected void insert(final Connection conn, final EntityInfo info, T... values) { - if (values.length == 0) return; + protected int insert(final Connection conn, final EntityInfo info, T... values) { + if (values.length == 0) return 0; + int c = -1; try { if (!info.isVirtualEntity()) { final String sql = info.getInsertPrepareSQL(values[0]); @@ -295,7 +297,12 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC } prestmt.close(); prestmt = createInsertPreparedStatement(conn, sql, info, values); - prestmt.executeBatch(); + int[] cs = prestmt.executeBatch(); + int c1 = 0; + for (int cc : cs) { + c1 += cc; + } + c = c1; } if (info.autoGenerated) { //由数据库自动生成主键值 ResultSet set = prestmt.getGeneratedKeys(); @@ -336,12 +343,14 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC } //打印结束 } final EntityCache cache = info.getCache(); + int c2 = 0; if (cache != null) { //更新缓存 for (final T value : values) { - cache.insert(value); + c2 += cache.insert(value); } if (cacheListener != null) cacheListener.insertCache(info.getType(), values); } + return c >= 0 ? c : c2; } catch (SQLException e) { throw new RuntimeException(e); } diff --git a/src/org/redkale/source/DataSource.java b/src/org/redkale/source/DataSource.java index 817ae16bb..94c0f0af3 100644 --- a/src/org/redkale/source/DataSource.java +++ b/src/org/redkale/source/DataSource.java @@ -37,8 +37,10 @@ public interface DataSource { * * @param 泛型 * @param values Entity对象 + * + * @return 影响的记录条数 */ - public void insert(final T... values); + public int insert(final T... values); /** * 新增记录, 多对象必须是同一个Entity类且必须在同一张表中
@@ -48,7 +50,7 @@ public interface DataSource { * * @return CompletableFuture */ - public CompletableFuture insertAsync(final T... values); + public CompletableFuture insertAsync(final T... values); //-------------------------deleteAsync-------------------------- /** diff --git a/src/org/redkale/source/DataSqlJdbcSource.java b/src/org/redkale/source/DataSqlJdbcSource.java index 6aada943f..ea3224d14 100644 --- a/src/org/redkale/source/DataSqlJdbcSource.java +++ b/src/org/redkale/source/DataSqlJdbcSource.java @@ -49,9 +49,10 @@ public class DataSqlJdbcSource extends DataSqlSource { } @Override - protected CompletableFuture insertDB(EntityInfo info, T... values) { + protected CompletableFuture insertDB(EntityInfo info, T... values) { Connection conn = null; try { + int c = 0; conn = writePool.poll(); final String sql = info.getInsertPrepareSQL(values[0]); final Class primaryType = info.getPrimary().type(); @@ -98,7 +99,12 @@ public class DataSqlJdbcSource extends DataSqlSource { } prestmt.close(); prestmt = createInsertPreparedStatement(conn, sql, info, values); - prestmt.executeBatch(); + int[] cs = prestmt.executeBatch(); + int c1 = 0; + for (int cc : cs) { + c1 += cc; + } + c = c1; } if (info.autoGenerated) { //由数据库自动生成主键值 ResultSet set = prestmt.getGeneratedKeys(); @@ -137,7 +143,7 @@ public class DataSqlJdbcSource extends DataSqlSource { logger.finest(info.getType().getSimpleName() + " insert sql=" + sb.toString().replaceAll("(\r|\n)", "\\n")); } } //打印结束 - return CompletableFuture.completedFuture(null); + return CompletableFuture.completedFuture(c); } catch (SQLException e) { CompletableFuture future = new CompletableFuture(); future.completeExceptionally(e); diff --git a/src/org/redkale/source/DataSqlSource.java b/src/org/redkale/source/DataSqlSource.java index 22dbb2588..7a09bc904 100644 --- a/src/org/redkale/source/DataSqlSource.java +++ b/src/org/redkale/source/DataSqlSource.java @@ -113,7 +113,7 @@ public abstract class DataSqlSource extends AbstractService implement protected abstract PoolSource createPoolSource(DataSource source, String rwtype, Properties prop); //插入纪录 - protected abstract CompletableFuture insertDB(final EntityInfo info, T... values); + protected abstract CompletableFuture insertDB(final EntityInfo info, T... values); //删除记录 protected abstract CompletableFuture deleteDB(final EntityInfo info, Flipper flipper, final String sql); @@ -197,10 +197,6 @@ public abstract class DataSqlSource extends AbstractService implement return EntityInfo.load(clazz, this.cacheForbidden, this.readPool.props, this, fullloader); } - protected CompletableFuture completeVoidFuture() { - return isAsync() ? CompletableFuture.completedFuture(null) : null; - } - /** * 将entity的对象全部加载到Cache中去,如果clazz没有被@javax.persistence.Cacheable注解则不做任何事 * @@ -241,17 +237,16 @@ public abstract class DataSqlSource extends AbstractService implement * * @param Entity类泛型 * @param values Entity对象 + * + * @return 影响的记录条数 */ @Override - public void insert(@RpcCall(DataCallArrayAttribute.class) T... values) { - if (values.length == 0) return; + public int insert(@RpcCall(DataCallArrayAttribute.class) T... values) { + if (values.length == 0) return 0; final EntityInfo info = loadEntityInfo((Class) values[0].getClass()); checkEntity("insert", false, values); - if (info.isVirtualEntity()) { - insertCache(info, values); - return; - } - insertDB(info, values).whenComplete((rs, t) -> { + if (info.isVirtualEntity()) return insertCache(info, values); + return insertDB(info, values).whenComplete((rs, t) -> { if (t != null) { futureCompleteConsumer.accept(rs, t); } else { @@ -261,13 +256,13 @@ public abstract class DataSqlSource extends AbstractService implement } @Override - public CompletableFuture insertAsync(@RpcCall(DataCallArrayAttribute.class) T... values) { - if (values.length == 0) return completeVoidFuture(); + public CompletableFuture insertAsync(@RpcCall(DataCallArrayAttribute.class) T... values) { + if (values.length == 0) return CompletableFuture.completedFuture(0); CompletableFuture future = checkEntity("insert", true, values); if (future != null) return future; final EntityInfo info = loadEntityInfo((Class) values[0].getClass()); if (info.isVirtualEntity()) { - return CompletableFuture.runAsync(() -> insertCache(info, values), getExecutor()); + return CompletableFuture.supplyAsync(() -> insertCache(info, values), getExecutor()); } if (isAsync()) return insertDB(info, values).whenComplete((rs, t) -> { if (t != null) { @@ -276,22 +271,24 @@ public abstract class DataSqlSource extends AbstractService implement insertCache(info, values); } }); - return CompletableFuture.runAsync(() -> insertDB(info, values).whenComplete((rs, t) -> { + return CompletableFuture.supplyAsync(() -> insertDB(info, values).join(), getExecutor()).whenComplete((rs, t) -> { if (t != null) { futureCompleteConsumer.accept(rs, t); } else { insertCache(info, values); } - }), getExecutor()); + }); } - protected void insertCache(final EntityInfo info, T... values) { + protected int insertCache(final EntityInfo info, T... values) { final EntityCache cache = info.getCache(); - if (cache == null) return; + if (cache == null) return 0; + int c = 0; for (final T value : values) { - cache.insert(value); + c += cache.insert(value); } if (cacheListener != null) cacheListener.insertCache(info.getType(), values); + return c; } @Override