This commit is contained in:
Redkale
2017-06-19 09:53:29 +08:00
parent 0ba2e25f2e
commit 2aee84d477
4 changed files with 12 additions and 53 deletions

View File

@@ -14,26 +14,18 @@ import java.util.concurrent.*;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.redkale.convert.Convert; import org.redkale.convert.Convert;
import org.redkale.net.*;
import org.redkale.util.Comment; import org.redkale.util.Comment;
/** /**
* <blockquote><pre> * <blockquote><pre>
* 一个WebSocket连接对应一个WebSocket实体即一个WebSocket会绑定一个TCP连接。 * 一个WebSocket连接对应一个WebSocket实体即一个WebSocket会绑定一个TCP连接。
* WebSocket 有两种模式: * 协议上符合HTML5规范, 其流程顺序如下:
* 1) 普通模式: 协议上符合HTML5规范, 其流程顺序如下:
* 1.1 onOpen 若返回null视为WebSocket的连接不合法强制关闭WebSocket连接通常用于判断登录态。 * 1.1 onOpen 若返回null视为WebSocket的连接不合法强制关闭WebSocket连接通常用于判断登录态。
* 1.2 createUserid 若返回null视为WebSocket的连接不合法强制关闭WebSocket连接通常用于判断用户权限是否符合。 * 1.2 createUserid 若返回null视为WebSocket的连接不合法强制关闭WebSocket连接通常用于判断用户权限是否符合。
* 1.3 onConnected WebSocket成功连接后在准备接收数据前回调此方法。 * 1.3 onConnected WebSocket成功连接后在准备接收数据前回调此方法。
* 1.4 onMessage/onFragment+ WebSocket接收到消息后回调此消息类方法。 * 1.4 onMessage/onFragment+ WebSocket接收到消息后回调此消息类方法。
* 1.5 onClose WebSocket被关闭后回调此方法。 * 1.5 onClose WebSocket被关闭后回调此方法。
* 普通模式下 以上方法都应该被重载。 * 普通模式下 以上方法都应该被重载。
*
* 2) 原始二进制模式: 此模式有别于HTML5规范可以视为原始的TCP连接。通常用于音频视频通讯场景。其流程顺序如下:
* 2.1 onOpen 若返回null视为WebSocket的连接不合法强制关闭WebSocket连接通常用于判断登录态。
* 2.2 createWebSocketid 若返回null视为WebSocket的连接不合法强制关闭WebSocket连接通常用于判断用户权限是否符合。
* 2.3 onRead WebSocket成功连接后回调此方法 由此方法处理原始的TCP连接 需要业务代码去控制WebSocket的关闭。
* 二进制模式下 以上方法都应该被重载。
* </pre></blockquote> * </pre></blockquote>
* <p> * <p>
* 详情见: https://redkale.org * 详情见: https://redkale.org
@@ -422,14 +414,6 @@ public abstract class WebSocket<G extends Serializable, T> {
*/ */
protected abstract CompletableFuture<G> createUserid(); protected abstract CompletableFuture<G> createUserid();
/**
* 标记为WebSocketBinary才需要重写此方法
*
* @param channel 请求连接
*/
public void onRead(AsyncConnection channel) {
}
/** /**
* WebSokcet连接成功后的回调方法 * WebSokcet连接成功后的回调方法
*/ */
@@ -461,6 +445,15 @@ public abstract class WebSocket<G extends Serializable, T> {
public void onMessage(T message, boolean last) { public void onMessage(T message, boolean last) {
} }
/**
* 接收到文本消息的回调方法
*
* @param text 消息
* @param last 是否最后一条
*/
public void onMessage(String text, boolean last) {
}
/** /**
* 接收到二进制消息的回调方法 * 接收到二进制消息的回调方法
* *

View File

@@ -1,24 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.net.http;
import java.lang.annotation.*;
import static java.lang.annotation.ElementType.*;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
/**
* 被标记为 &#64;WebSocketBinary 的WebSocketServlet 将使用原始的TCP传输, 通常用于类似音频/视频传输场景
*
* <p> 详情见: https://redkale.org
* @author zhangjx
*/
@Inherited
@Documented
@Target({TYPE})
@Retention(RUNTIME)
public @interface WebSocketBinary {
}

View File

@@ -44,19 +44,16 @@ class WebSocketRunner implements Runnable {
private final BlockingQueue<QueueEntry> queue = new ArrayBlockingQueue(1024); private final BlockingQueue<QueueEntry> queue = new ArrayBlockingQueue(1024);
private final boolean wsbinary;
private final BiConsumer<WebSocket, Object> restMessageConsumer; //主要供RestWebSocket使用 private final BiConsumer<WebSocket, Object> restMessageConsumer; //主要供RestWebSocket使用
protected long lastSendTime; protected long lastSendTime;
WebSocketRunner(Context context, WebSocket webSocket, BiConsumer<WebSocket, Object> messageConsumer, AsyncConnection channel, final boolean wsbinary) { WebSocketRunner(Context context, WebSocket webSocket, BiConsumer<WebSocket, Object> messageConsumer, AsyncConnection channel) {
this.context = context; this.context = context;
this.engine = webSocket._engine; this.engine = webSocket._engine;
this.webSocket = webSocket; this.webSocket = webSocket;
this.restMessageConsumer = messageConsumer; this.restMessageConsumer = messageConsumer;
this.channel = channel; this.channel = channel;
this.wsbinary = wsbinary;
this.readBuffer = context.pollBuffer(); this.readBuffer = context.pollBuffer();
} }
@@ -67,10 +64,6 @@ class WebSocketRunner implements Runnable {
webSocket.onConnected(); webSocket.onConnected();
channel.setReadTimeoutSecond(300); //读取超时5分钟 channel.setReadTimeoutSecond(300); //读取超时5分钟
if (channel.isOpen()) { if (channel.isOpen()) {
if (wsbinary) {
webSocket.onRead(channel);
return;
}
channel.read(readBuffer, null, new CompletionHandler<Integer, Void>() { channel.read(readBuffer, null, new CompletionHandler<Integer, Void>() {
private ByteBuffer recentExBuffer; private ByteBuffer recentExBuffer;

View File

@@ -53,9 +53,6 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
private final MessageDigest digest = getMessageDigest(); private final MessageDigest digest = getMessageDigest();
@Comment("是否用于二进制流传输")
protected final boolean wsbinary = getClass().getAnnotation(WebSocketBinary.class) != null;
private final BiConsumer<WebSocket, Object> restMessageConsumer = createRestOnMessageConsumer(); private final BiConsumer<WebSocket, Object> restMessageConsumer = createRestOnMessageConsumer();
protected Type messageTextType; //RestWebSocket时会被修改 protected Type messageTextType; //RestWebSocket时会被修改
@@ -183,7 +180,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
} }
webSocket._userid = userid; webSocket._userid = userid;
WebSocketServlet.this.node.localEngine.add(webSocket); WebSocketServlet.this.node.localEngine.add(webSocket);
WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel(), wsbinary); WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel());
webSocket._runner = runner; webSocket._runner = runner;
context.runAsync(runner); context.runAsync(runner);
response.finish(true); response.finish(true);