diff --git a/src/com/wentch/redkale/net/http/WebSocket.java b/src/com/wentch/redkale/net/http/WebSocket.java index 836d68e1b..f1d4528e3 100644 --- a/src/com/wentch/redkale/net/http/WebSocket.java +++ b/src/com/wentch/redkale/net/http/WebSocket.java @@ -34,6 +34,26 @@ import java.util.concurrent.ConcurrentHashMap; */ public abstract class WebSocket { + //消息不合法 + public static final int RETCODE_SEND_ILLPACKET = 1 << 1; //2 + + //ws已经关闭 + public static final int RETCODE_WSOCKET_CLOSED = 1 << 2; //4 + + //socket的buffer不合法 + public static final int RETCODE_ILLEGALBUFFER = 1 << 3; //8 + + //ws发送消息异常 + public static final int RETCODE_SENDEXCEPTION = 1 << 4; //16 + + public static final int RETCODE_ENGINE_NULL = 1 << 5; //32 + + public static final int RETCODE_NODESERVICE_NULL = 1 << 6; //64 + + public static final int RETCODE_GROUP_EMPTY = 1 << 7; //128 + + public static final int RETCODE_WSOFFLINE = 1 << 8; //256 + WebSocketRunner runner; WebSocketEngine engine; @@ -56,9 +76,11 @@ public abstract class WebSocket { * 发送消息体, 包含二进制/文本 *

* @param packet + * @return */ - public final void send(WebSocketPacket packet) { - if (this.runner != null) this.runner.sendMessage(packet); + public final int send(WebSocketPacket packet) { + if (this.runner != null) return this.runner.sendMessage(packet); + return RETCODE_WSOCKET_CLOSED; } /** @@ -72,9 +94,10 @@ public abstract class WebSocket { * 发送单一的文本消息 *

* @param text 不可为空 + * @return */ - public final void send(String text) { - send(text, true); + public final int send(String text) { + return send(text, true); } /** @@ -82,18 +105,20 @@ public abstract class WebSocket { *

* @param text 不可为空 * @param last 是否最后一条 + * @return */ - public final void send(String text, boolean last) { - send(new WebSocketPacket(text, last)); + public final int send(String text, boolean last) { + return send(new WebSocketPacket(text, last)); } /** * 发送单一的二进制消息 *

* @param data + * @return */ - public final void send(byte[] data) { - send(data, true); + public final int send(byte[] data) { + return send(data, true); } /** @@ -101,9 +126,10 @@ public abstract class WebSocket { *

* @param data 不可为空 * @param last 是否最后一条 + * @return */ - public final void send(byte[] data, boolean last) { - send(new WebSocketPacket(data, last)); + public final int send(byte[] data, boolean last) { + return send(new WebSocketPacket(data, last)); } /** @@ -111,9 +137,10 @@ public abstract class WebSocket { *

* @param message 不可为空, 只能是String或者byte[] * @param last 是否最后一条 + * @return */ - public final void send(Serializable message, boolean last) { - send(new WebSocketPacket(message, last)); + public final int send(Serializable message, boolean last) { + return send(new WebSocketPacket(message, last)); } //---------------------------------------------------------------- @@ -210,12 +237,12 @@ public abstract class WebSocket { } private int sendMessage(Serializable groupid, boolean recent, String text, boolean last) { - if (node == null) return WebSocketNode.RETCODE_NODESERVICE_NULL; + if (node == null) return RETCODE_NODESERVICE_NULL; return node.sendMessage(groupid, recent, text, last); } private int sendMessage(Serializable groupid, boolean recent, byte[] data, boolean last) { - if (node == null) return WebSocketNode.RETCODE_NODESERVICE_NULL; + if (node == null) return RETCODE_NODESERVICE_NULL; return node.sendMessage(groupid, recent, data, last); } diff --git a/src/com/wentch/redkale/net/http/WebSocketGroup.java b/src/com/wentch/redkale/net/http/WebSocketGroup.java index 9d508998c..ffdd1d143 100644 --- a/src/com/wentch/redkale/net/http/WebSocketGroup.java +++ b/src/com/wentch/redkale/net/http/WebSocketGroup.java @@ -76,24 +76,29 @@ public final class WebSocketGroup { attributes.put(name, value); } - public final void send(boolean recent, Serializable message, boolean last) { + public final int send(boolean recent, Serializable message, boolean last) { if (recent) { - recentWebSocket.send(message, last); + return recentWebSocket.send(message, last); } else { - list.forEach(x -> x.send(message, last)); + return sendEach(message, last); } } - public final void sendEach(Serializable message, boolean last) { - list.forEach(x -> x.send(message, last)); + public final int sendEach(Serializable message, boolean last) { + int rs = 0; + for (WebSocket s : list) { + rs |= s.send(message, last); + } + return rs; } - public final void sendRecent(Serializable message, boolean last) { - recentWebSocket.send(message, last); + public final int sendRecent(Serializable message, boolean last) { + return recentWebSocket.send(message, last); } @Override public String toString() { return "{groupid: " + groupid + ", list.size: " + (list == null ? -1 : list.size()) + "}"; } + } diff --git a/src/com/wentch/redkale/net/http/WebSocketNode.java b/src/com/wentch/redkale/net/http/WebSocketNode.java index 4437661bd..04205f11f 100644 --- a/src/com/wentch/redkale/net/http/WebSocketNode.java +++ b/src/com/wentch/redkale/net/http/WebSocketNode.java @@ -5,6 +5,7 @@ */ package com.wentch.redkale.net.http; +import static com.wentch.redkale.net.http.WebSocket.*; import com.wentch.redkale.net.sncp.*; import com.wentch.redkale.util.*; import java.io.*; @@ -20,14 +21,6 @@ import javax.annotation.*; */ public abstract class WebSocketNode { - public static final int RETCODE_ENGINE_NULL = 5001; - - public static final int RETCODE_NODESERVICE_NULL = 5002; - - public static final int RETCODE_GROUP_EMPTY = 5005; - - public static final int RETCODE_WSOFFLINE = 5011; - protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); protected final boolean finest = logger.isLoggable(Level.FINEST); @@ -39,7 +32,7 @@ public abstract class WebSocketNode { protected WebSocketNode remoteNode; //存放所有用户分布在节点上的队列信息,Set 为 sncpnode 的集合 - protected final ConcurrentHashMap> dataNodes = new ConcurrentHashMap(); + protected final ConcurrentHashMap> dataNodes = new ConcurrentHashMap(); //存放所有用户分布在节点上的队列信息,Set 为 engineid 的集合 protected final ConcurrentHashMap> localNodes = new ConcurrentHashMap(); @@ -56,7 +49,7 @@ public abstract class WebSocketNode { @Override public void run() { try { - Map> map = remoteNode.getDataNodes(); + Map> map = remoteNode.getDataNodes(); if (map != null) dataNodes.putAll(map); } catch (Exception e) { logger.log(Level.INFO, WebSocketNode.class.getSimpleName() + "(" + localSncpAddress + ") not load data nodes ", e); @@ -75,7 +68,7 @@ public abstract class WebSocketNode { }); } - public Map> getDataNodes() { + public Map> getDataNodes() { return dataNodes; } @@ -125,16 +118,24 @@ public abstract class WebSocketNode { rscode = RETCODE_GROUP_EMPTY; break; } - group.send(recent, message, last); + rscode = group.send(recent, message, last); } } } if ((recent && rscode == 0) || remoteNode == null) return rscode; - Set addrs = dataNodes.get(groupid); - if (addrs != null && !addrs.isEmpty()) { //对方连接在远程节点 - for (InetSocketAddress addr : addrs) { - if (!addr.equals(localSncpAddress)) { - remoteNode.sendMessage(addr, groupid, recent, message, last); + LinkedHashSet addrs = dataNodes.get(groupid); + if (addrs != null && !addrs.isEmpty()) { //对方连接在远程节点 + if (recent) { + InetSocketAddress one = null; + for (InetSocketAddress addr : addrs) { + one = addr; + } + rscode = remoteNode.sendMessage(one, groupid, recent, message, last); + } else { + for (InetSocketAddress addr : addrs) { + if (!addr.equals(localSncpAddress)) { + rscode |= remoteNode.sendMessage(addr, groupid, recent, message, last); + } } } } else { diff --git a/src/com/wentch/redkale/net/http/WebSocketRunner.java b/src/com/wentch/redkale/net/http/WebSocketRunner.java index f5f66b62d..69e7aed2f 100644 --- a/src/com/wentch/redkale/net/http/WebSocketRunner.java +++ b/src/com/wentch/redkale/net/http/WebSocketRunner.java @@ -7,6 +7,7 @@ package com.wentch.redkale.net.http; import com.wentch.redkale.net.AsyncConnection; import com.wentch.redkale.net.Context; +import static com.wentch.redkale.net.http.WebSocket.*; import com.wentch.redkale.net.http.WebSocketPacket.PacketType; import java.nio.ByteBuffer; import java.nio.channels.*; @@ -147,18 +148,18 @@ public class WebSocketRunner implements Runnable { } } - public void sendMessage(WebSocketPacket packet) { - if (packet == null || closed) return; - + public int sendMessage(WebSocketPacket packet) { + if (packet == null) return RETCODE_SEND_ILLPACKET; + if (closed) return RETCODE_WSOCKET_CLOSED; final boolean debug = this.coder.debugable; //System.out.println("推送消息"); final byte[] bytes = coder.encode(packet); if (debug) context.getLogger().log(Level.FINEST, "send web socket message's length = " + bytes.length); if (writing.getAndSet(true)) { queue.add(bytes); - return; + return 0; } - if (writeBuffer == null) return; + if (writeBuffer == null) return RETCODE_ILLEGALBUFFER; ByteBuffer sendBuffer = null; if (bytes.length <= writeBuffer.capacity()) { writeBuffer.clear(); @@ -211,9 +212,11 @@ public class WebSocketRunner implements Runnable { } } }); + return 0; } catch (Exception t) { writing.set(false); context.getLogger().log(Level.FINE, "WebSocket sendMessage abort, force to close channel", t); + return RETCODE_SENDEXCEPTION; } } diff --git a/src/com/wentch/redkale/service/WebSocketNodeService.java b/src/com/wentch/redkale/service/WebSocketNodeService.java index 8f4a67687..94b7c3e7f 100644 --- a/src/com/wentch/redkale/service/WebSocketNodeService.java +++ b/src/com/wentch/redkale/service/WebSocketNodeService.java @@ -6,12 +6,12 @@ package com.wentch.redkale.service; import com.wentch.redkale.net.http.*; +import static com.wentch.redkale.net.http.WebSocket.*; import com.wentch.redkale.net.sncp.*; import com.wentch.redkale.util.*; import java.io.*; import java.net.*; import java.util.*; -import java.util.concurrent.*; /** * @@ -53,9 +53,9 @@ public class WebSocketNodeService extends WebSocketNode implements Service { @Override @MultiRun public void connect(Serializable groupid, InetSocketAddress addr) { - Set addrs = dataNodes.get(groupid); + LinkedHashSet addrs = dataNodes.get(groupid); if (addrs == null) { - addrs = new CopyOnWriteArraySet<>(); + addrs = new LinkedHashSet<>(); dataNodes.put(groupid, addrs); } addrs.add(addr); @@ -67,7 +67,9 @@ public class WebSocketNodeService extends WebSocketNode implements Service { public void disconnect(Serializable groupid, InetSocketAddress addr) { Set addrs = dataNodes.get(groupid); if (addrs == null) return; - addrs.remove(addr); + synchronized (addrs) { + addrs.remove(addr); + } if (addrs.isEmpty()) dataNodes.remove(groupid); if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " disconnect from " + addr); }