Source增加部分CompletableFuture异常写入日志的功能

This commit is contained in:
Redkale
2018-04-28 09:04:08 +08:00
parent a8e9822381
commit fcd3258a73
2 changed files with 49 additions and 41 deletions

View File

@@ -10,7 +10,7 @@ import java.lang.reflect.Type;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.*;
import java.util.logging.*;
import javax.annotation.Resource;
import org.redkale.convert.json.*;
@@ -63,6 +63,10 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
protected final ConcurrentHashMap<String, CacheEntry<Object>> container = new ConcurrentHashMap<>();
protected final BiConsumer futureCompleteConsumer = (r, t) -> {
if (t != null) logger.log(Level.SEVERE, "CompletableFuture complete error", (Throwable) t);
};
@RpcRemote
protected CacheSource<V> remoteSource;
@@ -432,7 +436,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
@Override
@RpcMultiRun
public CompletableFuture<Void> refreshAsync(final String key, final int expireSeconds) {
return CompletableFuture.runAsync(() -> refresh(key, expireSeconds), getExecutor());
return CompletableFuture.runAsync(() -> refresh(key, expireSeconds), getExecutor()).whenComplete(futureCompleteConsumer);
}
protected void set(CacheEntryType cacheType, String key, Object value) {
@@ -469,19 +473,19 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
@Override
@RpcMultiRun
public CompletableFuture<Void> setAsync(String key, V value) {
return CompletableFuture.runAsync(() -> set(key, value), getExecutor());
return CompletableFuture.runAsync(() -> set(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
@RpcMultiRun
public CompletableFuture<Void> setStringAsync(String key, String value) {
return CompletableFuture.runAsync(() -> setString(key, value), getExecutor());
return CompletableFuture.runAsync(() -> setString(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
@RpcMultiRun
public CompletableFuture<Void> setLongAsync(String key, long value) {
return CompletableFuture.runAsync(() -> setLong(key, value), getExecutor());
return CompletableFuture.runAsync(() -> setLong(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
protected void set(CacheEntryType cacheType, int expireSeconds, String key, Object value) {
@@ -518,19 +522,19 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
@Override
@RpcMultiRun
public CompletableFuture<Void> setAsync(int expireSeconds, String key, V value) {
return CompletableFuture.runAsync(() -> set(expireSeconds, key, value), getExecutor());
return CompletableFuture.runAsync(() -> set(expireSeconds, key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
@RpcMultiRun
public CompletableFuture<Void> setStringAsync(int expireSeconds, String key, String value) {
return CompletableFuture.runAsync(() -> setString(expireSeconds, key, value), getExecutor());
return CompletableFuture.runAsync(() -> setString(expireSeconds, key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
@RpcMultiRun
public CompletableFuture<Void> setLongAsync(int expireSeconds, String key, long value) {
return CompletableFuture.runAsync(() -> setLong(expireSeconds, key, value), getExecutor());
return CompletableFuture.runAsync(() -> setLong(expireSeconds, key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
@@ -545,7 +549,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
@Override
@RpcMultiRun
public CompletableFuture<Void> setExpireSecondsAsync(final String key, final int expireSeconds) {
return CompletableFuture.runAsync(() -> setExpireSeconds(key, expireSeconds), getExecutor());
return CompletableFuture.runAsync(() -> setExpireSeconds(key, expireSeconds), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
@@ -564,7 +568,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
@Override
@RpcMultiRun
public CompletableFuture<Long> incrAsync(final String key) {
return CompletableFuture.supplyAsync(() -> incr(key), getExecutor());
return CompletableFuture.supplyAsync(() -> incr(key), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
@@ -586,7 +590,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
@Override
@RpcMultiRun
public CompletableFuture<Long> incrAsync(final String key, long num) {
return CompletableFuture.supplyAsync(() -> incr(key, num), getExecutor());
return CompletableFuture.supplyAsync(() -> incr(key, num), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
@@ -598,7 +602,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
@Override
@RpcMultiRun
public CompletableFuture<Long> decrAsync(final String key) {
return CompletableFuture.supplyAsync(() -> decr(key), getExecutor());
return CompletableFuture.supplyAsync(() -> decr(key), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
@@ -610,13 +614,13 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
@Override
@RpcMultiRun
public CompletableFuture<Long> decrAsync(final String key, long num) {
return CompletableFuture.supplyAsync(() -> decr(key, num), getExecutor());
return CompletableFuture.supplyAsync(() -> decr(key, num), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
@RpcMultiRun
public CompletableFuture<Void> removeAsync(final String key) {
return CompletableFuture.runAsync(() -> remove(key), getExecutor());
return CompletableFuture.runAsync(() -> remove(key), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
@@ -764,19 +768,19 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
@Override
@RpcMultiRun
public CompletableFuture<Void> appendListItemAsync(final String key, final V value) {
return CompletableFuture.runAsync(() -> appendListItem(key, value), getExecutor());
return CompletableFuture.runAsync(() -> appendListItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
@RpcMultiRun
public CompletableFuture<Void> appendStringListItemAsync(final String key, final String value) {
return CompletableFuture.runAsync(() -> appendStringListItem(key, value), getExecutor());
return CompletableFuture.runAsync(() -> appendStringListItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
@RpcMultiRun
public CompletableFuture<Void> appendLongListItemAsync(final String key, final long value) {
return CompletableFuture.runAsync(() -> appendLongListItem(key, value), getExecutor());
return CompletableFuture.runAsync(() -> appendLongListItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
@@ -809,19 +813,19 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
@Override
@RpcMultiRun
public CompletableFuture<Void> removeListItemAsync(final String key, final V value) {
return CompletableFuture.runAsync(() -> removeListItem(key, value), getExecutor());
return CompletableFuture.runAsync(() -> removeListItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
@RpcMultiRun
public CompletableFuture<Void> removeStringListItemAsync(final String key, final String value) {
return CompletableFuture.runAsync(() -> removeStringListItem(key, value), getExecutor());
return CompletableFuture.runAsync(() -> removeStringListItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
@RpcMultiRun
public CompletableFuture<Void> removeLongListItemAsync(final String key, final long value) {
return CompletableFuture.runAsync(() -> removeLongListItem(key, value), getExecutor());
return CompletableFuture.runAsync(() -> removeLongListItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
protected void appendSetItem(CacheEntryType cacheType, String key, Object value) {
@@ -859,19 +863,19 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
@Override
@RpcMultiRun
public CompletableFuture<Void> appendSetItemAsync(final String key, final V value) {
return CompletableFuture.runAsync(() -> appendSetItem(key, value), getExecutor());
return CompletableFuture.runAsync(() -> appendSetItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
@RpcMultiRun
public CompletableFuture<Void> appendStringSetItemAsync(final String key, final String value) {
return CompletableFuture.runAsync(() -> appendStringSetItem(key, value), getExecutor());
return CompletableFuture.runAsync(() -> appendStringSetItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
@RpcMultiRun
public CompletableFuture<Void> appendLongSetItemAsync(final String key, final long value) {
return CompletableFuture.runAsync(() -> appendLongSetItem(key, value), getExecutor());
return CompletableFuture.runAsync(() -> appendLongSetItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
@@ -904,19 +908,19 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
@Override
@RpcMultiRun
public CompletableFuture<Void> removeSetItemAsync(final String key, final V value) {
return CompletableFuture.runAsync(() -> removeSetItem(key, value), getExecutor());
return CompletableFuture.runAsync(() -> removeSetItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
@RpcMultiRun
public CompletableFuture<Void> removeStringSetItemAsync(final String key, final String value) {
return CompletableFuture.runAsync(() -> removeStringSetItem(key, value), getExecutor());
return CompletableFuture.runAsync(() -> removeStringSetItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
@RpcMultiRun
public CompletableFuture<Void> removeLongSetItemAsync(final String key, final long value) {
return CompletableFuture.runAsync(() -> removeLongSetItem(key, value), getExecutor());
return CompletableFuture.runAsync(() -> removeLongSetItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override

View File

@@ -54,6 +54,10 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
@Resource(name = "$")
protected DataCacheListener cacheListener;
protected final BiConsumer futureCompleteConsumer = (r, t) -> {
if (t != null) logger.log(Level.SEVERE, "CompletableFuture complete error", (Throwable) t);
};
protected final BiFunction<DataSource, Class, List> fullloader = (s, t) -> querySheet(false, false, t, null, null, (FilterNode) null).list(true);
public DataJdbcSource(String unitName, Properties readprop, Properties writeprop) {
@@ -240,7 +244,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
@Override
public <T> CompletableFuture<Void> insertAsync(@RpcCall(DataCallArrayAttribute.class) T... values) {
return CompletableFuture.runAsync(() -> insert(values), getExecutor());
return CompletableFuture.runAsync(() -> insert(values), getExecutor()).whenComplete(futureCompleteConsumer);
}
protected <T> void insert(final Connection conn, final EntityInfo<T> info, T... values) {
@@ -422,7 +426,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
@Override
public <T> CompletableFuture<Integer> deleteAsync(final T... values) {
return CompletableFuture.supplyAsync(() -> delete(values), getExecutor());
return CompletableFuture.supplyAsync(() -> delete(values), getExecutor()).whenComplete(futureCompleteConsumer);
}
protected <T> int delete(final Connection conn, final EntityInfo<T> info, T... values) {
@@ -452,7 +456,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
@Override
public <T> CompletableFuture<Integer> deleteAsync(final Class<T> clazz, final Serializable... ids) {
return CompletableFuture.supplyAsync(() -> delete(clazz, ids), getExecutor());
return CompletableFuture.supplyAsync(() -> delete(clazz, ids), getExecutor()).whenComplete(futureCompleteConsumer);
}
protected <T> int delete(final Connection conn, final EntityInfo<T> info, Serializable... keys) {
@@ -505,7 +509,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
@Override
public <T> CompletableFuture<Integer> deleteAsync(final Class<T> clazz, final FilterNode node) {
return CompletableFuture.supplyAsync(() -> delete(clazz, node), getExecutor());
return CompletableFuture.supplyAsync(() -> delete(clazz, node), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
@@ -524,7 +528,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
@Override
public <T> CompletableFuture<Integer> deleteAsync(final Class<T> clazz, final Flipper flipper, FilterNode node) {
return CompletableFuture.supplyAsync(() -> delete(clazz, flipper, node), getExecutor());
return CompletableFuture.supplyAsync(() -> delete(clazz, flipper, node), getExecutor()).whenComplete(futureCompleteConsumer);
}
protected <T> int delete(final Connection conn, final EntityInfo<T> info, final Flipper flipper, final FilterNode node) {
@@ -615,7 +619,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
@Override
public <T> CompletableFuture<Integer> updateAsync(final T... values) {
return CompletableFuture.supplyAsync(() -> update(values), getExecutor());
return CompletableFuture.supplyAsync(() -> update(values), getExecutor()).whenComplete(futureCompleteConsumer);
}
protected <T> int update(final Connection conn, final EntityInfo<T> info, T... values) {
@@ -715,7 +719,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
@Override
public <T> CompletableFuture<Integer> updateColumnAsync(final Class<T> clazz, final Serializable id, final String column, final Serializable value) {
return CompletableFuture.supplyAsync(() -> updateColumn(clazz, id, column, value), getExecutor());
return CompletableFuture.supplyAsync(() -> updateColumn(clazz, id, column, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
protected <T> int updateColumn(Connection conn, final EntityInfo<T> info, Serializable id, String column, final Serializable value) {
@@ -782,7 +786,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
@Override
public <T> CompletableFuture<Integer> updateColumnAsync(final Class<T> clazz, final String column, final Serializable value, final FilterNode node) {
return CompletableFuture.supplyAsync(() -> updateColumn(clazz, column, value, node), getExecutor());
return CompletableFuture.supplyAsync(() -> updateColumn(clazz, column, value, node), getExecutor()).whenComplete(futureCompleteConsumer);
}
protected <T> int updateColumn(Connection conn, final EntityInfo<T> info, String column, final Serializable value, FilterNode node) {
@@ -864,7 +868,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
@Override
public <T> CompletableFuture<Integer> updateColumnAsync(final Class<T> clazz, final Serializable id, final ColumnValue... values) {
return CompletableFuture.supplyAsync(() -> updateColumn(clazz, id, values), getExecutor());
return CompletableFuture.supplyAsync(() -> updateColumn(clazz, id, values), getExecutor()).whenComplete(futureCompleteConsumer);
}
protected <T> int updateColumn(final Connection conn, final EntityInfo<T> info, final Serializable id, final ColumnValue... values) {
@@ -950,7 +954,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
@Override
public <T> CompletableFuture<Integer> updateColumnAsync(final Class<T> clazz, final FilterNode node, final ColumnValue... values) {
return CompletableFuture.supplyAsync(() -> updateColumn(clazz, node, values), getExecutor());
return CompletableFuture.supplyAsync(() -> updateColumn(clazz, node, values), getExecutor()).whenComplete(futureCompleteConsumer);
}
/**
@@ -980,7 +984,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
@Override
public <T> CompletableFuture<Integer> updateColumnAsync(final Class<T> clazz, final FilterNode node, final Flipper flipper, final ColumnValue... values) {
return CompletableFuture.supplyAsync(() -> updateColumn(clazz, node, flipper, values), getExecutor());
return CompletableFuture.supplyAsync(() -> updateColumn(clazz, node, flipper, values), getExecutor()).whenComplete(futureCompleteConsumer);
}
protected <T> int updateColumn(final Connection conn, final EntityInfo<T> info, final FilterNode node, final Flipper flipper, final ColumnValue... values) {
@@ -1060,7 +1064,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
@Override
public <T> CompletableFuture<Integer> updateColumnAsync(final T bean, final String... columns) {
return CompletableFuture.supplyAsync(() -> updateColumn(bean, columns), getExecutor());
return CompletableFuture.supplyAsync(() -> updateColumn(bean, columns), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
@@ -1070,7 +1074,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
@Override
public <T> CompletableFuture<Integer> updateColumnAsync(final T bean, final FilterNode node, final String... columns) {
return CompletableFuture.supplyAsync(() -> updateColumn(bean, node, columns), getExecutor());
return CompletableFuture.supplyAsync(() -> updateColumn(bean, node, columns), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
@@ -1089,7 +1093,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
@Override
public <T> CompletableFuture<Integer> updateColumnAsync(final T bean, final SelectColumn selects) {
return CompletableFuture.supplyAsync(() -> updateColumn(bean, selects), getExecutor());
return CompletableFuture.supplyAsync(() -> updateColumn(bean, selects), getExecutor()).whenComplete(futureCompleteConsumer);
}
protected <T> int updateColumns(final Connection conn, final EntityInfo<T> info, final T bean, final SelectColumn selects) {
@@ -1165,7 +1169,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
@Override
public <T> CompletableFuture<Integer> updateColumnAsync(final T bean, final FilterNode node, final SelectColumn selects) {
return CompletableFuture.supplyAsync(() -> updateColumn(bean, node, selects), getExecutor());
return CompletableFuture.supplyAsync(() -> updateColumn(bean, node, selects), getExecutor()).whenComplete(futureCompleteConsumer);
}
protected <T> int updateColumns(final Connection conn, final EntityInfo<T> info, final T bean, final FilterNode node, final SelectColumn selects) {