From 84b4eee7b5a731afe39c21edb258f3736387f8a2 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Mon, 16 Oct 2017 10:48:14 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E8=BD=BDWebSocket=E7=9A=84sendMessage?= =?UTF-8?q?=E5=92=8CbroadcastMessage=E7=B3=BB=E5=88=97=E6=96=B9=E6=B3=95?= =?UTF-8?q?=EF=BC=8C=E5=A2=9E=E5=8A=A0Convert=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/org/redkale/convert/BinaryConvert.java | 34 ++++++++++ src/org/redkale/convert/TextConvert.java | 35 +++++++++++ src/org/redkale/convert/bson/BsonConvert.java | 9 +-- src/org/redkale/convert/json/JsonConvert.java | 9 +-- src/org/redkale/net/http/WebSocket.java | 62 +++++++++++++++++-- src/org/redkale/net/http/WebSocketEngine.java | 2 + src/org/redkale/net/http/WebSocketNode.java | 61 +++++++++++++++++- 7 files changed, 193 insertions(+), 19 deletions(-) create mode 100644 src/org/redkale/convert/BinaryConvert.java create mode 100644 src/org/redkale/convert/TextConvert.java diff --git a/src/org/redkale/convert/BinaryConvert.java b/src/org/redkale/convert/BinaryConvert.java new file mode 100644 index 000000000..0cb388068 --- /dev/null +++ b/src/org/redkale/convert/BinaryConvert.java @@ -0,0 +1,34 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.convert; + +import java.lang.reflect.Type; + +/** + * 二进制序列化/反序列化操作类 + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * @param Reader输入的子类 + * @param Writer输出的子类 + */ +public abstract class BinaryConvert extends Convert { + + protected BinaryConvert(ConvertFactory factory) { + super(factory); + } + + @Override + public final boolean isBinary() { + return true; + } + + public abstract byte[] convertTo(final Object value); + + public abstract byte[] convertTo(final Type type, final Object value); +} diff --git a/src/org/redkale/convert/TextConvert.java b/src/org/redkale/convert/TextConvert.java new file mode 100644 index 000000000..c6f3b095a --- /dev/null +++ b/src/org/redkale/convert/TextConvert.java @@ -0,0 +1,35 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.convert; + +import java.lang.reflect.Type; + +/** + * 文本序列化/反序列化操作类 + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * @param Reader输入的子类 + * @param Writer输出的子类 + */ +public abstract class TextConvert extends Convert { + + protected TextConvert(ConvertFactory factory) { + super(factory); + } + + @Override + public final boolean isBinary() { + return false; + } + + public abstract String convertTo(final Object value); + + public abstract String convertTo(final Type type, final Object value); + +} diff --git a/src/org/redkale/convert/bson/BsonConvert.java b/src/org/redkale/convert/bson/BsonConvert.java index 141ff27f7..002b09637 100644 --- a/src/org/redkale/convert/bson/BsonConvert.java +++ b/src/org/redkale/convert/bson/BsonConvert.java @@ -37,7 +37,7 @@ import org.redkale.util.*; * * @author zhangjx */ -public final class BsonConvert extends Convert { +public final class BsonConvert extends BinaryConvert { private static final ObjectPool readerPool = BsonReader.createPool(Integer.getInteger("convert.bson.pool.size", 16)); @@ -59,11 +59,6 @@ public final class BsonConvert extends Convert { return BsonFactory.root().getConvert(); } - @Override - public boolean isBinary() { - return true; - } - //------------------------------ reader ----------------------------------------------------------- public BsonReader pollBsonReader(final ByteBuffer... buffers) { return new BsonByteBufferReader((ConvertMask) null, buffers); @@ -144,6 +139,7 @@ public final class BsonConvert extends Convert { } //------------------------------ convertTo ----------------------------------------------------------- + @Override public byte[] convertTo(final Object value) { if (value == null) { final BsonWriter out = writerPool.get().tiny(tiny); @@ -155,6 +151,7 @@ public final class BsonConvert extends Convert { return convertTo(value.getClass(), value); } + @Override public byte[] convertTo(final Type type, final Object value) { if (type == null) return null; final BsonWriter out = writerPool.get().tiny(tiny); diff --git a/src/org/redkale/convert/json/JsonConvert.java b/src/org/redkale/convert/json/JsonConvert.java index c2474a3d3..307f91334 100644 --- a/src/org/redkale/convert/json/JsonConvert.java +++ b/src/org/redkale/convert/json/JsonConvert.java @@ -21,7 +21,7 @@ import org.redkale.util.*; * @author zhangjx */ @SuppressWarnings("unchecked") -public final class JsonConvert extends Convert { +public final class JsonConvert extends TextConvert { public static final Type TYPE_MAP_STRING_STRING = new TypeToken>() { }.getType(); @@ -46,11 +46,6 @@ public final class JsonConvert extends Convert { return JsonFactory.root().getConvert(); } - @Override - public boolean isBinary() { - return false; - } - //------------------------------ reader ----------------------------------------------------------- public JsonReader pollJsonReader(final ByteBuffer... buffers) { return new JsonByteBufferReader((ConvertMask) null, buffers); @@ -134,11 +129,13 @@ public final class JsonConvert extends Convert { } //------------------------------ convertTo ----------------------------------------------------------- + @Override public String convertTo(final Object value) { if (value == null) return "null"; return convertTo(value.getClass(), value); } + @Override public String convertTo(final Type type, final Object value) { if (type == null) return null; if (value == null) return "null"; diff --git a/src/org/redkale/net/http/WebSocket.java b/src/org/redkale/net/http/WebSocket.java index 73bf073ce..847a4e3cd 100644 --- a/src/org/redkale/net/http/WebSocket.java +++ b/src/org/redkale/net/http/WebSocket.java @@ -204,6 +204,19 @@ public abstract class WebSocket { return sendMessage(message, true, userids); } + /** + * 给指定userid的WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息 + * + * @param convert Convert + * @param message 不可为空 + * @param userids Serializable[] + * + * @return 为0表示成功, 其他值表示异常 + */ + public final CompletableFuture sendMessage(final Convert convert, Object message, G... userids) { + return sendMessage(convert, message, true, userids); + } + /** * 给指定userid的WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息 * @@ -214,11 +227,25 @@ public abstract class WebSocket { * @return 为0表示成功, 其他值表示异常 */ public final CompletableFuture sendMessage(Object message, boolean last, G... userids) { + return sendMessage((Convert) null, message, last, userids); + } + + /** + * 给指定userid的WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息 + * + * @param convert Convert + * @param message 不可为空 + * @param last 是否最后一条 + * @param userids Serializable[] + * + * @return 为0表示成功, 其他值表示异常 + */ + public final CompletableFuture sendMessage(final Convert convert, Object message, boolean last, G... userids) { if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL); if (message instanceof CompletableFuture) { - return ((CompletableFuture) message).thenCompose((json) -> _engine.node.sendMessage(json, last, userids)); + return ((CompletableFuture) message).thenCompose((json) -> _engine.node.sendMessage(convert, json, last, userids)); } - CompletableFuture rs = _engine.node.sendMessage(message, last, userids); + CompletableFuture rs = _engine.node.sendMessage(convert, message, last, userids); if (_engine.finest) _engine.logger.finest("userids:" + Arrays.toString(userids) + " send websocket message(" + message + ")"); return rs; } @@ -231,7 +258,19 @@ public abstract class WebSocket { * @return 为0表示成功, 其他值表示部分发送异常 */ public final CompletableFuture broadcastMessage(final Object message) { - return broadcastMessage(message, true); + return broadcastMessage((Convert) null, message, true); + } + + /** + * 广播消息, 给所有人发消息 + * + * @param convert Convert + * @param message 消息内容 + * + * @return 为0表示成功, 其他值表示部分发送异常 + */ + public final CompletableFuture broadcastMessage(final Convert convert, final Object message) { + return broadcastMessage(convert, message, true); } /** @@ -243,11 +282,24 @@ public abstract class WebSocket { * @return 为0表示成功, 其他值表示部分发送异常 */ public final CompletableFuture broadcastMessage(final Object message, final boolean last) { + return broadcastMessage((Convert) null, message, last); + } + + /** + * 广播消息, 给所有人发消息 + * + * @param convert Convert + * @param message 消息内容 + * @param last 是否最后一条 + * + * @return 为0表示成功, 其他值表示部分发送异常 + */ + public final CompletableFuture broadcastMessage(final Convert convert, final Object message, final boolean last) { if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL); if (message instanceof CompletableFuture) { - return ((CompletableFuture) message).thenCompose((json) -> _engine.node.broadcastMessage(json, last)); + return ((CompletableFuture) message).thenCompose((json) -> _engine.node.broadcastMessage(convert, json, last)); } - CompletableFuture rs = _engine.node.broadcastMessage(message, last); + CompletableFuture rs = _engine.node.broadcastMessage(convert, message, last); if (_engine.finest) _engine.logger.finest("broadcast send websocket message(" + message + ")"); return rs; } diff --git a/src/org/redkale/net/http/WebSocketEngine.java b/src/org/redkale/net/http/WebSocketEngine.java index b4a88d219..322b8a512 100644 --- a/src/org/redkale/net/http/WebSocketEngine.java +++ b/src/org/redkale/net/http/WebSocketEngine.java @@ -144,6 +144,7 @@ public class WebSocketEngine { } final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null); if (more) { + //此处的WebSocketPacket只能是包含payload或bytes内容的,不能包含sendConvert、sendJson、sendBuffers final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message : ((message == null || message instanceof CharSequence || message instanceof byte[]) ? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.sendConvert, message, last)); @@ -190,6 +191,7 @@ public class WebSocketEngine { } final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null) && userids.length > 1; if (more) { + //此处的WebSocketPacket只能是包含payload或bytes内容的,不能包含sendConvert、sendJson、sendBuffers final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message : ((message == null || message instanceof CharSequence || message instanceof byte[]) ? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.sendConvert, message, last)); diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index 839660e47..dd1177b44 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -13,6 +13,7 @@ import java.util.concurrent.*; import java.util.logging.*; import javax.annotation.*; import org.redkale.boot.*; +import org.redkale.convert.*; import org.redkale.service.*; import org.redkale.source.*; import org.redkale.util.*; @@ -198,7 +199,21 @@ public abstract class WebSocketNode { * @return 为0表示成功, 其他值表示部分发送异常 */ public final CompletableFuture sendMessage(Object message, final Serializable... userids) { - return sendMessage(message, true, userids); + return sendMessage((Convert) null, message, true, userids); + } + + /** + * 向指定用户发送消息,先发送本地连接,再发送远程连接
+ * 如果当前WebSocketNode是远程模式,此方法只发送远程连接 + * + * @param convert Convert + * @param message 消息内容 + * @param userids Serializable[] + * + * @return 为0表示成功, 其他值表示部分发送异常 + */ + public final CompletableFuture sendMessage(final Convert convert, Object message, final Serializable... userids) { + return sendMessage(convert, message, true, userids); } /** @@ -212,7 +227,23 @@ public abstract class WebSocketNode { * @return 为0表示成功, 其他值表示部分发送异常 */ public final CompletableFuture sendMessage(final Object message, final boolean last, final Serializable... userids) { + return sendMessage((Convert) null, message, last, userids); + } + + /** + * 向指定用户发送消息,先发送本地连接,再发送远程连接
+ * 如果当前WebSocketNode是远程模式,此方法只发送远程连接 + * + * @param convert Convert + * @param message0 消息内容 + * @param last 是否最后一条 + * @param userids Serializable[] + * + * @return 为0表示成功, 其他值表示部分发送异常 + */ + public final CompletableFuture sendMessage(final Convert convert, final Object message0, final boolean last, final Serializable... userids) { if (userids == null || userids.length < 1) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); + final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 : ((convert instanceof TextConvert) ? new WebSocketPacket(((TextConvert) convert).convertTo(message0), last) : new WebSocketPacket(((BinaryConvert) convert).convertTo(message0), last)); if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式 return this.localEngine.sendMessage(message, last, userids); } @@ -232,7 +263,19 @@ public abstract class WebSocketNode { * @return 为0表示成功, 其他值表示部分发送异常 */ public final CompletableFuture broadcastMessage(final Object message) { - return broadcastMessage(message, true); + return broadcastMessage((Convert) null, message, true); + } + + /** + * 广播消息, 给所有人发消息 + * + * @param convert Convert + * @param message 消息内容 + * + * @return 为0表示成功, 其他值表示部分发送异常 + */ + public final CompletableFuture broadcastMessage(final Convert convert, final Object message) { + return broadcastMessage(convert, message, true); } /** @@ -244,6 +287,20 @@ public abstract class WebSocketNode { * @return 为0表示成功, 其他值表示部分发送异常 */ public final CompletableFuture broadcastMessage(final Object message, final boolean last) { + return broadcastMessage((Convert) null, message, last); + } + + /** + * 广播消息, 给所有人发消息 + * + * @param convert Convert + * @param message0 消息内容 + * @param last 是否最后一条 + * + * @return 为0表示成功, 其他值表示部分发送异常 + */ + public final CompletableFuture broadcastMessage(final Convert convert, final Object message0, final boolean last) { + final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 : ((convert instanceof TextConvert) ? new WebSocketPacket(((TextConvert) convert).convertTo(message0), last) : new WebSocketPacket(((BinaryConvert) convert).convertTo(message0), last)); if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式 return this.localEngine.broadcastMessage(message, last); }