diff --git a/src/com/wentch/redkale/boot/Application.java b/src/com/wentch/redkale/boot/Application.java index 959b35f6f..3a74c54d9 100644 --- a/src/com/wentch/redkale/boot/Application.java +++ b/src/com/wentch/redkale/boot/Application.java @@ -39,25 +39,25 @@ import org.w3c.dom.*; */ public final class Application { - //进程启动的时间, 类型: long + //当前进程启动的时间, 类型: long public static final String RESNAME_TIME = "APP_TIME"; - //本地进程的根目录, 类型:String + //当前进程的根目录, 类型:String public static final String RESNAME_HOME = "APP_HOME"; - //本地节点的名称, 类型:String + //当前进程节点的名称, 类型:String public static final String RESNAME_NODE = "APP_NODE"; - //本地节点的所属组, 类型:String、Map>、Map>> + //当前进程节点的所属组, 类型:String、Map>、Map>> public static final String RESNAME_GROUP = "APP_GROUP"; - //本地节点的所属组所有节点名, 类型:Set 、List>包含自身节点名 + //当前进程节点的所属组所有节点名, 类型:Set 、List>包含自身节点名 public static final String RESNAME_INGROUP = "APP_INGROUP"; - //除本地节点的所属组外其他所有组的所有节点名, 类型:Map>、Map>> + //除当前进程节点的所属组外其他所有组的所有节点名, 类型:Map>、Map>> public static final String RESNAME_OUTGROUP = "APP_OUTGROUP"; - //本地节点的IP地址, 类型:InetAddress、String + //当前进程节点的IP地址, 类型:InetAddress、String public static final String RESNAME_ADDR = "APP_ADDR"; //application.xml 文件中resources节点的内容, 类型: AnyValue diff --git a/src/com/wentch/redkale/net/http/WebSocket.java b/src/com/wentch/redkale/net/http/WebSocket.java index d988dd643..f4e5bb340 100644 --- a/src/com/wentch/redkale/net/http/WebSocket.java +++ b/src/com/wentch/redkale/net/http/WebSocket.java @@ -5,6 +5,7 @@ */ package com.wentch.redkale.net.http; +import com.wentch.redkale.service.*; import java.io.*; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -21,6 +22,8 @@ public abstract class WebSocket { WebSocketGroup group; + WebSocketNodeService nodeService; + String sessionid; Serializable groupid; @@ -55,6 +58,33 @@ public abstract class WebSocket { send(new WebSocketPacket(data, last)); } + //---------------------------------------------------------------- + public final boolean sendMessage(Serializable groupid, String text) { + return sendMessage(groupid, text, true); + } + + public final boolean sendMessage(Serializable groupid, byte[] data) { + return sendMessage(groupid, data, true); + } + + public final boolean sendMessage(Serializable groupid, String text, boolean last) { + if (nodeService == null) return false; + if (groupid == this.groupid) { + return nodeService.onSend(this.engine.getEngineid(), groupid, text, last); + } else { + return nodeService.send(this.engine.getEngineid(), groupid, text, last); + } + } + + public final boolean sendMessage(Serializable groupid, byte[] data, boolean last) { + if (nodeService == null) return false; + if (groupid == this.groupid) { + return nodeService.onSend(this.engine.getEngineid(), groupid, data, last); + } else { + return nodeService.send(this.engine.getEngineid(), groupid, data, last); + } + } + @SuppressWarnings("unchecked") public final T getAttribute(String name) { return (T) attributes.get(name); diff --git a/src/com/wentch/redkale/net/http/WebSocketEngine.java b/src/com/wentch/redkale/net/http/WebSocketEngine.java index 7430d6000..cbf433be8 100644 --- a/src/com/wentch/redkale/net/http/WebSocketEngine.java +++ b/src/com/wentch/redkale/net/http/WebSocketEngine.java @@ -15,13 +15,17 @@ import java.util.concurrent.*; */ public final class WebSocketEngine { - private final long engineid = Math.abs(System.nanoTime()); + private String engineid; private final Map containers = new ConcurrentHashMap<>(); WebSocketEngine() { } + void setEngineid(String engineid) { + this.engineid = engineid; + } + void add(WebSocket socket) { WebSocketGroup group = containers.get(socket.groupid); if (group == null) { @@ -45,7 +49,7 @@ public final class WebSocketEngine { void close() { } - public long getEngineid() { + public String getEngineid() { return engineid; } } diff --git a/src/com/wentch/redkale/net/http/WebSocketRunner.java b/src/com/wentch/redkale/net/http/WebSocketRunner.java index c76fcb5c5..1c0484267 100644 --- a/src/com/wentch/redkale/net/http/WebSocketRunner.java +++ b/src/com/wentch/redkale/net/http/WebSocketRunner.java @@ -8,6 +8,7 @@ package com.wentch.redkale.net.http; import com.wentch.redkale.net.AsyncConnection; import com.wentch.redkale.net.Context; import com.wentch.redkale.net.http.WebSocketPacket.PacketType; +import com.wentch.redkale.service.*; import java.nio.ByteBuffer; import java.nio.channels.*; import java.security.SecureRandom; @@ -29,6 +30,8 @@ public class WebSocketRunner implements Runnable { protected final Context context; + protected final WebSocketNodeService nodeService; + private ByteBuffer readBuffer; private ByteBuffer writeBuffer; @@ -41,8 +44,9 @@ public class WebSocketRunner implements Runnable { private final BlockingQueue queue = new ArrayBlockingQueue(1024); - public WebSocketRunner(Context context, WebSocket webSocket, AsyncConnection channel) { + public WebSocketRunner(Context context, WebSocketNodeService nodeService, WebSocket webSocket, AsyncConnection channel) { this.context = context; + this.nodeService = nodeService; this.engine = webSocket.engine; this.webSocket = webSocket; this.channel = channel; @@ -57,6 +61,7 @@ public class WebSocketRunner implements Runnable { public void run() { final boolean debug = this.coder.debugable; try { + if (nodeService != null) nodeService.connectSelf(webSocket.groupid); webSocket.onConnected(); channel.setReadTimeoutSecond(300); //读取超时5分钟 if (channel.isOpen()) { @@ -176,6 +181,10 @@ public class WebSocketRunner implements Runnable { readBuffer = null; writeBuffer = null; engine.remove(webSocket); + if (nodeService != null) { + WebSocketGroup group = webSocket.getWebSocketGroup(); + if (group == null || group.isEmpty()) nodeService.disconnectSelf(webSocket.groupid); + } webSocket.onClose(0, null); } diff --git a/src/com/wentch/redkale/net/http/WebSocketServlet.java b/src/com/wentch/redkale/net/http/WebSocketServlet.java index a8a4b4a50..926cc0154 100644 --- a/src/com/wentch/redkale/net/http/WebSocketServlet.java +++ b/src/com/wentch/redkale/net/http/WebSocketServlet.java @@ -6,6 +6,7 @@ package com.wentch.redkale.net.http; import com.wentch.redkale.net.*; +import com.wentch.redkale.service.*; import com.wentch.redkale.util.*; import java.io.*; import java.nio.*; @@ -13,6 +14,7 @@ import java.nio.channels.*; import java.security.*; import java.util.*; import java.util.logging.*; +import javax.annotation.*; /** * @@ -32,18 +34,26 @@ public abstract class WebSocketServlet extends HttpServlet { } } + @Resource + protected WebSocketNodeService nodeService; + protected final WebSocketEngine engine = new WebSocketEngine(); + @Override + public void init(Context context, AnyValue conf) { + engine.setEngineid(context.getServerAddress().getPort() + "-" + Arrays.toString(this.getClass().getAnnotation(WebServlet.class).value())); + if (nodeService != null) { + nodeService.addWebSocketEngine(engine); + nodeService.initUserNodes(); + } + } + @Override public void destroy(Context context, AnyValue conf) { super.destroy(context, conf); engine.close(); } - protected long getEngineid() { - return engine.getEngineid(); - } - @Override public final void execute(HttpRequest request, HttpResponse response) throws IOException { final boolean debug = logger.isLoggable(Level.FINER); @@ -62,6 +72,7 @@ public abstract class WebSocketServlet extends HttpServlet { } final WebSocket webSocket = this.createWebSocket(); webSocket.engine = engine; + webSocket.nodeService = nodeService; String sessionid = webSocket.onOpen(request); if (sessionid == null) { if (debug) logger.finer("WebSocket connect abort, Not found sessionid. request=" + request); @@ -92,7 +103,7 @@ public abstract class WebSocketServlet extends HttpServlet { } webSocket.groupid = groupid; engine.add(webSocket); - context.submit(new WebSocketRunner(context, webSocket, response.removeChannel())); + context.submit(new WebSocketRunner(context, nodeService, webSocket, response.removeChannel())); response.finish(true); } diff --git a/src/com/wentch/redkale/service/WebSocketNodeService.java b/src/com/wentch/redkale/service/WebSocketNodeService.java index a9b750da7..0b8cee7a6 100644 --- a/src/com/wentch/redkale/service/WebSocketNodeService.java +++ b/src/com/wentch/redkale/service/WebSocketNodeService.java @@ -34,11 +34,14 @@ public class WebSocketNodeService implements Service { //用户分布在节点上的队列信息,只保存远程节点的用户分布信息 protected final ConcurrentHashMap> usernodes = new ConcurrentHashMap(); - protected final ConcurrentHashMap engines = new ConcurrentHashMap(); + protected final ConcurrentHashMap engines = new ConcurrentHashMap(); @Override public void init(AnyValue conf) { if (fine) logger.fine(this.localNodeName + ", " + this + ", " + nodemaps); + } + + public void initUserNodes() { if (this.nodemaps == null || this.nodemaps.isEmpty()) return; new Thread() { { @@ -132,42 +135,42 @@ public class WebSocketNodeService implements Service { } @RemoteOn - public boolean send(long engineid, Serializable groupid, String text) { + public boolean send(String engineid, Serializable groupid, String text) { return send(engineid, groupid, text, true); } - public final boolean onSend(long engineid, Serializable groupid, String text) { + public final boolean onSend(String engineid, Serializable groupid, String text) { return onSend(engineid, groupid, text, true); } @RemoteOn - public boolean send(long engineid, Serializable groupid, String text, boolean last) { + public boolean send(String engineid, Serializable groupid, String text, boolean last) { return send0(engineid, groupid, text, last); } - public final boolean onSend(long engineid, Serializable groupid, String text, boolean last) { + public final boolean onSend(String engineid, Serializable groupid, String text, boolean last) { return onSend0(engineid, groupid, text, last); } @RemoteOn - public boolean send(long engineid, Serializable groupid, byte[] data) { + public boolean send(String engineid, Serializable groupid, byte[] data) { return send(engineid, groupid, data, true); } - public final boolean onSend(long engineid, Serializable groupid, byte[] data) { + public final boolean onSend(String engineid, Serializable groupid, byte[] data) { return onSend(engineid, groupid, data, true); } @RemoteOn - public boolean send(long engineid, Serializable groupid, byte[] data, boolean last) { + public boolean send(String engineid, Serializable groupid, byte[] data, boolean last) { return send0(engineid, groupid, data, last); } - public final boolean onSend(long engineid, Serializable groupid, byte[] data, boolean last) { + public final boolean onSend(String engineid, Serializable groupid, byte[] data, boolean last) { return onSend0(engineid, groupid, data, last); } - private boolean send0(long engineid, Serializable groupid, Serializable text, boolean last) { + private boolean send0(String engineid, Serializable groupid, Serializable text, boolean last) { final Set nodes = usernodes.get(groupid); if (nodes == null) return false; boolean rs = false; @@ -195,7 +198,7 @@ public class WebSocketNodeService implements Service { * @param text * @return */ - private boolean onSend0(long engineid, Serializable groupid, Serializable text, boolean last) { + private boolean onSend0(String engineid, Serializable groupid, Serializable text, boolean last) { WebSocketEngine webSocketEngine = engines.get(engineid); if (webSocketEngine == null) return false; WebSocketGroup group = webSocketEngine.getWebSocketGroup(groupid);