This commit is contained in:
地平线
2015-07-02 09:10:49 +08:00
parent 71654ad287
commit a001c4895a
2 changed files with 171 additions and 14 deletions

View File

@@ -12,6 +12,24 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
/** /**
* 一个WebSocket连接对应一个WebSocket实体即一个WebSocket会绑定一个TCP连接。
* WebSocket 有两种模式:
* 1) 普通模式: 协议上符合HTML5规范, 其流程顺序如下:
* 1.1 onOpen 如果方法返回null则视为WebSocket的连接不合法框架会强制关闭WebSocket连接通常用于判断登录态。
* 1.2 createGroupid 如果方法返回null则视为WebSocket的连接不合法框架会强制关闭WebSocket连接通常用于判断用户权限是否符合。
* 1.3 onConnected WebSocket成功连接后在准备接收数据前回调此方法。
* 1.4 onMessage/onFragment+ WebSocket接收到消息后回调此消息类方法。
* 1.5 onClose WebSocket被关闭后回调此方法。
*
* 此模式下 以上方法都应该被重载。
*
* 2) 原始二进制模式: 此模式有别于HTML5规范可以视为原始的TCP连接。通常用于音频视频通讯场景。期流程顺序如下:
* 2.1 onOpen 如果方法返回null则视为WebSocket的连接不合法框架会强制关闭WebSocket连接通常用于判断登录态。
* 2.2 createGroupid 如果方法返回null则视为WebSocket的连接不合法框架会强制关闭WebSocket连接通常用于判断用户权限是否符合。
* 2.3 onRead WebSocket成功连接后回调此方法 由此方法处理原始的TCP连接 同时业务代码去控制WebSocket的关闭。
*
* 此模式下 以上方法都应该被重载。
* <p>
* *
* @author zhangjx * @author zhangjx
*/ */
@@ -25,7 +43,7 @@ public abstract class WebSocket {
WebSocketNodeService nodeService; WebSocketNodeService nodeService;
String sessionid; Serializable sessionid;
Serializable groupid; Serializable groupid;
@@ -35,59 +53,149 @@ public abstract class WebSocket {
} }
//---------------------------------------------------------------- //----------------------------------------------------------------
/**
* 发送消息体, 包含二进制/文本
* <p>
* @param packet
*/
public final void send(WebSocketPacket packet) { public final void send(WebSocketPacket packet) {
if (this.runner != null) this.runner.sendMessage(packet); if (this.runner != null) this.runner.sendMessage(packet);
} }
/**
* 显式地关闭WebSocket
*/
public final void close() { public final void close() {
if (this.runner != null) this.runner.closeRunner(); if (this.runner != null) this.runner.closeRunner();
} }
/**
* 发送单一的文本消息
* <p>
* @param text 不可为空
*/
public final void send(String text) { public final void send(String text) {
send(text, true); send(text, true);
} }
/**
* 发送文本消息
* <p>
* @param text 不可为空
* @param last 是否最后一条
*/
public final void send(String text, boolean last) { public final void send(String text, boolean last) {
send(new WebSocketPacket(text, last)); send(new WebSocketPacket(text, last));
} }
/**
* 发送单一的二进制消息
* <p>
* @param data
*/
public final void send(byte[] data) { public final void send(byte[] data) {
send(data, true); send(data, true);
} }
/**
* 发送二进制消息
* <p>
* @param data 不可为空
* @param last 是否最后一条
*/
public final void send(byte[] data, boolean last) { public final void send(byte[] data, boolean last) {
send(new WebSocketPacket(data, last)); send(new WebSocketPacket(data, last));
} }
//---------------------------------------------------------------- //----------------------------------------------------------------
/**
* 给指定groupid的WebSocketGroup下所有WebSocket节点发送文本消息
* <p>
* @param groupid
* @param text 不可为空
* @return 为0表示成功 其他值表示异常
*/
public final int sendMessage(Serializable groupid, String text) { public final int sendMessage(Serializable groupid, String text) {
return sendMessage(groupid, text, true); return sendMessage(groupid, text, true);
} }
/**
* 给指定groupid的WebSocketGroup下所有WebSocket节点发送二进制消息
* <p>
* @param groupid
* @param data 不可为空
* @return 为0表示成功 其他值表示异常
*/
public final int sendMessage(Serializable groupid, byte[] data) { public final int sendMessage(Serializable groupid, byte[] data) {
return sendMessage(groupid, data, true); return sendMessage(groupid, data, true);
} }
/**
* 给指定groupid的WebSocketGroup下所有WebSocket节点发送文本消息
* <p>
* @param groupid
* @param text 不可为空
* @param last
* @return 为0表示成功 其他值表示异常
*/
public final int sendMessage(Serializable groupid, String text, boolean last) { public final int sendMessage(Serializable groupid, String text, boolean last) {
return sendMessage(groupid, false, text, last); return sendMessage(groupid, false, text, last);
} }
/**
* 给指定groupid的WebSocketGroup下所有WebSocket节点发送二进制消息
* <p>
* @param groupid
* @param data 不可为空
* @param last 是否最后一条
* @return 为0表示成功 其他值表示异常
*/
public final int sendMessage(Serializable groupid, byte[] data, boolean last) { public final int sendMessage(Serializable groupid, byte[] data, boolean last) {
return sendMessage(groupid, false, data, last); return sendMessage(groupid, false, data, last);
} }
/**
* 给指定groupid的WebSocketGroup下最近活跃的WebSocket节点发送文本消息
* <p>
* @param groupid
* @param text 不可为空
* @return 为0表示成功 其他值表示异常
*/
public final int sendRecentMessage(Serializable groupid, String text) { public final int sendRecentMessage(Serializable groupid, String text) {
return sendRecentMessage(groupid, text, true); return sendRecentMessage(groupid, text, true);
} }
/**
* 给指定groupid的WebSocketGroup下最近活跃的WebSocket节点发送二进制消息
* <p>
* @param groupid
* @param data 不可为空
* @return 为0表示成功 其他值表示异常
*/
public final int sendRecentMessage(Serializable groupid, byte[] data) { public final int sendRecentMessage(Serializable groupid, byte[] data) {
return sendRecentMessage(groupid, data, true); return sendRecentMessage(groupid, data, true);
} }
/**
* 给指定groupid的WebSocketGroup下最近活跃的WebSocket节点发送文本消息
* <p>
* @param groupid
* @param text 不可为空
* @param last 是否最后一条
* @return 为0表示成功 其他值表示异常
*/
public final int sendRecentMessage(Serializable groupid, String text, boolean last) { public final int sendRecentMessage(Serializable groupid, String text, boolean last) {
return sendMessage(groupid, true, text, last); return sendMessage(groupid, true, text, last);
} }
/**
* 给指定groupid的WebSocketGroup下最近活跃的WebSocket节点发送二进制消息
* <p>
* @param groupid
* @param data 不可为空
* @param last 是否最后一条
* @return 为0表示成功 其他值表示异常
*/
public final int sendRecentMessage(Serializable groupid, byte[] data, boolean last) { public final int sendRecentMessage(Serializable groupid, byte[] data, boolean last) {
return sendMessage(groupid, true, data, last); return sendMessage(groupid, true, data, last);
} }
@@ -110,32 +218,73 @@ public abstract class WebSocket {
} }
} }
/**
* 获取当前WebSocket下的属性
* <p>
* @param <T>
* @param name
* @return
*/
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public final <T> T getAttribute(String name) { public final <T> T getAttribute(String name) {
return (T) attributes.get(name); return (T) attributes.get(name);
} }
public final void removeAttribute(String name) { /**
attributes.remove(name); * 移出当前WebSocket下的属性
* <p>
* @param <T>
* @param name
* @return
*/
public final <T> T removeAttribute(String name) {
return (T) attributes.remove(name);
} }
/**
* 给当前WebSocket下的增加属性
* <p>
* @param name
* @param value
*/
public final void setAttribute(String name, Object value) { public final void setAttribute(String name, Object value) {
attributes.put(name, value); attributes.put(name, value);
} }
/**
* 获取当前WebSocket所属的groupid
* <p>
* @return
*/
public final Serializable getGroupid() { public final Serializable getGroupid() {
return groupid; return groupid;
} }
public final String getSessionid() { /**
* 获取当前WebSocket的会话ID 不会为null
* <p>
* @return
*/
public final Serializable getSessionid() {
return sessionid; return sessionid;
} }
//------------------------------------------------------------------- //-------------------------------------------------------------------
/**
* 获取当前WebSocket所属的WebSocketGroup 不会为null
* <p>
* @return
*/
protected final WebSocketGroup getWebSocketGroup() { protected final WebSocketGroup getWebSocketGroup() {
return group; return group;
} }
/**
* 获取指定groupid的WebSocketGroup, 没有返回null
* <p>
* @param groupid
* @return
*/
protected final WebSocketGroup getWebSocketGroup(long groupid) { protected final WebSocketGroup getWebSocketGroup(long groupid) {
return engine.getWebSocketGroup(groupid); return engine.getWebSocketGroup(groupid);
} }
@@ -151,12 +300,12 @@ public abstract class WebSocket {
* @param request * @param request
* @return * @return
*/ */
public String onOpen(final HttpRequest request) { public Serializable onOpen(final HttpRequest request) {
return request.getSessionid(false); return request.getSessionid(false);
} }
/** /**
* 返回GroupID null表示异常 * 创建groupid null表示异常
* *
* @return * @return
*/ */
@@ -164,11 +313,6 @@ public abstract class WebSocket {
return null; return null;
} }
/**
* WebSocketBinary模式流程顺序: onOpen、createGroupid、onRead
* WebSocket流程顺序: onOpen、createGroupid、onConnected、onMessage/onFragment+、onClose
*/
/** /**
* *
* @param channel * @param channel

View File

@@ -17,6 +17,19 @@ import java.util.logging.*;
import javax.annotation.*; import javax.annotation.*;
/** /**
* 当WebSocketServlet接收一个TCP连接后进行协议判断如果成功就会创建一个WebSocket。
*
* WebSocketServlet
* |
* |
* WebSocketEngine
* / \
* / \
* / \
* WebSocketGroup1 WebSocketGroup2
* / \ / \
* / \ / \
* WebSocket1 WebSocket2 WebSocket3 WebSocket4
* *
* @author zhangjx * @author zhangjx
*/ */
@@ -75,7 +88,7 @@ public abstract class WebSocketServlet extends HttpServlet {
final WebSocket webSocket = this.createWebSocket(); final WebSocket webSocket = this.createWebSocket();
webSocket.engine = engine; webSocket.engine = engine;
webSocket.nodeService = nodeService; webSocket.nodeService = nodeService;
String sessionid = webSocket.onOpen(request); Serializable sessionid = webSocket.onOpen(request);
if (sessionid == null) { if (sessionid == null) {
if (debug) logger.finer("WebSocket connect abort, Not found sessionid. request=" + request); if (debug) logger.finer("WebSocket connect abort, Not found sessionid. request=" + request);
response.finish(true); response.finish(true);