This commit is contained in:
地平线
2015-06-21 16:28:22 +08:00
parent a5ad1d5c3a
commit 69d9cb0edf
5 changed files with 239 additions and 14 deletions

View File

@@ -5,6 +5,7 @@
*/
package com.wentch.redkale.net.http;
import java.io.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@@ -22,7 +23,7 @@ public abstract class WebSocket {
String sessionid;
long groupid;
Serializable groupid;
private final Map<String, Object> attributes = new ConcurrentHashMap<>();
@@ -42,10 +43,18 @@ public abstract class WebSocket {
send(text, true);
}
private void send(String text, boolean last) {
public final void send(String text, boolean last) {
send(new WebSocketPacket(text, last));
}
public final void send(byte[] data) {
send(data, true);
}
public final void send(byte[] data, boolean last) {
send(new WebSocketPacket(data, last));
}
@SuppressWarnings("unchecked")
public final <T> T getAttribute(String name) {
return (T) attributes.get(name);
@@ -59,7 +68,7 @@ public abstract class WebSocket {
attributes.put(name, value);
}
public final long getGroupid() {
public final Serializable getGroupid() {
return groupid;
}
@@ -77,7 +86,6 @@ public abstract class WebSocket {
}
//-------------------------------------------------------------------
/**
* 返回sessionid, null表示连接不合法或异常
*
@@ -89,12 +97,12 @@ public abstract class WebSocket {
}
/**
* 返回GroupID 负数表示异常
* 返回GroupID null表示异常
*
* @return
*/
public long createGroupid() {
return 0;
public Serializable createGroupid() {
return null;
}
/**

View File

@@ -5,6 +5,7 @@
*/
package com.wentch.redkale.net.http;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
@@ -16,7 +17,7 @@ public final class WebSocketEngine {
private final long engineid = Math.abs(System.nanoTime());
private final Map<Long, WebSocketGroup> containers = new ConcurrentHashMap<>();
private final Map<Serializable, WebSocketGroup> containers = new ConcurrentHashMap<>();
WebSocketEngine() {
}
@@ -37,7 +38,7 @@ public final class WebSocketEngine {
if (group.isEmpty()) containers.remove(socket.groupid);
}
public WebSocketGroup getWebSocketGroup(long groupid) {
public WebSocketGroup getWebSocketGroup(Serializable groupid) {
return containers.get(groupid);
}

View File

@@ -5,6 +5,7 @@
*/
package com.wentch.redkale.net.http;
import java.io.*;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Stream;
@@ -15,17 +16,17 @@ import java.util.stream.Stream;
*/
public final class WebSocketGroup {
private final long groupid;
private final Serializable groupid;
private final List<WebSocket> list = new CopyOnWriteArrayList<>();
private final Map<String, Object> attributes = new HashMap<>();
WebSocketGroup(long groupid) {
WebSocketGroup(Serializable groupid) {
this.groupid = groupid;
}
public long getGroupid() {
public Serializable getGroupid() {
return groupid;
}

View File

@@ -40,6 +40,10 @@ public abstract class WebSocketServlet extends HttpServlet {
engine.close();
}
protected long getEngineid() {
return engine.getEngineid();
}
@Override
public final void execute(HttpRequest request, HttpResponse response) throws IOException {
final boolean debug = logger.isLoggable(Level.FINER);
@@ -80,8 +84,8 @@ public abstract class WebSocketServlet extends HttpServlet {
@Override
public void completed(Integer result, Void attachment) {
HttpContext context = response.getContext();
long groupid = webSocket.createGroupid();
if (groupid < 0) {
Serializable groupid = webSocket.createGroupid();
if (groupid == null) {
if (debug) logger.finer("WebSocket connect abort, Create groupid abort");
response.finish(true);
return;

View File

@@ -0,0 +1,211 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package com.wentch.redkale.service;
import com.wentch.redkale.net.http.*;
import com.wentch.redkale.util.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.logging.*;
import javax.annotation.*;
/**
*
* @author zhangjx
*/
public class WebSocketNodeService implements Service {
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
protected final boolean fine = logger.isLoggable(Level.FINE);
protected final boolean finest = logger.isLoggable(Level.FINEST);
@Resource(name = "APP_NODE")
protected String localNodeName = "";
@Resource
protected HashMap<String, WebSocketNodeService> nodemaps;
//用户分布在节点上的队列信息,只保存远程节点的用户分布信息
protected final ConcurrentHashMap<Serializable, Set<String>> usernodes = new ConcurrentHashMap();
protected final ConcurrentHashMap<Long, WebSocketEngine> engines = new ConcurrentHashMap();
@Override
public void init(AnyValue conf) {
if (fine) logger.fine(this.localNodeName + ", " + this + ", " + nodemaps);
if (this.nodemaps == null || this.nodemaps.isEmpty()) return;
new Thread() {
{
setDaemon(true);
}
@Override
public void run() {
usernodes.putAll(queryNodes());
}
}.start();
}
public final void addWebSocketEngine(WebSocketEngine engine) {
engines.put(engine.getEngineid(), engine);
}
@RemoteOn
public Map<Serializable, Set<String>> queryNodes() {
Map<Serializable, Set<String>> rs = new HashMap<>();
this.nodemaps.forEach((x, y) -> {
if (!rs.isEmpty()) return;
try {
rs.putAll(y.queryNodes());
} catch (Exception e) {
logger.log(Level.WARNING, this.getClass().getSimpleName() + " query error (" + x + ")", e);
}
});
return rs;
}
public final Map<Serializable, Set<String>> onQueryNodes() {
Map<Serializable, Set<String>> rs = new HashMap<>();
rs.putAll(this.usernodes);
return rs;
}
public void connectSelf(Serializable userid) {
connect(this.localNodeName, userid);
}
public void disconnectSelf(Serializable userid) {
if (fine) logger.fine("LocalNode " + localNodeName + " disconnect " + userid);
disconnect(this.localNodeName, userid);
}
@RemoteOn
public void connect(String nodeid, Serializable userid) {
onConnect(nodeid, userid);
if (this.nodemaps == null) return;
this.nodemaps.forEach((x, y) -> {
try {
if (fine) logger.fine("LocalNode " + localNodeName + " send RemoteNode " + x + " to connect (" + nodeid + "," + userid + ")");
y.connect(nodeid, userid);
} catch (Exception e) {
logger.log(Level.WARNING, this.getClass().getSimpleName() + " connect error (" + x + ", [" + nodeid + "," + userid + "])", e);
}
});
}
public final void onConnect(String nodeid, Serializable userid) {
if (fine) logger.fine("LocalNode " + localNodeName + " receive onConnect (" + nodeid + "," + userid + ")");
Set<String> userNodelist = usernodes.get(userid);
if (userNodelist == null) {
userNodelist = new CopyOnWriteArraySet<>();
usernodes.put(userid, userNodelist);
}
userNodelist.add(nodeid);
}
@RemoteOn
public void disconnect(String nodeid, Serializable userid) {
onDisconnect(nodeid, userid);
if (this.nodemaps == null) return;
this.nodemaps.forEach((x, y) -> {
try {
if (fine) logger.fine("LocalNode " + localNodeName + " send RemoteNode " + x + " to disconnect (" + nodeid + "," + userid + ")");
y.disconnect(nodeid, userid);
} catch (Exception e) {
logger.log(Level.WARNING, this.getClass().getSimpleName() + " disconnect error (" + x + ", [" + nodeid + "," + userid + "])", e);
}
});
}
public final void onDisconnect(String nodeid, Serializable userid) {
if (fine) logger.fine("LocalNode " + localNodeName + " receive onDisconnect (" + nodeid + "," + userid + ")");
Set<String> userNodelist = usernodes.get(userid);
if (userNodelist == null) return;
userNodelist.remove(nodeid);
if (userNodelist.isEmpty()) usernodes.remove(userid);
}
@RemoteOn
public boolean send(long engineid, Serializable groupid, String text) {
return send(engineid, groupid, text, true);
}
public final boolean onSend(long engineid, Serializable groupid, String text) {
return onSend(engineid, groupid, text, true);
}
@RemoteOn
public boolean send(long engineid, Serializable groupid, String text, boolean last) {
return send0(engineid, groupid, text, last);
}
public final boolean onSend(long engineid, Serializable groupid, String text, boolean last) {
return onSend0(engineid, groupid, text, last);
}
@RemoteOn
public boolean send(long engineid, Serializable groupid, byte[] data) {
return send(engineid, groupid, data, true);
}
public final boolean onSend(long engineid, Serializable groupid, byte[] data) {
return onSend(engineid, groupid, data, true);
}
@RemoteOn
public boolean send(long engineid, Serializable groupid, byte[] data, boolean last) {
return send0(engineid, groupid, data, last);
}
public final boolean onSend(long engineid, Serializable groupid, byte[] data, boolean last) {
return onSend0(engineid, groupid, data, last);
}
private boolean send0(long engineid, Serializable groupid, Serializable text, boolean last) {
final Set<String> nodes = usernodes.get(groupid);
if (nodes == null) return false;
boolean rs = false;
if (nodes.contains(this.localNodeName)) rs |= onSend0(engineid, groupid, text, last);
if (nodemaps == null) return rs;
this.nodemaps.forEach((x, y) -> {
if (nodes.contains(x)) {
try {
y.send0(engineid, groupid, text, last);
if (fine) logger.fine("LocalNode " + localNodeName + " send RemoteNode " + x + " to send message (" + engineid + "," + groupid + "," + text + ")");
} catch (Exception e) {
onDisconnect(x, groupid);
logger.log(Level.WARNING, this.getClass().getSimpleName() + " send message error (" + x + ", [" + engineid + "," + groupid + "," + text + "])", e);
}
}
});
return true;
}
/**
* 消息接受者存在WebSocket并发送成功返回true 否则返回false
*
* @param engineid
* @param groupid 接收方
* @param text
* @return
*/
private boolean onSend0(long engineid, Serializable groupid, Serializable text, boolean last) {
WebSocketEngine webSocketEngine = engines.get(engineid);
if (webSocketEngine == null) return false;
WebSocketGroup group = webSocketEngine.getWebSocketGroup(groupid);
if (group == null || group.isEmpty()) return false;
if (text != null && text.getClass() == byte[].class) {
group.getWebSockets().forEach(x -> x.send((byte[]) text, last));
} else {
group.getWebSockets().forEach(x -> x.send(text.toString(), last));
}
return true;
}
}