diff --git a/src/org/redkale/source/CacheMemorySource.java b/src/org/redkale/source/CacheMemorySource.java index 999b0f2a8..c203f0bb0 100644 --- a/src/org/redkale/source/CacheMemorySource.java +++ b/src/org/redkale/source/CacheMemorySource.java @@ -10,7 +10,7 @@ import java.lang.reflect.Type; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; +import java.util.function.*; import java.util.logging.*; import javax.annotation.Resource; import org.redkale.convert.json.*; @@ -63,6 +63,10 @@ public class CacheMemorySource extends AbstractService impleme protected final ConcurrentHashMap> container = new ConcurrentHashMap<>(); + protected final BiConsumer futureCompleteConsumer = (r, t) -> { + if (t != null) logger.log(Level.SEVERE, "CompletableFuture complete error", (Throwable) t); + }; + @RpcRemote protected CacheSource remoteSource; @@ -432,7 +436,7 @@ public class CacheMemorySource extends AbstractService impleme @Override @RpcMultiRun public CompletableFuture refreshAsync(final String key, final int expireSeconds) { - return CompletableFuture.runAsync(() -> refresh(key, expireSeconds), getExecutor()); + return CompletableFuture.runAsync(() -> refresh(key, expireSeconds), getExecutor()).whenComplete(futureCompleteConsumer); } protected void set(CacheEntryType cacheType, String key, Object value) { @@ -469,19 +473,19 @@ public class CacheMemorySource extends AbstractService impleme @Override @RpcMultiRun public CompletableFuture setAsync(String key, V value) { - return CompletableFuture.runAsync(() -> set(key, value), getExecutor()); + return CompletableFuture.runAsync(() -> set(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @RpcMultiRun public CompletableFuture setStringAsync(String key, String value) { - return CompletableFuture.runAsync(() -> setString(key, value), getExecutor()); + return CompletableFuture.runAsync(() -> setString(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @RpcMultiRun public CompletableFuture setLongAsync(String key, long value) { - return CompletableFuture.runAsync(() -> setLong(key, value), getExecutor()); + return CompletableFuture.runAsync(() -> setLong(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } protected void set(CacheEntryType cacheType, int expireSeconds, String key, Object value) { @@ -518,19 +522,19 @@ public class CacheMemorySource extends AbstractService impleme @Override @RpcMultiRun public CompletableFuture setAsync(int expireSeconds, String key, V value) { - return CompletableFuture.runAsync(() -> set(expireSeconds, key, value), getExecutor()); + return CompletableFuture.runAsync(() -> set(expireSeconds, key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @RpcMultiRun public CompletableFuture setStringAsync(int expireSeconds, String key, String value) { - return CompletableFuture.runAsync(() -> setString(expireSeconds, key, value), getExecutor()); + return CompletableFuture.runAsync(() -> setString(expireSeconds, key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @RpcMultiRun public CompletableFuture setLongAsync(int expireSeconds, String key, long value) { - return CompletableFuture.runAsync(() -> setLong(expireSeconds, key, value), getExecutor()); + return CompletableFuture.runAsync(() -> setLong(expireSeconds, key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -545,7 +549,7 @@ public class CacheMemorySource extends AbstractService impleme @Override @RpcMultiRun public CompletableFuture setExpireSecondsAsync(final String key, final int expireSeconds) { - return CompletableFuture.runAsync(() -> setExpireSeconds(key, expireSeconds), getExecutor()); + return CompletableFuture.runAsync(() -> setExpireSeconds(key, expireSeconds), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -564,7 +568,7 @@ public class CacheMemorySource extends AbstractService impleme @Override @RpcMultiRun public CompletableFuture incrAsync(final String key) { - return CompletableFuture.supplyAsync(() -> incr(key), getExecutor()); + return CompletableFuture.supplyAsync(() -> incr(key), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -586,7 +590,7 @@ public class CacheMemorySource extends AbstractService impleme @Override @RpcMultiRun public CompletableFuture incrAsync(final String key, long num) { - return CompletableFuture.supplyAsync(() -> incr(key, num), getExecutor()); + return CompletableFuture.supplyAsync(() -> incr(key, num), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -598,7 +602,7 @@ public class CacheMemorySource extends AbstractService impleme @Override @RpcMultiRun public CompletableFuture decrAsync(final String key) { - return CompletableFuture.supplyAsync(() -> decr(key), getExecutor()); + return CompletableFuture.supplyAsync(() -> decr(key), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -610,13 +614,13 @@ public class CacheMemorySource extends AbstractService impleme @Override @RpcMultiRun public CompletableFuture decrAsync(final String key, long num) { - return CompletableFuture.supplyAsync(() -> decr(key, num), getExecutor()); + return CompletableFuture.supplyAsync(() -> decr(key, num), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @RpcMultiRun public CompletableFuture removeAsync(final String key) { - return CompletableFuture.runAsync(() -> remove(key), getExecutor()); + return CompletableFuture.runAsync(() -> remove(key), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -764,19 +768,19 @@ public class CacheMemorySource extends AbstractService impleme @Override @RpcMultiRun public CompletableFuture appendListItemAsync(final String key, final V value) { - return CompletableFuture.runAsync(() -> appendListItem(key, value), getExecutor()); + return CompletableFuture.runAsync(() -> appendListItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @RpcMultiRun public CompletableFuture appendStringListItemAsync(final String key, final String value) { - return CompletableFuture.runAsync(() -> appendStringListItem(key, value), getExecutor()); + return CompletableFuture.runAsync(() -> appendStringListItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @RpcMultiRun public CompletableFuture appendLongListItemAsync(final String key, final long value) { - return CompletableFuture.runAsync(() -> appendLongListItem(key, value), getExecutor()); + return CompletableFuture.runAsync(() -> appendLongListItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -809,19 +813,19 @@ public class CacheMemorySource extends AbstractService impleme @Override @RpcMultiRun public CompletableFuture removeListItemAsync(final String key, final V value) { - return CompletableFuture.runAsync(() -> removeListItem(key, value), getExecutor()); + return CompletableFuture.runAsync(() -> removeListItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @RpcMultiRun public CompletableFuture removeStringListItemAsync(final String key, final String value) { - return CompletableFuture.runAsync(() -> removeStringListItem(key, value), getExecutor()); + return CompletableFuture.runAsync(() -> removeStringListItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @RpcMultiRun public CompletableFuture removeLongListItemAsync(final String key, final long value) { - return CompletableFuture.runAsync(() -> removeLongListItem(key, value), getExecutor()); + return CompletableFuture.runAsync(() -> removeLongListItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } protected void appendSetItem(CacheEntryType cacheType, String key, Object value) { @@ -859,19 +863,19 @@ public class CacheMemorySource extends AbstractService impleme @Override @RpcMultiRun public CompletableFuture appendSetItemAsync(final String key, final V value) { - return CompletableFuture.runAsync(() -> appendSetItem(key, value), getExecutor()); + return CompletableFuture.runAsync(() -> appendSetItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @RpcMultiRun public CompletableFuture appendStringSetItemAsync(final String key, final String value) { - return CompletableFuture.runAsync(() -> appendStringSetItem(key, value), getExecutor()); + return CompletableFuture.runAsync(() -> appendStringSetItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @RpcMultiRun public CompletableFuture appendLongSetItemAsync(final String key, final long value) { - return CompletableFuture.runAsync(() -> appendLongSetItem(key, value), getExecutor()); + return CompletableFuture.runAsync(() -> appendLongSetItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -904,19 +908,19 @@ public class CacheMemorySource extends AbstractService impleme @Override @RpcMultiRun public CompletableFuture removeSetItemAsync(final String key, final V value) { - return CompletableFuture.runAsync(() -> removeSetItem(key, value), getExecutor()); + return CompletableFuture.runAsync(() -> removeSetItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @RpcMultiRun public CompletableFuture removeStringSetItemAsync(final String key, final String value) { - return CompletableFuture.runAsync(() -> removeStringSetItem(key, value), getExecutor()); + return CompletableFuture.runAsync(() -> removeStringSetItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @RpcMultiRun public CompletableFuture removeLongSetItemAsync(final String key, final long value) { - return CompletableFuture.runAsync(() -> removeLongSetItem(key, value), getExecutor()); + return CompletableFuture.runAsync(() -> removeLongSetItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override diff --git a/src/org/redkale/source/DataJdbcSource.java b/src/org/redkale/source/DataJdbcSource.java index 437ee5ac9..e71bb390c 100644 --- a/src/org/redkale/source/DataJdbcSource.java +++ b/src/org/redkale/source/DataJdbcSource.java @@ -54,6 +54,10 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC @Resource(name = "$") protected DataCacheListener cacheListener; + protected final BiConsumer futureCompleteConsumer = (r, t) -> { + if (t != null) logger.log(Level.SEVERE, "CompletableFuture complete error", (Throwable) t); + }; + protected final BiFunction fullloader = (s, t) -> querySheet(false, false, t, null, null, (FilterNode) null).list(true); public DataJdbcSource(String unitName, Properties readprop, Properties writeprop) { @@ -240,7 +244,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC @Override public CompletableFuture insertAsync(@RpcCall(DataCallArrayAttribute.class) T... values) { - return CompletableFuture.runAsync(() -> insert(values), getExecutor()); + return CompletableFuture.runAsync(() -> insert(values), getExecutor()).whenComplete(futureCompleteConsumer); } protected void insert(final Connection conn, final EntityInfo info, T... values) { @@ -422,7 +426,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC @Override public CompletableFuture deleteAsync(final T... values) { - return CompletableFuture.supplyAsync(() -> delete(values), getExecutor()); + return CompletableFuture.supplyAsync(() -> delete(values), getExecutor()).whenComplete(futureCompleteConsumer); } protected int delete(final Connection conn, final EntityInfo info, T... values) { @@ -452,7 +456,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC @Override public CompletableFuture deleteAsync(final Class clazz, final Serializable... ids) { - return CompletableFuture.supplyAsync(() -> delete(clazz, ids), getExecutor()); + return CompletableFuture.supplyAsync(() -> delete(clazz, ids), getExecutor()).whenComplete(futureCompleteConsumer); } protected int delete(final Connection conn, final EntityInfo info, Serializable... keys) { @@ -505,7 +509,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC @Override public CompletableFuture deleteAsync(final Class clazz, final FilterNode node) { - return CompletableFuture.supplyAsync(() -> delete(clazz, node), getExecutor()); + return CompletableFuture.supplyAsync(() -> delete(clazz, node), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -524,7 +528,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC @Override public CompletableFuture deleteAsync(final Class clazz, final Flipper flipper, FilterNode node) { - return CompletableFuture.supplyAsync(() -> delete(clazz, flipper, node), getExecutor()); + return CompletableFuture.supplyAsync(() -> delete(clazz, flipper, node), getExecutor()).whenComplete(futureCompleteConsumer); } protected int delete(final Connection conn, final EntityInfo info, final Flipper flipper, final FilterNode node) { @@ -615,7 +619,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC @Override public CompletableFuture updateAsync(final T... values) { - return CompletableFuture.supplyAsync(() -> update(values), getExecutor()); + return CompletableFuture.supplyAsync(() -> update(values), getExecutor()).whenComplete(futureCompleteConsumer); } protected int update(final Connection conn, final EntityInfo info, T... values) { @@ -715,7 +719,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC @Override public CompletableFuture updateColumnAsync(final Class clazz, final Serializable id, final String column, final Serializable value) { - return CompletableFuture.supplyAsync(() -> updateColumn(clazz, id, column, value), getExecutor()); + return CompletableFuture.supplyAsync(() -> updateColumn(clazz, id, column, value), getExecutor()).whenComplete(futureCompleteConsumer); } protected int updateColumn(Connection conn, final EntityInfo info, Serializable id, String column, final Serializable value) { @@ -782,7 +786,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC @Override public CompletableFuture updateColumnAsync(final Class clazz, final String column, final Serializable value, final FilterNode node) { - return CompletableFuture.supplyAsync(() -> updateColumn(clazz, column, value, node), getExecutor()); + return CompletableFuture.supplyAsync(() -> updateColumn(clazz, column, value, node), getExecutor()).whenComplete(futureCompleteConsumer); } protected int updateColumn(Connection conn, final EntityInfo info, String column, final Serializable value, FilterNode node) { @@ -864,7 +868,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC @Override public CompletableFuture updateColumnAsync(final Class clazz, final Serializable id, final ColumnValue... values) { - return CompletableFuture.supplyAsync(() -> updateColumn(clazz, id, values), getExecutor()); + return CompletableFuture.supplyAsync(() -> updateColumn(clazz, id, values), getExecutor()).whenComplete(futureCompleteConsumer); } protected int updateColumn(final Connection conn, final EntityInfo info, final Serializable id, final ColumnValue... values) { @@ -950,7 +954,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC @Override public CompletableFuture updateColumnAsync(final Class clazz, final FilterNode node, final ColumnValue... values) { - return CompletableFuture.supplyAsync(() -> updateColumn(clazz, node, values), getExecutor()); + return CompletableFuture.supplyAsync(() -> updateColumn(clazz, node, values), getExecutor()).whenComplete(futureCompleteConsumer); } /** @@ -980,7 +984,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC @Override public CompletableFuture updateColumnAsync(final Class clazz, final FilterNode node, final Flipper flipper, final ColumnValue... values) { - return CompletableFuture.supplyAsync(() -> updateColumn(clazz, node, flipper, values), getExecutor()); + return CompletableFuture.supplyAsync(() -> updateColumn(clazz, node, flipper, values), getExecutor()).whenComplete(futureCompleteConsumer); } protected int updateColumn(final Connection conn, final EntityInfo info, final FilterNode node, final Flipper flipper, final ColumnValue... values) { @@ -1060,7 +1064,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC @Override public CompletableFuture updateColumnAsync(final T bean, final String... columns) { - return CompletableFuture.supplyAsync(() -> updateColumn(bean, columns), getExecutor()); + return CompletableFuture.supplyAsync(() -> updateColumn(bean, columns), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -1070,7 +1074,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC @Override public CompletableFuture updateColumnAsync(final T bean, final FilterNode node, final String... columns) { - return CompletableFuture.supplyAsync(() -> updateColumn(bean, node, columns), getExecutor()); + return CompletableFuture.supplyAsync(() -> updateColumn(bean, node, columns), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -1089,7 +1093,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC @Override public CompletableFuture updateColumnAsync(final T bean, final SelectColumn selects) { - return CompletableFuture.supplyAsync(() -> updateColumn(bean, selects), getExecutor()); + return CompletableFuture.supplyAsync(() -> updateColumn(bean, selects), getExecutor()).whenComplete(futureCompleteConsumer); } protected int updateColumns(final Connection conn, final EntityInfo info, final T bean, final SelectColumn selects) { @@ -1165,7 +1169,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC @Override public CompletableFuture updateColumnAsync(final T bean, final FilterNode node, final SelectColumn selects) { - return CompletableFuture.supplyAsync(() -> updateColumn(bean, node, selects), getExecutor()); + return CompletableFuture.supplyAsync(() -> updateColumn(bean, node, selects), getExecutor()).whenComplete(futureCompleteConsumer); } protected int updateColumns(final Connection conn, final EntityInfo info, final T bean, final FilterNode node, final SelectColumn selects) {