diff --git a/src/com/wentch/redkale/net/http/WebSocket.java b/src/com/wentch/redkale/net/http/WebSocket.java index fa583c92f..d988dd643 100644 --- a/src/com/wentch/redkale/net/http/WebSocket.java +++ b/src/com/wentch/redkale/net/http/WebSocket.java @@ -5,6 +5,7 @@ */ package com.wentch.redkale.net.http; +import java.io.*; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -22,7 +23,7 @@ public abstract class WebSocket { String sessionid; - long groupid; + Serializable groupid; private final Map attributes = new ConcurrentHashMap<>(); @@ -42,10 +43,18 @@ public abstract class WebSocket { send(text, true); } - private void send(String text, boolean last) { + public final void send(String text, boolean last) { send(new WebSocketPacket(text, last)); } + public final void send(byte[] data) { + send(data, true); + } + + public final void send(byte[] data, boolean last) { + send(new WebSocketPacket(data, last)); + } + @SuppressWarnings("unchecked") public final T getAttribute(String name) { return (T) attributes.get(name); @@ -59,7 +68,7 @@ public abstract class WebSocket { attributes.put(name, value); } - public final long getGroupid() { + public final Serializable getGroupid() { return groupid; } @@ -77,7 +86,6 @@ public abstract class WebSocket { } //------------------------------------------------------------------- - /** * 返回sessionid, null表示连接不合法或异常 * @@ -89,12 +97,12 @@ public abstract class WebSocket { } /** - * 返回GroupID, 负数表示异常 + * 返回GroupID, null表示异常 * * @return */ - public long createGroupid() { - return 0; + public Serializable createGroupid() { + return null; } /** diff --git a/src/com/wentch/redkale/net/http/WebSocketEngine.java b/src/com/wentch/redkale/net/http/WebSocketEngine.java index 3a498f3c3..7430d6000 100644 --- a/src/com/wentch/redkale/net/http/WebSocketEngine.java +++ b/src/com/wentch/redkale/net/http/WebSocketEngine.java @@ -5,6 +5,7 @@ */ package com.wentch.redkale.net.http; +import java.io.*; import java.util.*; import java.util.concurrent.*; @@ -16,7 +17,7 @@ public final class WebSocketEngine { private final long engineid = Math.abs(System.nanoTime()); - private final Map containers = new ConcurrentHashMap<>(); + private final Map containers = new ConcurrentHashMap<>(); WebSocketEngine() { } @@ -37,7 +38,7 @@ public final class WebSocketEngine { if (group.isEmpty()) containers.remove(socket.groupid); } - public WebSocketGroup getWebSocketGroup(long groupid) { + public WebSocketGroup getWebSocketGroup(Serializable groupid) { return containers.get(groupid); } diff --git a/src/com/wentch/redkale/net/http/WebSocketGroup.java b/src/com/wentch/redkale/net/http/WebSocketGroup.java index e1ad5609b..88ab2f6ac 100644 --- a/src/com/wentch/redkale/net/http/WebSocketGroup.java +++ b/src/com/wentch/redkale/net/http/WebSocketGroup.java @@ -5,6 +5,7 @@ */ package com.wentch.redkale.net.http; +import java.io.*; import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Stream; @@ -15,17 +16,17 @@ import java.util.stream.Stream; */ public final class WebSocketGroup { - private final long groupid; + private final Serializable groupid; private final List list = new CopyOnWriteArrayList<>(); private final Map attributes = new HashMap<>(); - WebSocketGroup(long groupid) { + WebSocketGroup(Serializable groupid) { this.groupid = groupid; } - public long getGroupid() { + public Serializable getGroupid() { return groupid; } diff --git a/src/com/wentch/redkale/net/http/WebSocketServlet.java b/src/com/wentch/redkale/net/http/WebSocketServlet.java index d82e754c2..a8a4b4a50 100644 --- a/src/com/wentch/redkale/net/http/WebSocketServlet.java +++ b/src/com/wentch/redkale/net/http/WebSocketServlet.java @@ -40,6 +40,10 @@ public abstract class WebSocketServlet extends HttpServlet { engine.close(); } + protected long getEngineid() { + return engine.getEngineid(); + } + @Override public final void execute(HttpRequest request, HttpResponse response) throws IOException { final boolean debug = logger.isLoggable(Level.FINER); @@ -80,8 +84,8 @@ public abstract class WebSocketServlet extends HttpServlet { @Override public void completed(Integer result, Void attachment) { HttpContext context = response.getContext(); - long groupid = webSocket.createGroupid(); - if (groupid < 0) { + Serializable groupid = webSocket.createGroupid(); + if (groupid == null) { if (debug) logger.finer("WebSocket connect abort, Create groupid abort"); response.finish(true); return; diff --git a/src/com/wentch/redkale/service/WebSocketNodeService.java b/src/com/wentch/redkale/service/WebSocketNodeService.java new file mode 100644 index 000000000..a9b750da7 --- /dev/null +++ b/src/com/wentch/redkale/service/WebSocketNodeService.java @@ -0,0 +1,211 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package com.wentch.redkale.service; + +import com.wentch.redkale.net.http.*; +import com.wentch.redkale.util.*; +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.logging.*; +import javax.annotation.*; + +/** + * + * @author zhangjx + */ +public class WebSocketNodeService implements Service { + + protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); + + protected final boolean fine = logger.isLoggable(Level.FINE); + + protected final boolean finest = logger.isLoggable(Level.FINEST); + + @Resource(name = "APP_NODE") + protected String localNodeName = ""; + + @Resource + protected HashMap nodemaps; + + //用户分布在节点上的队列信息,只保存远程节点的用户分布信息 + protected final ConcurrentHashMap> usernodes = new ConcurrentHashMap(); + + protected final ConcurrentHashMap engines = new ConcurrentHashMap(); + + @Override + public void init(AnyValue conf) { + if (fine) logger.fine(this.localNodeName + ", " + this + ", " + nodemaps); + if (this.nodemaps == null || this.nodemaps.isEmpty()) return; + new Thread() { + { + setDaemon(true); + } + + @Override + public void run() { + usernodes.putAll(queryNodes()); + } + }.start(); + } + + public final void addWebSocketEngine(WebSocketEngine engine) { + engines.put(engine.getEngineid(), engine); + } + + @RemoteOn + public Map> queryNodes() { + Map> rs = new HashMap<>(); + this.nodemaps.forEach((x, y) -> { + if (!rs.isEmpty()) return; + try { + rs.putAll(y.queryNodes()); + } catch (Exception e) { + logger.log(Level.WARNING, this.getClass().getSimpleName() + " query error (" + x + ")", e); + } + }); + return rs; + } + + public final Map> onQueryNodes() { + Map> rs = new HashMap<>(); + rs.putAll(this.usernodes); + return rs; + } + + public void connectSelf(Serializable userid) { + connect(this.localNodeName, userid); + } + + public void disconnectSelf(Serializable userid) { + if (fine) logger.fine("LocalNode " + localNodeName + " disconnect " + userid); + disconnect(this.localNodeName, userid); + } + + @RemoteOn + public void connect(String nodeid, Serializable userid) { + onConnect(nodeid, userid); + if (this.nodemaps == null) return; + this.nodemaps.forEach((x, y) -> { + try { + if (fine) logger.fine("LocalNode " + localNodeName + " send RemoteNode " + x + " to connect (" + nodeid + "," + userid + ")"); + y.connect(nodeid, userid); + } catch (Exception e) { + logger.log(Level.WARNING, this.getClass().getSimpleName() + " connect error (" + x + ", [" + nodeid + "," + userid + "])", e); + } + }); + } + + public final void onConnect(String nodeid, Serializable userid) { + if (fine) logger.fine("LocalNode " + localNodeName + " receive onConnect (" + nodeid + "," + userid + ")"); + Set userNodelist = usernodes.get(userid); + if (userNodelist == null) { + userNodelist = new CopyOnWriteArraySet<>(); + usernodes.put(userid, userNodelist); + } + userNodelist.add(nodeid); + } + + @RemoteOn + public void disconnect(String nodeid, Serializable userid) { + onDisconnect(nodeid, userid); + if (this.nodemaps == null) return; + this.nodemaps.forEach((x, y) -> { + try { + if (fine) logger.fine("LocalNode " + localNodeName + " send RemoteNode " + x + " to disconnect (" + nodeid + "," + userid + ")"); + y.disconnect(nodeid, userid); + } catch (Exception e) { + logger.log(Level.WARNING, this.getClass().getSimpleName() + " disconnect error (" + x + ", [" + nodeid + "," + userid + "])", e); + } + }); + } + + public final void onDisconnect(String nodeid, Serializable userid) { + if (fine) logger.fine("LocalNode " + localNodeName + " receive onDisconnect (" + nodeid + "," + userid + ")"); + Set userNodelist = usernodes.get(userid); + if (userNodelist == null) return; + userNodelist.remove(nodeid); + if (userNodelist.isEmpty()) usernodes.remove(userid); + } + + @RemoteOn + public boolean send(long engineid, Serializable groupid, String text) { + return send(engineid, groupid, text, true); + } + + public final boolean onSend(long engineid, Serializable groupid, String text) { + return onSend(engineid, groupid, text, true); + } + + @RemoteOn + public boolean send(long engineid, Serializable groupid, String text, boolean last) { + return send0(engineid, groupid, text, last); + } + + public final boolean onSend(long engineid, Serializable groupid, String text, boolean last) { + return onSend0(engineid, groupid, text, last); + } + + @RemoteOn + public boolean send(long engineid, Serializable groupid, byte[] data) { + return send(engineid, groupid, data, true); + } + + public final boolean onSend(long engineid, Serializable groupid, byte[] data) { + return onSend(engineid, groupid, data, true); + } + + @RemoteOn + public boolean send(long engineid, Serializable groupid, byte[] data, boolean last) { + return send0(engineid, groupid, data, last); + } + + public final boolean onSend(long engineid, Serializable groupid, byte[] data, boolean last) { + return onSend0(engineid, groupid, data, last); + } + + private boolean send0(long engineid, Serializable groupid, Serializable text, boolean last) { + final Set nodes = usernodes.get(groupid); + if (nodes == null) return false; + boolean rs = false; + if (nodes.contains(this.localNodeName)) rs |= onSend0(engineid, groupid, text, last); + if (nodemaps == null) return rs; + this.nodemaps.forEach((x, y) -> { + if (nodes.contains(x)) { + try { + y.send0(engineid, groupid, text, last); + if (fine) logger.fine("LocalNode " + localNodeName + " send RemoteNode " + x + " to send message (" + engineid + "," + groupid + "," + text + ")"); + } catch (Exception e) { + onDisconnect(x, groupid); + logger.log(Level.WARNING, this.getClass().getSimpleName() + " send message error (" + x + ", [" + engineid + "," + groupid + "," + text + "])", e); + } + } + }); + return true; + } + + /** + * 消息接受者存在WebSocket并发送成功返回true, 否则返回false + * + * @param engineid + * @param groupid 接收方 + * @param text + * @return + */ + private boolean onSend0(long engineid, Serializable groupid, Serializable text, boolean last) { + WebSocketEngine webSocketEngine = engines.get(engineid); + if (webSocketEngine == null) return false; + WebSocketGroup group = webSocketEngine.getWebSocketGroup(groupid); + if (group == null || group.isEmpty()) return false; + if (text != null && text.getClass() == byte[].class) { + group.getWebSockets().forEach(x -> x.send((byte[]) text, last)); + } else { + group.getWebSockets().forEach(x -> x.send(text.toString(), last)); + } + return true; + } + +}