From c524ba1797d9285b0b019cb35b951f276637daab Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Sat, 24 Jun 2017 10:32:00 +0800 Subject: [PATCH] --- src/org/redkale/net/http/WebSocket.java | 14 ++++++++++---- src/org/redkale/net/http/WebSocketEngine.java | 14 +++++++------- src/org/redkale/net/http/WebSocketServlet.java | 13 ++++++++++++- 3 files changed, 29 insertions(+), 12 deletions(-) diff --git a/src/org/redkale/net/http/WebSocket.java b/src/org/redkale/net/http/WebSocket.java index ee15995ed..185db4c6b 100644 --- a/src/org/redkale/net/http/WebSocket.java +++ b/src/org/redkale/net/http/WebSocket.java @@ -76,6 +76,8 @@ public abstract class WebSocket { Convert _binaryConvert; //可能为空 + Convert _sendConvert; //不可能为空 + java.lang.reflect.Type _messageTextType; //不可能为空 private long createtime = System.currentTimeMillis(); @@ -130,7 +132,7 @@ public abstract class WebSocket { } else if (message instanceof WebSocketPacket) { return sendPacket((WebSocketPacket) message); } else { - return sendPacket(new WebSocketPacket(_textConvert, json, last)); + return sendPacket(new WebSocketPacket(getSendConvert(), json, last)); } }); } @@ -139,7 +141,7 @@ public abstract class WebSocket { } else if (message instanceof WebSocketPacket) { return sendPacket((WebSocketPacket) message); } else { - return sendPacket(new WebSocketPacket(_textConvert, message, last)); + return sendPacket(new WebSocketPacket(getSendConvert(), message, last)); } } @@ -166,9 +168,9 @@ public abstract class WebSocket { */ public final CompletableFuture send(Convert convert, Object message, boolean last) { if (message instanceof CompletableFuture) { - return ((CompletableFuture) message).thenCompose((json) -> sendPacket(new WebSocketPacket(convert == null ? _textConvert : convert, json, last))); + return ((CompletableFuture) message).thenCompose((json) -> sendPacket(new WebSocketPacket(convert == null ? getSendConvert() : convert, json, last))); } - return sendPacket(new WebSocketPacket(convert == null ? _textConvert : convert, message, last)); + return sendPacket(new WebSocketPacket(convert == null ? getSendConvert() : convert, message, last)); } /** @@ -352,6 +354,10 @@ public abstract class WebSocket { return _binaryConvert; } + protected Convert getSendConvert() { + return _sendConvert; + } + //------------------------------------------------------------------- /** * 获取指定userid的WebSocket数组, 没有返回null
diff --git a/src/org/redkale/net/http/WebSocketEngine.java b/src/org/redkale/net/http/WebSocketEngine.java index bda4a7c69..07e8787b3 100644 --- a/src/org/redkale/net/http/WebSocketEngine.java +++ b/src/org/redkale/net/http/WebSocketEngine.java @@ -12,7 +12,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.logging.*; import java.util.stream.*; -import org.redkale.convert.json.JsonConvert; +import org.redkale.convert.Convert; import static org.redkale.net.http.WebSocket.RETCODE_GROUP_EMPTY; import org.redkale.util.*; @@ -40,8 +40,8 @@ public final class WebSocketEngine { //HttpContext protected final HttpContext context; - //JsonConvert - protected final JsonConvert convert; + //Convert + protected final Convert sendConvert; protected final boolean single; //是否单用户单连接 @@ -62,11 +62,11 @@ public final class WebSocketEngine { private int liveinterval; - protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveinterval, WebSocketNode node, Logger logger) { + protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveinterval, WebSocketNode node, Convert sendConvert, Logger logger) { this.engineid = engineid; this.single = single; this.context = context; - this.convert = context.getJsonConvert(); + this.sendConvert = sendConvert; this.node = node; this.liveinterval = liveinterval; this.logger = logger; @@ -135,7 +135,7 @@ public final class WebSocketEngine { if (more) { final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message : ((message == null || message instanceof CharSequence || message instanceof byte[]) - ? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.convert, message, last)); + ? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.sendConvert, message, last)); packet.setSendBuffers(packet.encode(context.getBufferSupplier())); CompletableFuture future = null; if (single) { @@ -176,7 +176,7 @@ public final class WebSocketEngine { if (more) { final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message : ((message == null || message instanceof CharSequence || message instanceof byte[]) - ? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.convert, message, last)); + ? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.sendConvert, message, last)); packet.setSendBuffers(packet.encode(context.getBufferSupplier())); CompletableFuture future = null; if (single) { diff --git a/src/org/redkale/net/http/WebSocketServlet.java b/src/org/redkale/net/http/WebSocketServlet.java index c9628a44b..20cfeb767 100644 --- a/src/org/redkale/net/http/WebSocketServlet.java +++ b/src/org/redkale/net/http/WebSocketServlet.java @@ -61,10 +61,17 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl protected int liveinterval = DEFAILT_LIVEINTERVAL; @Resource(name = "jsonconvert") + protected Convert jsonConvert; + + @Resource(name = "$_textconvert") protected Convert textConvert; + @Resource(name = "$_binaryconvert") protected Convert binaryConvert; + @Resource(name = "$_sendconvert") + protected Convert sendConvert; + @Resource(name = "$") protected WebSocketNode node; @@ -89,6 +96,9 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl @Override final void preInit(HttpContext context, AnyValue conf) { + if (this.textConvert == null) this.textConvert = jsonConvert; + if (this.binaryConvert == null) this.binaryConvert = jsonConvert; + if (this.sendConvert == null) this.sendConvert = jsonConvert; InetSocketAddress addr = context.getServerAddress(); if (this.node == null) this.node = createWebSocketNode(); if (this.node == null) { //没有部署SNCP,即不是分布式 @@ -96,7 +106,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl if (logger.isLoggable(Level.WARNING)) logger.warning("Not found WebSocketNode, create a default value for " + getClass().getName()); } //存在WebSocketServlet,则此WebSocketNode必须是本地模式Service - this.node.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", this.single, context, liveinterval, this.node, logger); + this.node.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", this.single, context, liveinterval, this.node, this.sendConvert, logger); this.node.init(conf); this.node.localEngine.init(conf); } @@ -132,6 +142,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl webSocket._messageTextType = this.messageTextType; webSocket._textConvert = textConvert; webSocket._binaryConvert = binaryConvert; + webSocket._sendConvert = sendConvert; webSocket._remoteAddress = request.getRemoteAddress(); webSocket._remoteAddr = request.getRemoteAddr(); initRestWebSocket(webSocket);