This commit is contained in:
Redkale
2017-06-18 22:31:13 +08:00
parent 62139efca9
commit 98e8a7eb05
5 changed files with 27 additions and 23 deletions

View File

@@ -30,13 +30,6 @@ public @interface RestOnMessage {
*/
String name();
/**
* 是否为二进制消息, 默认为文本消息
*
* @return boolean
*/
boolean isBinary() default false;
/**
* 备注描述
*

View File

@@ -37,6 +37,13 @@ public @interface RestWebSocket {
*/
String catalog() default "";
/**
* 是否为二进制消息, 默认为文本消息
*
* @return boolean
*/
boolean binary() default false;
/**
* 是否单用户单连接, 默认单用户单连接
*

View File

@@ -14,7 +14,6 @@ import java.util.concurrent.*;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.redkale.convert.Convert;
import org.redkale.convert.json.JsonConvert;
import org.redkale.net.*;
import org.redkale.util.Comment;
@@ -81,7 +80,9 @@ public abstract class WebSocket<G extends Serializable, T> {
String _remoteAddr;//不可能为空
JsonConvert _jsonConvert; //不可能为空
Convert _textConvert; //不可能为空
Convert _binaryConvert; //可能为空
java.lang.reflect.Type _messageTextType; //不可能为空
@@ -137,7 +138,7 @@ public abstract class WebSocket<G extends Serializable, T> {
} else if (message instanceof WebSocketPacket) {
return sendPacket((WebSocketPacket) message);
} else {
return sendPacket(new WebSocketPacket(_jsonConvert, json, last));
return sendPacket(new WebSocketPacket(_textConvert, json, last));
}
});
}
@@ -146,7 +147,7 @@ public abstract class WebSocket<G extends Serializable, T> {
} else if (message instanceof WebSocketPacket) {
return sendPacket((WebSocketPacket) message);
} else {
return sendPacket(new WebSocketPacket(_jsonConvert, message, last));
return sendPacket(new WebSocketPacket(_textConvert, message, last));
}
}
@@ -173,9 +174,9 @@ public abstract class WebSocket<G extends Serializable, T> {
*/
public final CompletableFuture<Integer> send(Convert convert, Object message, boolean last) {
if (message instanceof CompletableFuture) {
return ((CompletableFuture) message).thenCompose((json) -> sendPacket(new WebSocketPacket(convert == null ? _jsonConvert : convert, json, last)));
return ((CompletableFuture) message).thenCompose((json) -> sendPacket(new WebSocketPacket(convert == null ? _textConvert : convert, json, last)));
}
return sendPacket(new WebSocketPacket(convert == null ? _jsonConvert : convert, message, last));
return sendPacket(new WebSocketPacket(convert == null ? _textConvert : convert, message, last));
}
/**
@@ -219,7 +220,7 @@ public abstract class WebSocket<G extends Serializable, T> {
return ((CompletableFuture) message).thenCompose((json) -> _engine.node.sendMessage(json, last, userids));
}
CompletableFuture<Integer> rs = _engine.node.sendMessage(message, last, userids);
if (_engine.finest) _engine.logger.finest("userids:" + Arrays.toString(userids) + " send websocket message(" + _jsonConvert.convertTo(message) + ")");
if (_engine.finest) _engine.logger.finest("userids:" + Arrays.toString(userids) + " send websocket message(" + message + ")");
return rs;
}
@@ -248,7 +249,7 @@ public abstract class WebSocket<G extends Serializable, T> {
return ((CompletableFuture) message).thenCompose((json) -> _engine.node.broadcastMessage(json, last));
}
CompletableFuture<Integer> rs = _engine.node.broadcastMessage(message, last);
if (_engine.finest) _engine.logger.finest("broadcast send websocket message(" + _jsonConvert.convertTo(message) + ")");
if (_engine.finest) _engine.logger.finest("broadcast send websocket message(" + message + ")");
return rs;
}

View File

@@ -16,7 +16,6 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.logging.*;
import org.redkale.convert.json.JsonConvert;
/**
* WebSocket的消息接收发送器, 一个WebSocket对应一个WebSocketRunner
@@ -50,8 +49,6 @@ class WebSocketRunner implements Runnable {
protected long lastSendTime;
protected final JsonConvert textConvert;
WebSocketRunner(Context context, WebSocket webSocket, BiConsumer<WebSocket, Object> messageConsumer, AsyncConnection channel, final boolean wsbinary) {
this.context = context;
this.engine = webSocket._engine;
@@ -60,7 +57,6 @@ class WebSocketRunner implements Runnable {
this.channel = channel;
this.wsbinary = wsbinary;
this.readBuffer = context.pollBuffer();
this.textConvert = context.getJsonConvert();
}
@Override
@@ -118,7 +114,7 @@ class WebSocketRunner implements Runnable {
}
if (packet.type == FrameType.TEXT) {
Object message = textConvert.convertFrom(webSocket._messageTextType, packet.receiveMasker, packet.receiveBuffers);
Object message = webSocket._textConvert.convertFrom(webSocket._messageTextType, packet.receiveMasker, packet.receiveBuffers);
if (readBuffer != null) {
readBuffer.clear();
channel.read(readBuffer, null, this);

View File

@@ -15,6 +15,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.logging.*;
import javax.annotation.*;
import org.redkale.convert.Convert;
import org.redkale.convert.json.JsonConvert;
import org.redkale.service.*;
import org.redkale.util.*;
@@ -64,10 +65,14 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
protected int liveinterval = DEFAILT_LIVEINTERVAL;
@Resource
protected JsonConvert jsonConvert; //Rest.createRestWebSocketServlet 需要过滤掉已有的@Resource
private JsonConvert jsonConvert;
protected Convert textConvert;
protected Convert binaryConvert;
@Resource(name = "$")
protected WebSocketNode node; //Rest.createRestWebSocketServlet 需要过滤掉已有的@Resource
protected WebSocketNode node;
protected WebSocketServlet() {
Type msgtype = String.class;
@@ -96,6 +101,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
this.node = new WebSocketNodeService();
if (logger.isLoggable(Level.WARNING)) logger.warning("Not found WebSocketNode, create a default value for " + getClass().getName());
}
if (this.textConvert == null) this.textConvert = jsonConvert;
//存在WebSocketServlet则此WebSocketNode必须是本地模式Service
this.node.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", this.single, context, liveinterval, this.node, logger);
this.node.init(conf);
@@ -131,7 +137,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
final WebSocket webSocket = this.createWebSocket();
webSocket._engine = this.node.localEngine;
webSocket._messageTextType = this.messageTextType;
webSocket._jsonConvert = jsonConvert;
webSocket._textConvert = textConvert;
webSocket._binaryConvert = binaryConvert;
webSocket._remoteAddress = request.getRemoteAddress();
webSocket._remoteAddr = request.getRemoteAddr();
initRestWebSocket(webSocket);