This commit is contained in:
Redkale
2017-05-22 14:41:03 +08:00
parent 95a2b752af
commit 52d559ea4a
2 changed files with 38 additions and 43 deletions

View File

@@ -58,7 +58,7 @@ public abstract class WebSocketNode {
this.localEngine.getWebSocketGroups().forEach(g -> disconnect(g.getGroupid())); this.localEngine.getWebSocketGroups().forEach(g -> disconnect(g.getGroupid()));
} }
protected abstract CompletableFuture<List<String>> getOnlineRemoteAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid); protected abstract CompletableFuture<List<String>> getWebSocketAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid);
protected abstract CompletableFuture<Integer> sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid, boolean recent, Object message, boolean last); protected abstract CompletableFuture<Integer> sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid, boolean recent, Object message, boolean last);
@@ -79,17 +79,18 @@ public abstract class WebSocketNode {
//-------------------------------------------------------------------------------- //--------------------------------------------------------------------------------
/** /**
* 获取目标地址 * 获取目标地址 <br>
* 该方法只能被内部调用
* *
* @param targetAddress * @param targetAddress
* @param groupid * @param groupid
* *
* @return * @return
*/ */
protected CompletableFuture<List<String>> remoteOnlineRemoteAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid) { protected CompletableFuture<List<String>> remoteWebSocketAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid) {
if (remoteNode == null) return CompletableFuture.completedFuture(null); if (remoteNode == null) return CompletableFuture.completedFuture(null);
try { try {
return remoteNode.getOnlineRemoteAddresses(targetAddress, groupid); return remoteNode.getWebSocketAddresses(targetAddress, groupid);
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.WARNING, "remote " + targetAddress + " websocket getOnlineRemoteAddresses error", e); logger.log(Level.WARNING, "remote " + targetAddress + " websocket getOnlineRemoteAddresses error", e);
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
@@ -104,7 +105,7 @@ public abstract class WebSocketNode {
* *
* @return 地址列表 * @return 地址列表
*/ */
public CompletableFuture<Collection<InetSocketAddress>> getOnlineNodes(final Serializable groupid) { public CompletableFuture<Collection<InetSocketAddress>> getSncpAddresses(final Serializable groupid) {
if (this.sncpAddressNodes != null) return this.sncpAddressNodes.getCollectionAsync(groupid); if (this.sncpAddressNodes != null) return this.sncpAddressNodes.getCollectionAsync(groupid);
List<InetSocketAddress> rs = new ArrayList<>(); List<InetSocketAddress> rs = new ArrayList<>();
if (this.localSncpAddress != null) rs.add(this.localSncpAddress); if (this.localSncpAddress != null) rs.add(this.localSncpAddress);
@@ -121,9 +122,9 @@ public abstract class WebSocketNode {
* @return 地址集合 * @return 地址集合
*/ */
//异步待优化 //异步待优化
public CompletableFuture<Map<InetSocketAddress, List<String>>> getOnlineRemoteAddress(final Serializable groupid) { public CompletableFuture<Map<InetSocketAddress, List<String>>> getSncpNodeWebSocketAddresses(final Serializable groupid) {
final CompletableFuture<Map<InetSocketAddress, List<String>>> rs = new CompletableFuture<>(); final CompletableFuture<Map<InetSocketAddress, List<String>>> rs = new CompletableFuture<>();
CompletableFuture<Collection<InetSocketAddress>> nodesFuture = getOnlineNodes(groupid); CompletableFuture<Collection<InetSocketAddress>> nodesFuture = getSncpAddresses(groupid);
if (nodesFuture == null) return CompletableFuture.completedFuture(null); if (nodesFuture == null) return CompletableFuture.completedFuture(null);
nodesFuture.whenComplete((nodes, e) -> { nodesFuture.whenComplete((nodes, e) -> {
if (e != null) { if (e != null) {
@@ -131,7 +132,7 @@ public abstract class WebSocketNode {
} else { } else {
final Map<InetSocketAddress, List<String>> map = new HashMap(); final Map<InetSocketAddress, List<String>> map = new HashMap();
for (final InetSocketAddress nodeAddress : nodes) { for (final InetSocketAddress nodeAddress : nodes) {
List<String> list = getOnlineRemoteAddresses(nodeAddress, groupid).join(); List<String> list = getWebSocketAddresses(nodeAddress, groupid).join();
if (list == null) list = new ArrayList(); if (list == null) list = new ArrayList();
map.put(nodeAddress, list); map.put(nodeAddress, list);
} }
@@ -173,44 +174,38 @@ public abstract class WebSocketNode {
* *
* @return 为0表示成功 其他值表示异常 * @return 为0表示成功 其他值表示异常
*/ */
//异步待优化 //最近连接发送逻辑还没有理清楚
public final CompletableFuture<Integer> sendMessage(final Serializable groupid, final boolean recent, final Object message, final boolean last) { public final CompletableFuture<Integer> sendMessage(final Serializable groupid, final boolean recent, final Object message, final boolean last) {
return CompletableFuture.supplyAsync(() -> { if (finest) logger.finest("websocket want send message {groupid:" + groupid + ", content:'" + message + "'} from locale node to locale engine");
if (finest) logger.finest("websocket want send message {groupid:" + groupid + ", content:'" + message + "'} from locale node to locale engine"); CompletableFuture<Integer> localFuture = null;
int rscode = RETCODE_GROUP_EMPTY; final WebSocketGroup group = this.localEngine == null ? null : this.localEngine.getWebSocketGroup(groupid);
WebSocketGroup group = this.localEngine == null ? null : this.localEngine.getWebSocketGroup(groupid); if (group != null) localFuture = group.send(recent, message, last);
if (group != null) rscode = group.send(recent, message, last).join(); //临时, 要改 if (recent && localFuture != null) { //已经给最近连接发送的消息
if (recent && rscode == 0) { //已经给最近连接发送的消息 if (finest) logger.finest("websocket want send recent message success");
if (finest) logger.finest("websocket want send recent message success"); return localFuture;
return rscode; }
} if (this.sncpAddressNodes == null || this.remoteNode == null) {
if (this.sncpAddressNodes == null || this.remoteNode == null) { if (finest) logger.finest("websocket remote node is null");
if (finest) logger.finest("websocket remote node is null"); //没有CacheSource就不会有分布式节点
//没有CacheSource就不会有分布式节点 return localFuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localFuture;
return rscode; }
} //远程节点发送消息
//-----------------------发送远程的----------------------------- CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpAddressNodes.getCollectionAsync(groupid);
Collection<InetSocketAddress> addrs = sncpAddressNodes.getCollectionAsync(groupid).join(); CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
if (finest) logger.finest("websocket found groupid:" + groupid + " on " + addrs); if (finest) logger.finest("websocket found groupid:" + groupid + " on " + addrs);
if (addrs != null && !addrs.isEmpty()) { //对方连接在远程节点(包含本地节点)所以正常情况下addrs不会为空。 if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0);
if (recent) { CompletableFuture<Integer> future = null;
InetSocketAddress one = null; for (InetSocketAddress addr : addrs) {
for (InetSocketAddress addr : addrs) { if (addr.equals(localSncpAddress)) continue;
one = addr; if (future == null) {
} future = remoteNode.sendMessage(addr, groupid, recent, message, last);
rscode = remoteNode.sendMessage(one, groupid, recent, message, last).join();
} else { } else {
for (InetSocketAddress addr : addrs) { future = future.thenCombine(remoteNode.sendMessage(addr, groupid, recent, message, last), (a, b) -> a | b);
if (!addr.equals(localSncpAddress)) {
rscode |= remoteNode.sendMessage(addr, groupid, recent, message, last).join();
}
}
} }
} else {
rscode = RETCODE_GROUP_EMPTY;
} }
return rscode; return future == null ? CompletableFuture.completedFuture(0) : future;
}); });
return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b);
} }
} }

View File

@@ -35,13 +35,13 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
} }
@Override @Override
public CompletableFuture<List<String>> getOnlineRemoteAddresses(final @RpcTargetAddress InetSocketAddress targetAddress, final Serializable groupid) { public CompletableFuture<List<String>> getWebSocketAddresses(final @RpcTargetAddress InetSocketAddress targetAddress, final Serializable groupid) {
if (localSncpAddress == null || !localSncpAddress.equals(targetAddress)) return remoteOnlineRemoteAddresses(targetAddress, groupid); if (localSncpAddress == null || !localSncpAddress.equals(targetAddress)) return remoteWebSocketAddresses(targetAddress, groupid);
if (this.localEngine == null) return CompletableFuture.completedFuture(new ArrayList<>()); if (this.localEngine == null) return CompletableFuture.completedFuture(new ArrayList<>());
return CompletableFuture.supplyAsync(() -> { return CompletableFuture.supplyAsync(() -> {
final List<String> rs = new ArrayList<>(); final List<String> rs = new ArrayList<>();
final WebSocketGroup group = this.localEngine.getWebSocketGroup(groupid); final WebSocketGroup group = this.localEngine.getWebSocketGroup(groupid);
if (group != null) group.getWebSockets().forEach(x -> rs.add("ws" + Objects.hashCode(x) + '@' + x.getRemoteAddr())); if (group != null) group.getWebSockets().forEach(x -> rs.add(x.getRemoteAddr()));
return rs; return rs;
}); });
} }