This commit is contained in:
地平线
2015-08-19 14:55:58 +08:00
parent 65e44fc2d9
commit cd185a3372
5 changed files with 85 additions and 47 deletions

View File

@@ -34,6 +34,26 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public abstract class WebSocket {
//消息不合法
public static final int RETCODE_SEND_ILLPACKET = 1 << 1; //2
//ws已经关闭
public static final int RETCODE_WSOCKET_CLOSED = 1 << 2; //4
//socket的buffer不合法
public static final int RETCODE_ILLEGALBUFFER = 1 << 3; //8
//ws发送消息异常
public static final int RETCODE_SENDEXCEPTION = 1 << 4; //16
public static final int RETCODE_ENGINE_NULL = 1 << 5; //32
public static final int RETCODE_NODESERVICE_NULL = 1 << 6; //64
public static final int RETCODE_GROUP_EMPTY = 1 << 7; //128
public static final int RETCODE_WSOFFLINE = 1 << 8; //256
WebSocketRunner runner;
WebSocketEngine engine;
@@ -56,9 +76,11 @@ public abstract class WebSocket {
* 发送消息体, 包含二进制/文本
* <p>
* @param packet
* @return
*/
public final void send(WebSocketPacket packet) {
if (this.runner != null) this.runner.sendMessage(packet);
public final int send(WebSocketPacket packet) {
if (this.runner != null) return this.runner.sendMessage(packet);
return RETCODE_WSOCKET_CLOSED;
}
/**
@@ -72,9 +94,10 @@ public abstract class WebSocket {
* 发送单一的文本消息
* <p>
* @param text 不可为空
* @return
*/
public final void send(String text) {
send(text, true);
public final int send(String text) {
return send(text, true);
}
/**
@@ -82,18 +105,20 @@ public abstract class WebSocket {
* <p>
* @param text 不可为空
* @param last 是否最后一条
* @return
*/
public final void send(String text, boolean last) {
send(new WebSocketPacket(text, last));
public final int send(String text, boolean last) {
return send(new WebSocketPacket(text, last));
}
/**
* 发送单一的二进制消息
* <p>
* @param data
* @return
*/
public final void send(byte[] data) {
send(data, true);
public final int send(byte[] data) {
return send(data, true);
}
/**
@@ -101,9 +126,10 @@ public abstract class WebSocket {
* <p>
* @param data 不可为空
* @param last 是否最后一条
* @return
*/
public final void send(byte[] data, boolean last) {
send(new WebSocketPacket(data, last));
public final int send(byte[] data, boolean last) {
return send(new WebSocketPacket(data, last));
}
/**
@@ -111,9 +137,10 @@ public abstract class WebSocket {
* <p>
* @param message 不可为空, 只能是String或者byte[]
* @param last 是否最后一条
* @return
*/
public final void send(Serializable message, boolean last) {
send(new WebSocketPacket(message, last));
public final int send(Serializable message, boolean last) {
return send(new WebSocketPacket(message, last));
}
//----------------------------------------------------------------
@@ -210,12 +237,12 @@ public abstract class WebSocket {
}
private int sendMessage(Serializable groupid, boolean recent, String text, boolean last) {
if (node == null) return WebSocketNode.RETCODE_NODESERVICE_NULL;
if (node == null) return RETCODE_NODESERVICE_NULL;
return node.sendMessage(groupid, recent, text, last);
}
private int sendMessage(Serializable groupid, boolean recent, byte[] data, boolean last) {
if (node == null) return WebSocketNode.RETCODE_NODESERVICE_NULL;
if (node == null) return RETCODE_NODESERVICE_NULL;
return node.sendMessage(groupid, recent, data, last);
}

View File

@@ -76,24 +76,29 @@ public final class WebSocketGroup {
attributes.put(name, value);
}
public final void send(boolean recent, Serializable message, boolean last) {
public final int send(boolean recent, Serializable message, boolean last) {
if (recent) {
recentWebSocket.send(message, last);
return recentWebSocket.send(message, last);
} else {
list.forEach(x -> x.send(message, last));
return sendEach(message, last);
}
}
public final void sendEach(Serializable message, boolean last) {
list.forEach(x -> x.send(message, last));
public final int sendEach(Serializable message, boolean last) {
int rs = 0;
for (WebSocket s : list) {
rs |= s.send(message, last);
}
return rs;
}
public final void sendRecent(Serializable message, boolean last) {
recentWebSocket.send(message, last);
public final int sendRecent(Serializable message, boolean last) {
return recentWebSocket.send(message, last);
}
@Override
public String toString() {
return "{groupid: " + groupid + ", list.size: " + (list == null ? -1 : list.size()) + "}";
}
}

View File

@@ -5,6 +5,7 @@
*/
package com.wentch.redkale.net.http;
import static com.wentch.redkale.net.http.WebSocket.*;
import com.wentch.redkale.net.sncp.*;
import com.wentch.redkale.util.*;
import java.io.*;
@@ -20,14 +21,6 @@ import javax.annotation.*;
*/
public abstract class WebSocketNode {
public static final int RETCODE_ENGINE_NULL = 5001;
public static final int RETCODE_NODESERVICE_NULL = 5002;
public static final int RETCODE_GROUP_EMPTY = 5005;
public static final int RETCODE_WSOFFLINE = 5011;
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
protected final boolean finest = logger.isLoggable(Level.FINEST);
@@ -39,7 +32,7 @@ public abstract class WebSocketNode {
protected WebSocketNode remoteNode;
//存放所有用户分布在节点上的队列信息,Set<InetSocketAddress> 为 sncpnode 的集合
protected final ConcurrentHashMap<Serializable, Set<InetSocketAddress>> dataNodes = new ConcurrentHashMap();
protected final ConcurrentHashMap<Serializable, LinkedHashSet<InetSocketAddress>> dataNodes = new ConcurrentHashMap();
//存放所有用户分布在节点上的队列信息,Set<String> 为 engineid 的集合
protected final ConcurrentHashMap<Serializable, Set<String>> localNodes = new ConcurrentHashMap();
@@ -56,7 +49,7 @@ public abstract class WebSocketNode {
@Override
public void run() {
try {
Map<Serializable, Set<InetSocketAddress>> map = remoteNode.getDataNodes();
Map<Serializable, LinkedHashSet<InetSocketAddress>> map = remoteNode.getDataNodes();
if (map != null) dataNodes.putAll(map);
} catch (Exception e) {
logger.log(Level.INFO, WebSocketNode.class.getSimpleName() + "(" + localSncpAddress + ") not load data nodes ", e);
@@ -75,7 +68,7 @@ public abstract class WebSocketNode {
});
}
public Map<Serializable, Set<InetSocketAddress>> getDataNodes() {
public Map<Serializable, LinkedHashSet<InetSocketAddress>> getDataNodes() {
return dataNodes;
}
@@ -125,16 +118,24 @@ public abstract class WebSocketNode {
rscode = RETCODE_GROUP_EMPTY;
break;
}
group.send(recent, message, last);
rscode = group.send(recent, message, last);
}
}
}
if ((recent && rscode == 0) || remoteNode == null) return rscode;
Set<InetSocketAddress> addrs = dataNodes.get(groupid);
if (addrs != null && !addrs.isEmpty()) { //对方连接在远程节点
for (InetSocketAddress addr : addrs) {
if (!addr.equals(localSncpAddress)) {
remoteNode.sendMessage(addr, groupid, recent, message, last);
LinkedHashSet<InetSocketAddress> addrs = dataNodes.get(groupid);
if (addrs != null && !addrs.isEmpty()) { //对方连接在远程节点
if (recent) {
InetSocketAddress one = null;
for (InetSocketAddress addr : addrs) {
one = addr;
}
rscode = remoteNode.sendMessage(one, groupid, recent, message, last);
} else {
for (InetSocketAddress addr : addrs) {
if (!addr.equals(localSncpAddress)) {
rscode |= remoteNode.sendMessage(addr, groupid, recent, message, last);
}
}
}
} else {

View File

@@ -7,6 +7,7 @@ package com.wentch.redkale.net.http;
import com.wentch.redkale.net.AsyncConnection;
import com.wentch.redkale.net.Context;
import static com.wentch.redkale.net.http.WebSocket.*;
import com.wentch.redkale.net.http.WebSocketPacket.PacketType;
import java.nio.ByteBuffer;
import java.nio.channels.*;
@@ -147,18 +148,18 @@ public class WebSocketRunner implements Runnable {
}
}
public void sendMessage(WebSocketPacket packet) {
if (packet == null || closed) return;
public int sendMessage(WebSocketPacket packet) {
if (packet == null) return RETCODE_SEND_ILLPACKET;
if (closed) return RETCODE_WSOCKET_CLOSED;
final boolean debug = this.coder.debugable;
//System.out.println("推送消息");
final byte[] bytes = coder.encode(packet);
if (debug) context.getLogger().log(Level.FINEST, "send web socket message's length = " + bytes.length);
if (writing.getAndSet(true)) {
queue.add(bytes);
return;
return 0;
}
if (writeBuffer == null) return;
if (writeBuffer == null) return RETCODE_ILLEGALBUFFER;
ByteBuffer sendBuffer = null;
if (bytes.length <= writeBuffer.capacity()) {
writeBuffer.clear();
@@ -211,9 +212,11 @@ public class WebSocketRunner implements Runnable {
}
}
});
return 0;
} catch (Exception t) {
writing.set(false);
context.getLogger().log(Level.FINE, "WebSocket sendMessage abort, force to close channel", t);
return RETCODE_SENDEXCEPTION;
}
}

View File

@@ -6,12 +6,12 @@
package com.wentch.redkale.service;
import com.wentch.redkale.net.http.*;
import static com.wentch.redkale.net.http.WebSocket.*;
import com.wentch.redkale.net.sncp.*;
import com.wentch.redkale.util.*;
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;
/**
*
@@ -53,9 +53,9 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
@Override
@MultiRun
public void connect(Serializable groupid, InetSocketAddress addr) {
Set<InetSocketAddress> addrs = dataNodes.get(groupid);
LinkedHashSet<InetSocketAddress> addrs = dataNodes.get(groupid);
if (addrs == null) {
addrs = new CopyOnWriteArraySet<>();
addrs = new LinkedHashSet<>();
dataNodes.put(groupid, addrs);
}
addrs.add(addr);
@@ -67,7 +67,9 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
public void disconnect(Serializable groupid, InetSocketAddress addr) {
Set<InetSocketAddress> addrs = dataNodes.get(groupid);
if (addrs == null) return;
addrs.remove(addr);
synchronized (addrs) {
addrs.remove(addr);
}
if (addrs.isEmpty()) dataNodes.remove(groupid);
if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " disconnect from " + addr);
}