This commit is contained in:
Redkale
2017-05-22 16:00:25 +08:00
parent 9e6840f5cb
commit a29cc94f32
2 changed files with 30 additions and 21 deletions

View File

@@ -130,19 +130,8 @@ public abstract class WebSocketNode {
CompletableFuture<Map<InetSocketAddress, List<String>>> future = null; CompletableFuture<Map<InetSocketAddress, List<String>>> future = null;
for (final InetSocketAddress nodeAddress : addrs) { for (final InetSocketAddress nodeAddress : addrs) {
CompletableFuture<Map<InetSocketAddress, List<String>>> mapFuture = getWebSocketAddresses(nodeAddress, groupid) CompletableFuture<Map<InetSocketAddress, List<String>>> mapFuture = getWebSocketAddresses(nodeAddress, groupid)
.thenCompose((List<String> list) -> { .thenCompose((List<String> list) -> CompletableFuture.completedFuture(Utility.ofMap(nodeAddress, list)));
Map<InetSocketAddress, List<String>> map = new HashMap<>(); future = future == null ? mapFuture : future.thenCombine(mapFuture, (a, b) -> Utility.merge(a, b));
map.put(nodeAddress, list);
return CompletableFuture.completedFuture(map);
});
if (future == null) {
future = mapFuture;
} else {
future = future.thenCombine(mapFuture, (a, b) -> {
a.putAll(b);
return a;
});
}
} }
return future == null ? CompletableFuture.completedFuture(new HashMap<>()) : future; return future == null ? CompletableFuture.completedFuture(new HashMap<>()) : future;
}); });
@@ -203,11 +192,8 @@ public abstract class WebSocketNode {
CompletableFuture<Integer> future = null; CompletableFuture<Integer> future = null;
for (InetSocketAddress addr : addrs) { for (InetSocketAddress addr : addrs) {
if (addr == null || addr.equals(localSncpAddress)) continue; if (addr == null || addr.equals(localSncpAddress)) continue;
if (future == null) { future = future == null ? remoteNode.sendMessage(addr, groupid, recent, message, last)
future = remoteNode.sendMessage(addr, groupid, recent, message, last); : future.thenCombine(remoteNode.sendMessage(addr, groupid, recent, message, last), (a, b) -> a | b);
} else {
future = future.thenCombine(remoteNode.sendMessage(addr, groupid, recent, message, last), (a, b) -> a | b);
}
} }
return future == null ? CompletableFuture.completedFuture(0) : future; return future == null ? CompletableFuture.completedFuture(0) : future;
}); });

View File

@@ -108,15 +108,38 @@ public final class Utility {
* 将多个key:value对应值组合成一个Mapitems长度必须是偶数, 参数个数若是奇数的话,最后一个会被忽略 * 将多个key:value对应值组合成一个Mapitems长度必须是偶数, 参数个数若是奇数的话,最后一个会被忽略
* 类似 JDK9中的 Map.of 方法 * 类似 JDK9中的 Map.of 方法
* *
* @param <K> 泛型
* @param <V> 泛型
* @param items 键值对 * @param items 键值对
* *
* @return Map * @return Map
*/ */
public static Map<Object, Object> ofMap(Object... items) { public static <K, V> Map<K, V> ofMap(Object... items) {
HashMap<Object, Object> map = new LinkedHashMap<>(); HashMap<K, V> map = new LinkedHashMap<>();
int len = items.length / 2; int len = items.length / 2;
for (int i = 0; i < len; i++) { for (int i = 0; i < len; i++) {
map.put(items[i * 2], items[i * 2 + 1]); map.put((K) items[i * 2], (V) items[i * 2 + 1]);
}
return map;
}
/**
* 将多个Map合并成一个Map
*
* @param <K> 泛型
* @param <V> 泛型
* @param maps Map
*
* @return Map
*/
public static <K, V> Map<K, V> merge(Map<K, V>... maps) {
Map<K, V> map = null;
for (Map<K, V> m : maps) {
if (map == null) {
map = m;
} else if (m != null) {
map.putAll(m);
}
} }
return map; return map;
} }