This commit is contained in:
@@ -13,21 +13,21 @@ import org.redkale.net.*;
|
||||
|
||||
/**
|
||||
* 一个WebSocket连接对应一个WebSocket实体,即一个WebSocket会绑定一个TCP连接。
|
||||
* WebSocket 有两种模式:
|
||||
* 1) 普通模式: 协议上符合HTML5规范, 其流程顺序如下:
|
||||
* 1.1 onOpen 如果方法返回null,则视为WebSocket的连接不合法,框架会强制关闭WebSocket连接;通常用于判断登录态。
|
||||
* 1.2 createGroupid 如果方法返回null,则视为WebSocket的连接不合法,框架会强制关闭WebSocket连接;通常用于判断用户权限是否符合。
|
||||
* 1.3 onConnected WebSocket成功连接后在准备接收数据前回调此方法。
|
||||
* 1.4 onMessage/onFragment+ WebSocket接收到消息后回调此消息类方法。
|
||||
* 1.5 onClose WebSocket被关闭后回调此方法。
|
||||
*
|
||||
* WebSocket 有两种模式:
|
||||
* 1) 普通模式: 协议上符合HTML5规范, 其流程顺序如下:
|
||||
* 1.1 onOpen 如果方法返回null,则视为WebSocket的连接不合法,框架会强制关闭WebSocket连接;通常用于判断登录态。
|
||||
* 1.2 createGroupid 如果方法返回null,则视为WebSocket的连接不合法,框架会强制关闭WebSocket连接;通常用于判断用户权限是否符合。
|
||||
* 1.3 onConnected WebSocket成功连接后在准备接收数据前回调此方法。
|
||||
* 1.4 onMessage/onFragment+ WebSocket接收到消息后回调此消息类方法。
|
||||
* 1.5 onClose WebSocket被关闭后回调此方法。
|
||||
*
|
||||
* 此模式下 以上方法都应该被重载。
|
||||
*
|
||||
* 2) 原始二进制模式: 此模式有别于HTML5规范,可以视为原始的TCP连接。通常用于音频视频通讯场景。期流程顺序如下:
|
||||
* 2.1 onOpen 如果方法返回null,则视为WebSocket的连接不合法,框架会强制关闭WebSocket连接;通常用于判断登录态。
|
||||
* 2.2 createGroupid 如果方法返回null,则视为WebSocket的连接不合法,框架会强制关闭WebSocket连接;通常用于判断用户权限是否符合。
|
||||
* 2.3 onRead WebSocket成功连接后回调此方法, 由此方法处理原始的TCP连接, 同时业务代码去控制WebSocket的关闭。
|
||||
*
|
||||
*
|
||||
* 2) 原始二进制模式: 此模式有别于HTML5规范,可以视为原始的TCP连接。通常用于音频视频通讯场景。期流程顺序如下:
|
||||
* 2.1 onOpen 如果方法返回null,则视为WebSocket的连接不合法,框架会强制关闭WebSocket连接;通常用于判断登录态。
|
||||
* 2.2 createGroupid 如果方法返回null,则视为WebSocket的连接不合法,框架会强制关闭WebSocket连接;通常用于判断用户权限是否符合。
|
||||
* 2.3 onRead WebSocket成功连接后回调此方法, 由此方法处理原始的TCP连接, 同时业务代码去控制WebSocket的关闭。
|
||||
*
|
||||
* 此模式下 以上方法都应该被重载。
|
||||
* <p>
|
||||
*
|
||||
@@ -56,17 +56,15 @@ public abstract class WebSocket {
|
||||
|
||||
public static final int RETCODE_WSOFFLINE = 1 << 8; //256
|
||||
|
||||
WebSocketRunner runner;
|
||||
WebSocketRunner runner; //不可能为空
|
||||
|
||||
WebSocketEngine engine;
|
||||
WebSocketEngine _engine; //不可能为空
|
||||
|
||||
WebSocketGroup group;
|
||||
WebSocketGroup _group; //不可能为空
|
||||
|
||||
WebSocketNode node;
|
||||
Serializable _sessionid; //不可能为空
|
||||
|
||||
Serializable sessionid;
|
||||
|
||||
Serializable groupid;
|
||||
Serializable _groupid; //不可能为空
|
||||
|
||||
private final long createtime = System.currentTimeMillis();
|
||||
|
||||
@@ -80,7 +78,7 @@ public abstract class WebSocket {
|
||||
* 发送消息体, 包含二进制/文本
|
||||
* <p>
|
||||
* @param packet
|
||||
* @return
|
||||
* @return
|
||||
*/
|
||||
public final int send(WebSocketPacket packet) {
|
||||
if (this.runner != null) return this.runner.sendMessage(packet);
|
||||
@@ -98,7 +96,7 @@ public abstract class WebSocket {
|
||||
* 发送单一的文本消息
|
||||
* <p>
|
||||
* @param text 不可为空
|
||||
* @return
|
||||
* @return
|
||||
*/
|
||||
public final int send(String text) {
|
||||
return send(text, true);
|
||||
@@ -109,7 +107,7 @@ public abstract class WebSocket {
|
||||
* <p>
|
||||
* @param text 不可为空
|
||||
* @param last 是否最后一条
|
||||
* @return
|
||||
* @return
|
||||
*/
|
||||
public final int send(String text, boolean last) {
|
||||
return send(new WebSocketPacket(text, last));
|
||||
@@ -119,7 +117,7 @@ public abstract class WebSocket {
|
||||
* 发送单一的二进制消息
|
||||
* <p>
|
||||
* @param data
|
||||
* @return
|
||||
* @return
|
||||
*/
|
||||
public final int send(byte[] data) {
|
||||
return send(data, true);
|
||||
@@ -142,7 +140,7 @@ public abstract class WebSocket {
|
||||
* <p>
|
||||
* @param data 不可为空
|
||||
* @param last 是否最后一条
|
||||
* @return
|
||||
* @return
|
||||
*/
|
||||
public final int send(byte[] data, boolean last) {
|
||||
return send(new WebSocketPacket(data, last));
|
||||
@@ -152,8 +150,8 @@ public abstract class WebSocket {
|
||||
* 发送消息, 消息类型是String或byte[]
|
||||
* <p>
|
||||
* @param message 不可为空, 只能是String或者byte[]
|
||||
* @param last 是否最后一条
|
||||
* @return
|
||||
* @param last 是否最后一条
|
||||
* @return
|
||||
*/
|
||||
public final int send(Serializable message, boolean last) {
|
||||
return send(new WebSocketPacket(message, last));
|
||||
@@ -253,13 +251,13 @@ public abstract class WebSocket {
|
||||
}
|
||||
|
||||
private int sendMessage(Serializable groupid, boolean recent, String text, boolean last) {
|
||||
if (node == null) return RETCODE_NODESERVICE_NULL;
|
||||
return node.sendMessage(groupid, recent, text, last);
|
||||
if (_engine.node == null) return RETCODE_NODESERVICE_NULL;
|
||||
return _engine.node.sendMessage(groupid, recent, text, last);
|
||||
}
|
||||
|
||||
private int sendMessage(Serializable groupid, boolean recent, byte[] data, boolean last) {
|
||||
if (node == null) return RETCODE_NODESERVICE_NULL;
|
||||
return node.sendMessage(groupid, recent, data, last);
|
||||
if (_engine.node == null) return RETCODE_NODESERVICE_NULL;
|
||||
return _engine.node.sendMessage(groupid, recent, data, last);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -301,7 +299,7 @@ public abstract class WebSocket {
|
||||
* @return
|
||||
*/
|
||||
public final Serializable getGroupid() {
|
||||
return groupid;
|
||||
return _groupid;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -310,7 +308,7 @@ public abstract class WebSocket {
|
||||
* @return
|
||||
*/
|
||||
public final Serializable getSessionid() {
|
||||
return sessionid;
|
||||
return _sessionid;
|
||||
}
|
||||
|
||||
//-------------------------------------------------------------------
|
||||
@@ -320,7 +318,7 @@ public abstract class WebSocket {
|
||||
* @return
|
||||
*/
|
||||
protected final WebSocketGroup getWebSocketGroup() {
|
||||
return group;
|
||||
return _group;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -330,11 +328,11 @@ public abstract class WebSocket {
|
||||
* @return
|
||||
*/
|
||||
protected final WebSocketGroup getWebSocketGroup(Serializable groupid) {
|
||||
return engine.getWebSocketGroup(groupid);
|
||||
return _engine.getWebSocketGroup(groupid);
|
||||
}
|
||||
|
||||
protected final Collection<WebSocketGroup> getWebSocketGroups() {
|
||||
return engine.getWebSocketGroups();
|
||||
return _engine.getWebSocketGroups();
|
||||
}
|
||||
|
||||
//-------------------------------------------------------------------
|
||||
|
||||
@@ -27,6 +27,8 @@ public final class WebSocketEngine {
|
||||
|
||||
private final String engineid;
|
||||
|
||||
protected final WebSocketNode node;
|
||||
|
||||
private final Map<Serializable, WebSocketGroup> containers = new ConcurrentHashMap<>();
|
||||
|
||||
private ScheduledThreadPoolExecutor scheduler;
|
||||
@@ -35,8 +37,9 @@ public final class WebSocketEngine {
|
||||
|
||||
protected final boolean finest;
|
||||
|
||||
protected WebSocketEngine(String engineid, Logger logger) {
|
||||
protected WebSocketEngine(String engineid, WebSocketNode node, Logger logger) {
|
||||
this.engineid = engineid;
|
||||
this.node = node;
|
||||
this.logger = logger;
|
||||
this.index = sequence.getAndIncrement();
|
||||
this.finest = logger.isLoggable(Level.FINEST);
|
||||
@@ -60,19 +63,26 @@ public final class WebSocketEngine {
|
||||
}
|
||||
|
||||
void add(WebSocket socket) {
|
||||
WebSocketGroup group = containers.get(socket.groupid);
|
||||
WebSocketGroup group = containers.get(socket._groupid);
|
||||
if (group == null) {
|
||||
group = new WebSocketGroup(socket.groupid);
|
||||
containers.put(socket.groupid, group);
|
||||
group = new WebSocketGroup(socket._groupid);
|
||||
containers.putIfAbsent(socket._groupid, group);
|
||||
}
|
||||
group.add(socket);
|
||||
if (node != null) node.connect(socket._groupid, engineid);
|
||||
}
|
||||
|
||||
void remove(WebSocket socket) {
|
||||
WebSocketGroup group = containers.get(socket.groupid);
|
||||
if (group == null) return;
|
||||
final WebSocketGroup group = containers.get(socket._groupid);
|
||||
if (group == null) {
|
||||
if (node != null) node.disconnect(socket._groupid, engineid);
|
||||
return;
|
||||
}
|
||||
group.remove(socket);
|
||||
if (group.isEmpty()) containers.remove(socket.groupid);
|
||||
if (group.isEmpty()) {
|
||||
containers.remove(socket._groupid);
|
||||
if (node != null) node.disconnect(socket._groupid, engineid);
|
||||
}
|
||||
}
|
||||
|
||||
Collection<WebSocketGroup> getWebSocketGroups() {
|
||||
|
||||
@@ -42,7 +42,7 @@ public final class WebSocketGroup {
|
||||
}
|
||||
|
||||
void add(WebSocket socket) {
|
||||
socket.group = this;
|
||||
socket._group = this;
|
||||
this.recentWebSocket = socket;
|
||||
list.add(socket);
|
||||
}
|
||||
|
||||
@@ -48,7 +48,7 @@ public class WebSocketRunner implements Runnable {
|
||||
|
||||
public WebSocketRunner(Context context, WebSocket webSocket, AsyncConnection channel, final boolean wsbinary) {
|
||||
this.context = context;
|
||||
this.engine = webSocket.engine;
|
||||
this.engine = webSocket._engine;
|
||||
this.webSocket = webSocket;
|
||||
this.channel = channel;
|
||||
this.wsbinary = wsbinary;
|
||||
@@ -63,7 +63,6 @@ public class WebSocketRunner implements Runnable {
|
||||
public void run() {
|
||||
final boolean debug = this.coder.debugable;
|
||||
try {
|
||||
if (webSocket.node != null) webSocket.node.connect(webSocket.groupid, webSocket.engine.getEngineid());
|
||||
webSocket.onConnected();
|
||||
channel.setReadTimeoutSecond(300); //读取超时5分钟
|
||||
if (channel.isOpen()) {
|
||||
@@ -119,7 +118,7 @@ public class WebSocketRunner implements Runnable {
|
||||
readBuffer.clear();
|
||||
channel.read(readBuffer, null, this);
|
||||
}
|
||||
webSocket.group.setRecentWebSocket(webSocket);
|
||||
webSocket._group.setRecentWebSocket(webSocket);
|
||||
try {
|
||||
if (packet.type == FrameType.TEXT) {
|
||||
webSocket.onMessage(packet.getPayload());
|
||||
@@ -244,10 +243,6 @@ public class WebSocketRunner implements Runnable {
|
||||
readBuffer = null;
|
||||
writeBuffer = null;
|
||||
engine.remove(webSocket);
|
||||
if (webSocket.node != null) {
|
||||
WebSocketGroup group = webSocket.getWebSocketGroup();
|
||||
if (group == null || group.isEmpty()) webSocket.node.disconnect(webSocket.groupid, webSocket.engine.getEngineid());
|
||||
}
|
||||
webSocket.onClose(0, null);
|
||||
}
|
||||
}
|
||||
@@ -370,27 +365,28 @@ public class WebSocketRunner implements Runnable {
|
||||
private Logger logger;
|
||||
|
||||
/**
|
||||
0 1 2 3
|
||||
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
|
||||
+-+-+-+-+-------+-+-------------+-------------------------------+
|
||||
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|
||||
|I|S|S|S| (4) |A| (7) | (16/64) |
|
||||
|N|V|V|V| |S| | (if payload len==126/127) |
|
||||
| |1|2|3| |K| | |
|
||||
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
|
||||
| Extended payload length continued, if payload len == 127 |
|
||||
+ - - - - - - - - - - - - - - - +-------------------------------+
|
||||
| |Masking-key, if MASK set to 1 |
|
||||
+-------------------------------+-------------------------------+
|
||||
| Masking-key (continued) | Payload Data |
|
||||
+-------------------------------- - - - - - - - - - - - - - - - +
|
||||
: Payload Data continued :
|
||||
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
|
||||
| Payload Data continued |
|
||||
+-----------------------------------------------------------------------+
|
||||
@param buffer
|
||||
@param exbuffers
|
||||
@return
|
||||
* 0 1 2 3
|
||||
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
|
||||
* +-+-+-+-+-------+-+-------------+-------------------------------+
|
||||
* |F|R|R|R| opcode|M| Payload len | Extended payload length |
|
||||
* |I|S|S|S| (4) |A| (7) | (16/64) |
|
||||
* |N|V|V|V| |S| | (if payload len==126/127) |
|
||||
* | |1|2|3| |K| | |
|
||||
* +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
|
||||
* | Extended payload length continued, if payload len == 127 |
|
||||
* + - - - - - - - - - - - - - - - +-------------------------------+
|
||||
* | |Masking-key, if MASK set to 1 |
|
||||
* +-------------------------------+-------------------------------+
|
||||
* | Masking-key (continued) | Payload Data |
|
||||
* +-------------------------------- - - - - - - - - - - - - - - - +
|
||||
* : Payload Data continued :
|
||||
* + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
|
||||
* | Payload Data continued |
|
||||
* +-----------------------------------------------------------------------+
|
||||
*
|
||||
* @param buffer
|
||||
* @param exbuffers
|
||||
* @return
|
||||
*/
|
||||
public WebSocketPacket decode(final ByteBuffer buffer, ByteBuffer... exbuffers) {
|
||||
final boolean debug = this.debugable;
|
||||
|
||||
@@ -63,7 +63,7 @@ public abstract class WebSocketServlet extends HttpServlet {
|
||||
@Override
|
||||
public void init(Context context, AnyValue conf) {
|
||||
InetSocketAddress addr = context.getServerAddress();
|
||||
this.engine = new WebSocketEngine(addr.getHostString() + ":" + addr.getPort() + "-" + name(), logger);
|
||||
this.engine = new WebSocketEngine(addr.getHostString() + ":" + addr.getPort() + "-" + name(), this.node, logger);
|
||||
this.node.putWebSocketEngine(engine);
|
||||
this.node.init(conf);
|
||||
this.engine.init(conf);
|
||||
@@ -97,15 +97,14 @@ public abstract class WebSocketServlet extends HttpServlet {
|
||||
return;
|
||||
}
|
||||
final WebSocket webSocket = this.createWebSocket();
|
||||
webSocket.engine = engine;
|
||||
webSocket.node = node;
|
||||
webSocket._engine = engine;
|
||||
Serializable sessionid = webSocket.onOpen(request);
|
||||
if (sessionid == null) {
|
||||
if (debug) logger.finest("WebSocket connect abort, Not found sessionid. request=" + request);
|
||||
response.finish(true);
|
||||
return;
|
||||
}
|
||||
webSocket.sessionid = sessionid;
|
||||
webSocket._sessionid = sessionid;
|
||||
request.setKeepAlive(true);
|
||||
byte[] bytes = (key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11").getBytes();
|
||||
synchronized (digest) {
|
||||
@@ -127,7 +126,7 @@ public abstract class WebSocketServlet extends HttpServlet {
|
||||
response.finish(true);
|
||||
return;
|
||||
}
|
||||
webSocket.groupid = groupid;
|
||||
webSocket._groupid = groupid;
|
||||
engine.add(webSocket);
|
||||
context.submit(new WebSocketRunner(context, webSocket, response.removeChannel(), wsbinary));
|
||||
response.finish(true);
|
||||
|
||||
Reference in New Issue
Block a user