This commit is contained in:
地平线
2015-06-23 15:36:37 +08:00
parent 37de3bd694
commit 0ff6d7759b
2 changed files with 52 additions and 41 deletions

View File

@@ -59,16 +59,16 @@ public abstract class WebSocket {
}
//----------------------------------------------------------------
public final boolean sendMessage(Serializable groupid, String text) {
public final int sendMessage(Serializable groupid, String text) {
return sendMessage(groupid, text, true);
}
public final boolean sendMessage(Serializable groupid, byte[] data) {
public final int sendMessage(Serializable groupid, byte[] data) {
return sendMessage(groupid, data, true);
}
public final boolean sendMessage(Serializable groupid, String text, boolean last) {
if (nodeService == null) return false;
public final int sendMessage(Serializable groupid, String text, boolean last) {
if (nodeService == null) return WebSocketNodeService.RETCODE_NODESERVICE_NULL;
if (groupid == this.groupid) {
return nodeService.onSend(this.engine.getEngineid(), groupid, text, last);
} else {
@@ -76,8 +76,8 @@ public abstract class WebSocket {
}
}
public final boolean sendMessage(Serializable groupid, byte[] data, boolean last) {
if (nodeService == null) return false;
public final int sendMessage(Serializable groupid, byte[] data, boolean last) {
if (nodeService == null) return WebSocketNodeService.RETCODE_NODESERVICE_NULL;
if (groupid == this.groupid) {
return nodeService.onSend(this.engine.getEngineid(), groupid, data, last);
} else {

View File

@@ -6,7 +6,6 @@
package com.wentch.redkale.service;
import com.wentch.redkale.net.http.*;
import com.wentch.redkale.util.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
@@ -19,9 +18,15 @@ import javax.annotation.*;
*/
public class WebSocketNodeService implements Service {
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
public static final int RETCODE_ENGINE_NULL = 5001;
protected final boolean fine = logger.isLoggable(Level.FINE);
public static final int RETCODE_NODESERVICE_NULL = 5002;
public static final int RETCODE_GROUP_EMPTY = 5005;
public static final int RETCODE_WSOFFLINE = 5011;
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
protected final boolean finest = logger.isLoggable(Level.FINEST);
@@ -36,11 +41,6 @@ public class WebSocketNodeService implements Service {
protected final ConcurrentHashMap<String, WebSocketEngine> engines = new ConcurrentHashMap();
@Override
public void init(AnyValue conf) {
if (fine) logger.fine(this.localNodeName + ", " + this + ", " + nodemaps);
}
public void initUserNodes() {
if (this.nodemaps == null || this.nodemaps.isEmpty()) return;
new Thread() {
@@ -84,7 +84,6 @@ public class WebSocketNodeService implements Service {
}
public void disconnectSelf(Serializable userid) {
if (fine) logger.fine("LocalNode " + localNodeName + " disconnect " + userid);
disconnect(this.localNodeName, userid);
}
@@ -94,16 +93,16 @@ public class WebSocketNodeService implements Service {
if (this.nodemaps == null) return;
this.nodemaps.forEach((x, y) -> {
try {
if (fine) logger.fine("LocalNode " + localNodeName + " send RemoteNode " + x + " to connect (" + nodeid + "," + userid + ")");
if (finest) logger.finest("Node(" + localNodeName + "->" + x + ") send websocket connect event (" + userid + " on " + nodeid + ")");
y.connect(nodeid, userid);
} catch (Exception e) {
logger.log(Level.WARNING, this.getClass().getSimpleName() + " connect error (" + x + ", [" + nodeid + "," + userid + "])", e);
logger.log(Level.WARNING, "Node(" + localNodeName + "->" + x + ") send websocket connect event (" + userid + " on " + nodeid + ")", e);
}
});
}
public final void onConnect(String nodeid, Serializable userid) {
if (fine) logger.fine("LocalNode " + localNodeName + " receive onConnect (" + nodeid + "," + userid + ")");
if (finest) logger.finest("Node (" + localNodeName + ") receive websocket connect event (" + userid + " on " + nodeid + ").");
Set<String> userNodelist = usernodes.get(userid);
if (userNodelist == null) {
userNodelist = new CopyOnWriteArraySet<>();
@@ -118,16 +117,16 @@ public class WebSocketNodeService implements Service {
if (this.nodemaps == null) return;
this.nodemaps.forEach((x, y) -> {
try {
if (fine) logger.fine("LocalNode " + localNodeName + " send RemoteNode " + x + " to disconnect (" + nodeid + "," + userid + ")");
if (finest) logger.finest("Node(" + localNodeName + "->" + x + ") send websocket disconnect event (" + userid + " on " + nodeid + ")");
y.disconnect(nodeid, userid);
} catch (Exception e) {
logger.log(Level.WARNING, this.getClass().getSimpleName() + " disconnect error (" + x + ", [" + nodeid + "," + userid + "])", e);
logger.log(Level.WARNING, "Node(" + localNodeName + "->" + x + ") send websocket disconnect event (" + userid + " on " + nodeid + ")", e);
}
});
}
public final void onDisconnect(String nodeid, Serializable userid) {
if (fine) logger.fine("LocalNode " + localNodeName + " receive onDisconnect (" + nodeid + "," + userid + ")");
if (finest) logger.finest("Node (" + localNodeName + ") receive websocket disconnect event (" + userid + " on " + nodeid + ").");
Set<String> userNodelist = usernodes.get(userid);
if (userNodelist == null) return;
userNodelist.remove(nodeid);
@@ -135,59 +134,64 @@ public class WebSocketNodeService implements Service {
}
@RemoteOn
public boolean send(String engineid, Serializable groupid, String text) {
public int send(String engineid, Serializable groupid, String text) {
return send(engineid, groupid, text, true);
}
public final boolean onSend(String engineid, Serializable groupid, String text) {
public final int onSend(String engineid, Serializable groupid, String text) {
return onSend(engineid, groupid, text, true);
}
@RemoteOn
public boolean send(String engineid, Serializable groupid, String text, boolean last) {
public int send(String engineid, Serializable groupid, String text, boolean last) {
return send0(engineid, groupid, text, last);
}
public final boolean onSend(String engineid, Serializable groupid, String text, boolean last) {
public final int onSend(String engineid, Serializable groupid, String text, boolean last) {
return onSend0(engineid, groupid, text, last);
}
@RemoteOn
public boolean send(String engineid, Serializable groupid, byte[] data) {
public int send(String engineid, Serializable groupid, byte[] data) {
return send(engineid, groupid, data, true);
}
public final boolean onSend(String engineid, Serializable groupid, byte[] data) {
public final int onSend(String engineid, Serializable groupid, byte[] data) {
return onSend(engineid, groupid, data, true);
}
@RemoteOn
public boolean send(String engineid, Serializable groupid, byte[] data, boolean last) {
public int send(String engineid, Serializable groupid, byte[] data, boolean last) {
return send0(engineid, groupid, data, last);
}
public final boolean onSend(String engineid, Serializable groupid, byte[] data, boolean last) {
public final int onSend(String engineid, Serializable groupid, byte[] data, boolean last) {
return onSend0(engineid, groupid, data, last);
}
private boolean send0(String engineid, Serializable groupid, Serializable text, boolean last) {
private int send0(String engineid, Serializable groupid, Serializable text, boolean last) {
final Set<String> nodes = usernodes.get(groupid);
if (nodes == null) return false;
boolean rs = false;
if (nodes.contains(this.localNodeName)) rs |= onSend0(engineid, groupid, text, last);
if (nodes == null) return RETCODE_WSOFFLINE; //未登录
int rs = 0;
if (nodes.contains(this.localNodeName)) rs = onSend0(engineid, groupid, text, last);
if (nodemaps == null) return rs;
this.nodemaps.forEach((x, y) -> {
if (nodes.contains(x)) {
int irs = -1;
try {
y.send0(engineid, groupid, text, last);
if (fine) logger.fine("LocalNode " + localNodeName + " send RemoteNode " + x + " to send message (" + engineid + "," + groupid + "," + text + ")");
if (text != null && text.getClass() == byte[].class) {
irs = y.send(engineid, groupid, (byte[]) text, last);
} else {
irs = y.send(engineid, groupid, (String) text, last);
}
if (finest) logger.finest("Node(" + localNodeName + "->" + x + ") send websocket message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + text + "'} finish and result is " + irs);
} catch (Exception e) {
onDisconnect(x, groupid);
logger.log(Level.WARNING, this.getClass().getSimpleName() + " send message error (" + x + ", [" + engineid + "," + groupid + "," + text + "])", e);
logger.log(Level.WARNING, "Node(" + localNodeName + "->" + x + ") send websocket message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + text + "'} failed and result is " + irs, e);
}
}
});
return true;
return rs;
}
/**
@@ -198,17 +202,24 @@ public class WebSocketNodeService implements Service {
* @param text
* @return
*/
private boolean onSend0(String engineid, Serializable groupid, Serializable text, boolean last) {
private int onSend0(String engineid, Serializable groupid, Serializable text, boolean last) {
WebSocketEngine webSocketEngine = engines.get(engineid);
if (webSocketEngine == null) return false;
if (webSocketEngine == null) {
if (finest) logger.finest("Node(" + localNodeName + ") receive websocket message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + text + "'} but result is " + RETCODE_ENGINE_NULL);
return RETCODE_ENGINE_NULL;
}
WebSocketGroup group = webSocketEngine.getWebSocketGroup(groupid);
if (group == null || group.isEmpty()) return false;
if (group == null || group.isEmpty()) {
if (finest) logger.finest("Node(" + localNodeName + ") receive websocket message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + text + "'} but result is " + RETCODE_GROUP_EMPTY);
return RETCODE_GROUP_EMPTY;
}
if (finest) logger.finest("Node (" + localNodeName + ") receive websocket message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + text + "'}.");
if (text != null && text.getClass() == byte[].class) {
group.getWebSockets().forEach(x -> x.send((byte[]) text, last));
} else {
group.getWebSockets().forEach(x -> x.send(text.toString(), last));
}
return true;
return 0;
}
}