This commit is contained in:
Redkale
2017-06-24 10:32:00 +08:00
parent f254b48693
commit c524ba1797
3 changed files with 29 additions and 12 deletions

View File

@@ -76,6 +76,8 @@ public abstract class WebSocket<G extends Serializable, T> {
Convert _binaryConvert; //可能为空 Convert _binaryConvert; //可能为空
Convert _sendConvert; //不可能为空
java.lang.reflect.Type _messageTextType; //不可能为空 java.lang.reflect.Type _messageTextType; //不可能为空
private long createtime = System.currentTimeMillis(); private long createtime = System.currentTimeMillis();
@@ -130,7 +132,7 @@ public abstract class WebSocket<G extends Serializable, T> {
} else if (message instanceof WebSocketPacket) { } else if (message instanceof WebSocketPacket) {
return sendPacket((WebSocketPacket) message); return sendPacket((WebSocketPacket) message);
} else { } else {
return sendPacket(new WebSocketPacket(_textConvert, json, last)); return sendPacket(new WebSocketPacket(getSendConvert(), json, last));
} }
}); });
} }
@@ -139,7 +141,7 @@ public abstract class WebSocket<G extends Serializable, T> {
} else if (message instanceof WebSocketPacket) { } else if (message instanceof WebSocketPacket) {
return sendPacket((WebSocketPacket) message); return sendPacket((WebSocketPacket) message);
} else { } else {
return sendPacket(new WebSocketPacket(_textConvert, message, last)); return sendPacket(new WebSocketPacket(getSendConvert(), message, last));
} }
} }
@@ -166,9 +168,9 @@ public abstract class WebSocket<G extends Serializable, T> {
*/ */
public final CompletableFuture<Integer> send(Convert convert, Object message, boolean last) { public final CompletableFuture<Integer> send(Convert convert, Object message, boolean last) {
if (message instanceof CompletableFuture) { if (message instanceof CompletableFuture) {
return ((CompletableFuture) message).thenCompose((json) -> sendPacket(new WebSocketPacket(convert == null ? _textConvert : convert, json, last))); return ((CompletableFuture) message).thenCompose((json) -> sendPacket(new WebSocketPacket(convert == null ? getSendConvert() : convert, json, last)));
} }
return sendPacket(new WebSocketPacket(convert == null ? _textConvert : convert, message, last)); return sendPacket(new WebSocketPacket(convert == null ? getSendConvert() : convert, message, last));
} }
/** /**
@@ -352,6 +354,10 @@ public abstract class WebSocket<G extends Serializable, T> {
return _binaryConvert; return _binaryConvert;
} }
protected Convert getSendConvert() {
return _sendConvert;
}
//------------------------------------------------------------------- //-------------------------------------------------------------------
/** /**
* 获取指定userid的WebSocket数组, 没有返回null <br> * 获取指定userid的WebSocket数组, 没有返回null <br>

View File

@@ -12,7 +12,7 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.*; import java.util.concurrent.atomic.*;
import java.util.logging.*; import java.util.logging.*;
import java.util.stream.*; import java.util.stream.*;
import org.redkale.convert.json.JsonConvert; import org.redkale.convert.Convert;
import static org.redkale.net.http.WebSocket.RETCODE_GROUP_EMPTY; import static org.redkale.net.http.WebSocket.RETCODE_GROUP_EMPTY;
import org.redkale.util.*; import org.redkale.util.*;
@@ -40,8 +40,8 @@ public final class WebSocketEngine {
//HttpContext //HttpContext
protected final HttpContext context; protected final HttpContext context;
//JsonConvert //Convert
protected final JsonConvert convert; protected final Convert sendConvert;
protected final boolean single; //是否单用户单连接 protected final boolean single; //是否单用户单连接
@@ -62,11 +62,11 @@ public final class WebSocketEngine {
private int liveinterval; private int liveinterval;
protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveinterval, WebSocketNode node, Logger logger) { protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveinterval, WebSocketNode node, Convert sendConvert, Logger logger) {
this.engineid = engineid; this.engineid = engineid;
this.single = single; this.single = single;
this.context = context; this.context = context;
this.convert = context.getJsonConvert(); this.sendConvert = sendConvert;
this.node = node; this.node = node;
this.liveinterval = liveinterval; this.liveinterval = liveinterval;
this.logger = logger; this.logger = logger;
@@ -135,7 +135,7 @@ public final class WebSocketEngine {
if (more) { if (more) {
final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message
: ((message == null || message instanceof CharSequence || message instanceof byte[]) : ((message == null || message instanceof CharSequence || message instanceof byte[])
? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.convert, message, last)); ? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.sendConvert, message, last));
packet.setSendBuffers(packet.encode(context.getBufferSupplier())); packet.setSendBuffers(packet.encode(context.getBufferSupplier()));
CompletableFuture<Integer> future = null; CompletableFuture<Integer> future = null;
if (single) { if (single) {
@@ -176,7 +176,7 @@ public final class WebSocketEngine {
if (more) { if (more) {
final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message
: ((message == null || message instanceof CharSequence || message instanceof byte[]) : ((message == null || message instanceof CharSequence || message instanceof byte[])
? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.convert, message, last)); ? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.sendConvert, message, last));
packet.setSendBuffers(packet.encode(context.getBufferSupplier())); packet.setSendBuffers(packet.encode(context.getBufferSupplier()));
CompletableFuture<Integer> future = null; CompletableFuture<Integer> future = null;
if (single) { if (single) {

View File

@@ -61,10 +61,17 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
protected int liveinterval = DEFAILT_LIVEINTERVAL; protected int liveinterval = DEFAILT_LIVEINTERVAL;
@Resource(name = "jsonconvert") @Resource(name = "jsonconvert")
protected Convert jsonConvert;
@Resource(name = "$_textconvert")
protected Convert textConvert; protected Convert textConvert;
@Resource(name = "$_binaryconvert")
protected Convert binaryConvert; protected Convert binaryConvert;
@Resource(name = "$_sendconvert")
protected Convert sendConvert;
@Resource(name = "$") @Resource(name = "$")
protected WebSocketNode node; protected WebSocketNode node;
@@ -89,6 +96,9 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
@Override @Override
final void preInit(HttpContext context, AnyValue conf) { final void preInit(HttpContext context, AnyValue conf) {
if (this.textConvert == null) this.textConvert = jsonConvert;
if (this.binaryConvert == null) this.binaryConvert = jsonConvert;
if (this.sendConvert == null) this.sendConvert = jsonConvert;
InetSocketAddress addr = context.getServerAddress(); InetSocketAddress addr = context.getServerAddress();
if (this.node == null) this.node = createWebSocketNode(); if (this.node == null) this.node = createWebSocketNode();
if (this.node == null) { //没有部署SNCP即不是分布式 if (this.node == null) { //没有部署SNCP即不是分布式
@@ -96,7 +106,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
if (logger.isLoggable(Level.WARNING)) logger.warning("Not found WebSocketNode, create a default value for " + getClass().getName()); if (logger.isLoggable(Level.WARNING)) logger.warning("Not found WebSocketNode, create a default value for " + getClass().getName());
} }
//存在WebSocketServlet则此WebSocketNode必须是本地模式Service //存在WebSocketServlet则此WebSocketNode必须是本地模式Service
this.node.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", this.single, context, liveinterval, this.node, logger); this.node.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", this.single, context, liveinterval, this.node, this.sendConvert, logger);
this.node.init(conf); this.node.init(conf);
this.node.localEngine.init(conf); this.node.localEngine.init(conf);
} }
@@ -132,6 +142,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
webSocket._messageTextType = this.messageTextType; webSocket._messageTextType = this.messageTextType;
webSocket._textConvert = textConvert; webSocket._textConvert = textConvert;
webSocket._binaryConvert = binaryConvert; webSocket._binaryConvert = binaryConvert;
webSocket._sendConvert = sendConvert;
webSocket._remoteAddress = request.getRemoteAddress(); webSocket._remoteAddress = request.getRemoteAddress();
webSocket._remoteAddr = request.getRemoteAddr(); webSocket._remoteAddr = request.getRemoteAddr();
initRestWebSocket(webSocket); initRestWebSocket(webSocket);