This commit is contained in:
地平线
2015-06-23 09:34:23 +08:00
parent 69d9cb0edf
commit 73539c217f
6 changed files with 83 additions and 26 deletions

View File

@@ -39,25 +39,25 @@ import org.w3c.dom.*;
*/ */
public final class Application { public final class Application {
//进程启动的时间, 类型: long //当前进程启动的时间, 类型: long
public static final String RESNAME_TIME = "APP_TIME"; public static final String RESNAME_TIME = "APP_TIME";
//本地进程的根目录, 类型String //当前进程的根目录, 类型String
public static final String RESNAME_HOME = "APP_HOME"; public static final String RESNAME_HOME = "APP_HOME";
//本地节点的名称, 类型String //当前进程节点的名称, 类型String
public static final String RESNAME_NODE = "APP_NODE"; public static final String RESNAME_NODE = "APP_NODE";
//本地节点的所属组, 类型String、Map<String, Set<String>>、Map<String, List<SimpleEntry<String, InetSocketAddress[]>>> //当前进程节点的所属组, 类型String、Map<String, Set<String>>、Map<String, List<SimpleEntry<String, InetSocketAddress[]>>>
public static final String RESNAME_GROUP = "APP_GROUP"; public static final String RESNAME_GROUP = "APP_GROUP";
//本地节点的所属组所有节点名, 类型Set<String> 、List<SimpleEntry<String, InetSocketAddress[]>>包含自身节点名 //当前进程节点的所属组所有节点名, 类型Set<String> 、List<SimpleEntry<String, InetSocketAddress[]>>包含自身节点名
public static final String RESNAME_INGROUP = "APP_INGROUP"; public static final String RESNAME_INGROUP = "APP_INGROUP";
//除本地节点的所属组外其他所有组的所有节点名, 类型Map<String, Set<String>>、Map<String, List<SimpleEntry<String, InetSocketAddress[]>>> //除当前进程节点的所属组外其他所有组的所有节点名, 类型Map<String, Set<String>>、Map<String, List<SimpleEntry<String, InetSocketAddress[]>>>
public static final String RESNAME_OUTGROUP = "APP_OUTGROUP"; public static final String RESNAME_OUTGROUP = "APP_OUTGROUP";
//本地节点的IP地址 类型InetAddress、String //当前进程节点的IP地址 类型InetAddress、String
public static final String RESNAME_ADDR = "APP_ADDR"; public static final String RESNAME_ADDR = "APP_ADDR";
//application.xml 文件中resources节点的内容 类型: AnyValue //application.xml 文件中resources节点的内容 类型: AnyValue

View File

@@ -5,6 +5,7 @@
*/ */
package com.wentch.redkale.net.http; package com.wentch.redkale.net.http;
import com.wentch.redkale.service.*;
import java.io.*; import java.io.*;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@@ -21,6 +22,8 @@ public abstract class WebSocket {
WebSocketGroup group; WebSocketGroup group;
WebSocketNodeService nodeService;
String sessionid; String sessionid;
Serializable groupid; Serializable groupid;
@@ -55,6 +58,33 @@ public abstract class WebSocket {
send(new WebSocketPacket(data, last)); send(new WebSocketPacket(data, last));
} }
//----------------------------------------------------------------
public final boolean sendMessage(Serializable groupid, String text) {
return sendMessage(groupid, text, true);
}
public final boolean sendMessage(Serializable groupid, byte[] data) {
return sendMessage(groupid, data, true);
}
public final boolean sendMessage(Serializable groupid, String text, boolean last) {
if (nodeService == null) return false;
if (groupid == this.groupid) {
return nodeService.onSend(this.engine.getEngineid(), groupid, text, last);
} else {
return nodeService.send(this.engine.getEngineid(), groupid, text, last);
}
}
public final boolean sendMessage(Serializable groupid, byte[] data, boolean last) {
if (nodeService == null) return false;
if (groupid == this.groupid) {
return nodeService.onSend(this.engine.getEngineid(), groupid, data, last);
} else {
return nodeService.send(this.engine.getEngineid(), groupid, data, last);
}
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public final <T> T getAttribute(String name) { public final <T> T getAttribute(String name) {
return (T) attributes.get(name); return (T) attributes.get(name);

View File

@@ -15,13 +15,17 @@ import java.util.concurrent.*;
*/ */
public final class WebSocketEngine { public final class WebSocketEngine {
private final long engineid = Math.abs(System.nanoTime()); private String engineid;
private final Map<Serializable, WebSocketGroup> containers = new ConcurrentHashMap<>(); private final Map<Serializable, WebSocketGroup> containers = new ConcurrentHashMap<>();
WebSocketEngine() { WebSocketEngine() {
} }
void setEngineid(String engineid) {
this.engineid = engineid;
}
void add(WebSocket socket) { void add(WebSocket socket) {
WebSocketGroup group = containers.get(socket.groupid); WebSocketGroup group = containers.get(socket.groupid);
if (group == null) { if (group == null) {
@@ -45,7 +49,7 @@ public final class WebSocketEngine {
void close() { void close() {
} }
public long getEngineid() { public String getEngineid() {
return engineid; return engineid;
} }
} }

View File

@@ -8,6 +8,7 @@ package com.wentch.redkale.net.http;
import com.wentch.redkale.net.AsyncConnection; import com.wentch.redkale.net.AsyncConnection;
import com.wentch.redkale.net.Context; import com.wentch.redkale.net.Context;
import com.wentch.redkale.net.http.WebSocketPacket.PacketType; import com.wentch.redkale.net.http.WebSocketPacket.PacketType;
import com.wentch.redkale.service.*;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.*; import java.nio.channels.*;
import java.security.SecureRandom; import java.security.SecureRandom;
@@ -29,6 +30,8 @@ public class WebSocketRunner implements Runnable {
protected final Context context; protected final Context context;
protected final WebSocketNodeService nodeService;
private ByteBuffer readBuffer; private ByteBuffer readBuffer;
private ByteBuffer writeBuffer; private ByteBuffer writeBuffer;
@@ -41,8 +44,9 @@ public class WebSocketRunner implements Runnable {
private final BlockingQueue<byte[]> queue = new ArrayBlockingQueue(1024); private final BlockingQueue<byte[]> queue = new ArrayBlockingQueue(1024);
public WebSocketRunner(Context context, WebSocket webSocket, AsyncConnection channel) { public WebSocketRunner(Context context, WebSocketNodeService nodeService, WebSocket webSocket, AsyncConnection channel) {
this.context = context; this.context = context;
this.nodeService = nodeService;
this.engine = webSocket.engine; this.engine = webSocket.engine;
this.webSocket = webSocket; this.webSocket = webSocket;
this.channel = channel; this.channel = channel;
@@ -57,6 +61,7 @@ public class WebSocketRunner implements Runnable {
public void run() { public void run() {
final boolean debug = this.coder.debugable; final boolean debug = this.coder.debugable;
try { try {
if (nodeService != null) nodeService.connectSelf(webSocket.groupid);
webSocket.onConnected(); webSocket.onConnected();
channel.setReadTimeoutSecond(300); //读取超时5分钟 channel.setReadTimeoutSecond(300); //读取超时5分钟
if (channel.isOpen()) { if (channel.isOpen()) {
@@ -176,6 +181,10 @@ public class WebSocketRunner implements Runnable {
readBuffer = null; readBuffer = null;
writeBuffer = null; writeBuffer = null;
engine.remove(webSocket); engine.remove(webSocket);
if (nodeService != null) {
WebSocketGroup group = webSocket.getWebSocketGroup();
if (group == null || group.isEmpty()) nodeService.disconnectSelf(webSocket.groupid);
}
webSocket.onClose(0, null); webSocket.onClose(0, null);
} }

View File

@@ -6,6 +6,7 @@
package com.wentch.redkale.net.http; package com.wentch.redkale.net.http;
import com.wentch.redkale.net.*; import com.wentch.redkale.net.*;
import com.wentch.redkale.service.*;
import com.wentch.redkale.util.*; import com.wentch.redkale.util.*;
import java.io.*; import java.io.*;
import java.nio.*; import java.nio.*;
@@ -13,6 +14,7 @@ import java.nio.channels.*;
import java.security.*; import java.security.*;
import java.util.*; import java.util.*;
import java.util.logging.*; import java.util.logging.*;
import javax.annotation.*;
/** /**
* *
@@ -32,18 +34,26 @@ public abstract class WebSocketServlet extends HttpServlet {
} }
} }
@Resource
protected WebSocketNodeService nodeService;
protected final WebSocketEngine engine = new WebSocketEngine(); protected final WebSocketEngine engine = new WebSocketEngine();
@Override
public void init(Context context, AnyValue conf) {
engine.setEngineid(context.getServerAddress().getPort() + "-" + Arrays.toString(this.getClass().getAnnotation(WebServlet.class).value()));
if (nodeService != null) {
nodeService.addWebSocketEngine(engine);
nodeService.initUserNodes();
}
}
@Override @Override
public void destroy(Context context, AnyValue conf) { public void destroy(Context context, AnyValue conf) {
super.destroy(context, conf); super.destroy(context, conf);
engine.close(); engine.close();
} }
protected long getEngineid() {
return engine.getEngineid();
}
@Override @Override
public final void execute(HttpRequest request, HttpResponse response) throws IOException { public final void execute(HttpRequest request, HttpResponse response) throws IOException {
final boolean debug = logger.isLoggable(Level.FINER); final boolean debug = logger.isLoggable(Level.FINER);
@@ -62,6 +72,7 @@ public abstract class WebSocketServlet extends HttpServlet {
} }
final WebSocket webSocket = this.createWebSocket(); final WebSocket webSocket = this.createWebSocket();
webSocket.engine = engine; webSocket.engine = engine;
webSocket.nodeService = nodeService;
String sessionid = webSocket.onOpen(request); String sessionid = webSocket.onOpen(request);
if (sessionid == null) { if (sessionid == null) {
if (debug) logger.finer("WebSocket connect abort, Not found sessionid. request=" + request); if (debug) logger.finer("WebSocket connect abort, Not found sessionid. request=" + request);
@@ -92,7 +103,7 @@ public abstract class WebSocketServlet extends HttpServlet {
} }
webSocket.groupid = groupid; webSocket.groupid = groupid;
engine.add(webSocket); engine.add(webSocket);
context.submit(new WebSocketRunner(context, webSocket, response.removeChannel())); context.submit(new WebSocketRunner(context, nodeService, webSocket, response.removeChannel()));
response.finish(true); response.finish(true);
} }

View File

@@ -34,11 +34,14 @@ public class WebSocketNodeService implements Service {
//用户分布在节点上的队列信息,只保存远程节点的用户分布信息 //用户分布在节点上的队列信息,只保存远程节点的用户分布信息
protected final ConcurrentHashMap<Serializable, Set<String>> usernodes = new ConcurrentHashMap(); protected final ConcurrentHashMap<Serializable, Set<String>> usernodes = new ConcurrentHashMap();
protected final ConcurrentHashMap<Long, WebSocketEngine> engines = new ConcurrentHashMap(); protected final ConcurrentHashMap<String, WebSocketEngine> engines = new ConcurrentHashMap();
@Override @Override
public void init(AnyValue conf) { public void init(AnyValue conf) {
if (fine) logger.fine(this.localNodeName + ", " + this + ", " + nodemaps); if (fine) logger.fine(this.localNodeName + ", " + this + ", " + nodemaps);
}
public void initUserNodes() {
if (this.nodemaps == null || this.nodemaps.isEmpty()) return; if (this.nodemaps == null || this.nodemaps.isEmpty()) return;
new Thread() { new Thread() {
{ {
@@ -132,42 +135,42 @@ public class WebSocketNodeService implements Service {
} }
@RemoteOn @RemoteOn
public boolean send(long engineid, Serializable groupid, String text) { public boolean send(String engineid, Serializable groupid, String text) {
return send(engineid, groupid, text, true); return send(engineid, groupid, text, true);
} }
public final boolean onSend(long engineid, Serializable groupid, String text) { public final boolean onSend(String engineid, Serializable groupid, String text) {
return onSend(engineid, groupid, text, true); return onSend(engineid, groupid, text, true);
} }
@RemoteOn @RemoteOn
public boolean send(long engineid, Serializable groupid, String text, boolean last) { public boolean send(String engineid, Serializable groupid, String text, boolean last) {
return send0(engineid, groupid, text, last); return send0(engineid, groupid, text, last);
} }
public final boolean onSend(long engineid, Serializable groupid, String text, boolean last) { public final boolean onSend(String engineid, Serializable groupid, String text, boolean last) {
return onSend0(engineid, groupid, text, last); return onSend0(engineid, groupid, text, last);
} }
@RemoteOn @RemoteOn
public boolean send(long engineid, Serializable groupid, byte[] data) { public boolean send(String engineid, Serializable groupid, byte[] data) {
return send(engineid, groupid, data, true); return send(engineid, groupid, data, true);
} }
public final boolean onSend(long engineid, Serializable groupid, byte[] data) { public final boolean onSend(String engineid, Serializable groupid, byte[] data) {
return onSend(engineid, groupid, data, true); return onSend(engineid, groupid, data, true);
} }
@RemoteOn @RemoteOn
public boolean send(long engineid, Serializable groupid, byte[] data, boolean last) { public boolean send(String engineid, Serializable groupid, byte[] data, boolean last) {
return send0(engineid, groupid, data, last); return send0(engineid, groupid, data, last);
} }
public final boolean onSend(long engineid, Serializable groupid, byte[] data, boolean last) { public final boolean onSend(String engineid, Serializable groupid, byte[] data, boolean last) {
return onSend0(engineid, groupid, data, last); return onSend0(engineid, groupid, data, last);
} }
private boolean send0(long engineid, Serializable groupid, Serializable text, boolean last) { private boolean send0(String engineid, Serializable groupid, Serializable text, boolean last) {
final Set<String> nodes = usernodes.get(groupid); final Set<String> nodes = usernodes.get(groupid);
if (nodes == null) return false; if (nodes == null) return false;
boolean rs = false; boolean rs = false;
@@ -195,7 +198,7 @@ public class WebSocketNodeService implements Service {
* @param text * @param text
* @return * @return
*/ */
private boolean onSend0(long engineid, Serializable groupid, Serializable text, boolean last) { private boolean onSend0(String engineid, Serializable groupid, Serializable text, boolean last) {
WebSocketEngine webSocketEngine = engines.get(engineid); WebSocketEngine webSocketEngine = engines.get(engineid);
if (webSocketEngine == null) return false; if (webSocketEngine == null) return false;
WebSocketGroup group = webSocketEngine.getWebSocketGroup(groupid); WebSocketGroup group = webSocketEngine.getWebSocketGroup(groupid);