diff --git a/src/org/redkale/convert/BinaryConvert.java b/src/org/redkale/convert/BinaryConvert.java
new file mode 100644
index 000000000..0cb388068
--- /dev/null
+++ b/src/org/redkale/convert/BinaryConvert.java
@@ -0,0 +1,34 @@
+/*
+ * To change this license header, choose License Headers in Project Properties.
+ * To change this template file, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.redkale.convert;
+
+import java.lang.reflect.Type;
+
+/**
+ * 二进制序列化/反序列化操作类
+ *
+ *
+ * 详情见: https://redkale.org
+ *
+ * @author zhangjx
+ * @param Reader输入的子类
+ * @param Writer输出的子类
+ */
+public abstract class BinaryConvert extends Convert {
+
+ protected BinaryConvert(ConvertFactory factory) {
+ super(factory);
+ }
+
+ @Override
+ public final boolean isBinary() {
+ return true;
+ }
+
+ public abstract byte[] convertTo(final Object value);
+
+ public abstract byte[] convertTo(final Type type, final Object value);
+}
diff --git a/src/org/redkale/convert/TextConvert.java b/src/org/redkale/convert/TextConvert.java
new file mode 100644
index 000000000..c6f3b095a
--- /dev/null
+++ b/src/org/redkale/convert/TextConvert.java
@@ -0,0 +1,35 @@
+/*
+ * To change this license header, choose License Headers in Project Properties.
+ * To change this template file, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.redkale.convert;
+
+import java.lang.reflect.Type;
+
+/**
+ * 文本序列化/反序列化操作类
+ *
+ *
+ * 详情见: https://redkale.org
+ *
+ * @author zhangjx
+ * @param Reader输入的子类
+ * @param Writer输出的子类
+ */
+public abstract class TextConvert extends Convert {
+
+ protected TextConvert(ConvertFactory factory) {
+ super(factory);
+ }
+
+ @Override
+ public final boolean isBinary() {
+ return false;
+ }
+
+ public abstract String convertTo(final Object value);
+
+ public abstract String convertTo(final Type type, final Object value);
+
+}
diff --git a/src/org/redkale/convert/bson/BsonConvert.java b/src/org/redkale/convert/bson/BsonConvert.java
index 141ff27f7..002b09637 100644
--- a/src/org/redkale/convert/bson/BsonConvert.java
+++ b/src/org/redkale/convert/bson/BsonConvert.java
@@ -37,7 +37,7 @@ import org.redkale.util.*;
*
* @author zhangjx
*/
-public final class BsonConvert extends Convert {
+public final class BsonConvert extends BinaryConvert {
private static final ObjectPool readerPool = BsonReader.createPool(Integer.getInteger("convert.bson.pool.size", 16));
@@ -59,11 +59,6 @@ public final class BsonConvert extends Convert {
return BsonFactory.root().getConvert();
}
- @Override
- public boolean isBinary() {
- return true;
- }
-
//------------------------------ reader -----------------------------------------------------------
public BsonReader pollBsonReader(final ByteBuffer... buffers) {
return new BsonByteBufferReader((ConvertMask) null, buffers);
@@ -144,6 +139,7 @@ public final class BsonConvert extends Convert {
}
//------------------------------ convertTo -----------------------------------------------------------
+ @Override
public byte[] convertTo(final Object value) {
if (value == null) {
final BsonWriter out = writerPool.get().tiny(tiny);
@@ -155,6 +151,7 @@ public final class BsonConvert extends Convert {
return convertTo(value.getClass(), value);
}
+ @Override
public byte[] convertTo(final Type type, final Object value) {
if (type == null) return null;
final BsonWriter out = writerPool.get().tiny(tiny);
diff --git a/src/org/redkale/convert/json/JsonConvert.java b/src/org/redkale/convert/json/JsonConvert.java
index c2474a3d3..307f91334 100644
--- a/src/org/redkale/convert/json/JsonConvert.java
+++ b/src/org/redkale/convert/json/JsonConvert.java
@@ -21,7 +21,7 @@ import org.redkale.util.*;
* @author zhangjx
*/
@SuppressWarnings("unchecked")
-public final class JsonConvert extends Convert {
+public final class JsonConvert extends TextConvert {
public static final Type TYPE_MAP_STRING_STRING = new TypeToken>() {
}.getType();
@@ -46,11 +46,6 @@ public final class JsonConvert extends Convert {
return JsonFactory.root().getConvert();
}
- @Override
- public boolean isBinary() {
- return false;
- }
-
//------------------------------ reader -----------------------------------------------------------
public JsonReader pollJsonReader(final ByteBuffer... buffers) {
return new JsonByteBufferReader((ConvertMask) null, buffers);
@@ -134,11 +129,13 @@ public final class JsonConvert extends Convert {
}
//------------------------------ convertTo -----------------------------------------------------------
+ @Override
public String convertTo(final Object value) {
if (value == null) return "null";
return convertTo(value.getClass(), value);
}
+ @Override
public String convertTo(final Type type, final Object value) {
if (type == null) return null;
if (value == null) return "null";
diff --git a/src/org/redkale/net/http/WebSocket.java b/src/org/redkale/net/http/WebSocket.java
index 73bf073ce..847a4e3cd 100644
--- a/src/org/redkale/net/http/WebSocket.java
+++ b/src/org/redkale/net/http/WebSocket.java
@@ -204,6 +204,19 @@ public abstract class WebSocket {
return sendMessage(message, true, userids);
}
+ /**
+ * 给指定userid的WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息
+ *
+ * @param convert Convert
+ * @param message 不可为空
+ * @param userids Serializable[]
+ *
+ * @return 为0表示成功, 其他值表示异常
+ */
+ public final CompletableFuture sendMessage(final Convert convert, Object message, G... userids) {
+ return sendMessage(convert, message, true, userids);
+ }
+
/**
* 给指定userid的WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息
*
@@ -214,11 +227,25 @@ public abstract class WebSocket {
* @return 为0表示成功, 其他值表示异常
*/
public final CompletableFuture sendMessage(Object message, boolean last, G... userids) {
+ return sendMessage((Convert) null, message, last, userids);
+ }
+
+ /**
+ * 给指定userid的WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息
+ *
+ * @param convert Convert
+ * @param message 不可为空
+ * @param last 是否最后一条
+ * @param userids Serializable[]
+ *
+ * @return 为0表示成功, 其他值表示异常
+ */
+ public final CompletableFuture sendMessage(final Convert convert, Object message, boolean last, G... userids) {
if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL);
if (message instanceof CompletableFuture) {
- return ((CompletableFuture) message).thenCompose((json) -> _engine.node.sendMessage(json, last, userids));
+ return ((CompletableFuture) message).thenCompose((json) -> _engine.node.sendMessage(convert, json, last, userids));
}
- CompletableFuture rs = _engine.node.sendMessage(message, last, userids);
+ CompletableFuture rs = _engine.node.sendMessage(convert, message, last, userids);
if (_engine.finest) _engine.logger.finest("userids:" + Arrays.toString(userids) + " send websocket message(" + message + ")");
return rs;
}
@@ -231,7 +258,19 @@ public abstract class WebSocket {
* @return 为0表示成功, 其他值表示部分发送异常
*/
public final CompletableFuture broadcastMessage(final Object message) {
- return broadcastMessage(message, true);
+ return broadcastMessage((Convert) null, message, true);
+ }
+
+ /**
+ * 广播消息, 给所有人发消息
+ *
+ * @param convert Convert
+ * @param message 消息内容
+ *
+ * @return 为0表示成功, 其他值表示部分发送异常
+ */
+ public final CompletableFuture broadcastMessage(final Convert convert, final Object message) {
+ return broadcastMessage(convert, message, true);
}
/**
@@ -243,11 +282,24 @@ public abstract class WebSocket {
* @return 为0表示成功, 其他值表示部分发送异常
*/
public final CompletableFuture broadcastMessage(final Object message, final boolean last) {
+ return broadcastMessage((Convert) null, message, last);
+ }
+
+ /**
+ * 广播消息, 给所有人发消息
+ *
+ * @param convert Convert
+ * @param message 消息内容
+ * @param last 是否最后一条
+ *
+ * @return 为0表示成功, 其他值表示部分发送异常
+ */
+ public final CompletableFuture broadcastMessage(final Convert convert, final Object message, final boolean last) {
if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL);
if (message instanceof CompletableFuture) {
- return ((CompletableFuture) message).thenCompose((json) -> _engine.node.broadcastMessage(json, last));
+ return ((CompletableFuture) message).thenCompose((json) -> _engine.node.broadcastMessage(convert, json, last));
}
- CompletableFuture rs = _engine.node.broadcastMessage(message, last);
+ CompletableFuture rs = _engine.node.broadcastMessage(convert, message, last);
if (_engine.finest) _engine.logger.finest("broadcast send websocket message(" + message + ")");
return rs;
}
diff --git a/src/org/redkale/net/http/WebSocketEngine.java b/src/org/redkale/net/http/WebSocketEngine.java
index b4a88d219..322b8a512 100644
--- a/src/org/redkale/net/http/WebSocketEngine.java
+++ b/src/org/redkale/net/http/WebSocketEngine.java
@@ -144,6 +144,7 @@ public class WebSocketEngine {
}
final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null);
if (more) {
+ //此处的WebSocketPacket只能是包含payload或bytes内容的,不能包含sendConvert、sendJson、sendBuffers
final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message
: ((message == null || message instanceof CharSequence || message instanceof byte[])
? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.sendConvert, message, last));
@@ -190,6 +191,7 @@ public class WebSocketEngine {
}
final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null) && userids.length > 1;
if (more) {
+ //此处的WebSocketPacket只能是包含payload或bytes内容的,不能包含sendConvert、sendJson、sendBuffers
final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message
: ((message == null || message instanceof CharSequence || message instanceof byte[])
? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.sendConvert, message, last));
diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java
index 839660e47..dd1177b44 100644
--- a/src/org/redkale/net/http/WebSocketNode.java
+++ b/src/org/redkale/net/http/WebSocketNode.java
@@ -13,6 +13,7 @@ import java.util.concurrent.*;
import java.util.logging.*;
import javax.annotation.*;
import org.redkale.boot.*;
+import org.redkale.convert.*;
import org.redkale.service.*;
import org.redkale.source.*;
import org.redkale.util.*;
@@ -198,7 +199,21 @@ public abstract class WebSocketNode {
* @return 为0表示成功, 其他值表示部分发送异常
*/
public final CompletableFuture sendMessage(Object message, final Serializable... userids) {
- return sendMessage(message, true, userids);
+ return sendMessage((Convert) null, message, true, userids);
+ }
+
+ /**
+ * 向指定用户发送消息,先发送本地连接,再发送远程连接
+ * 如果当前WebSocketNode是远程模式,此方法只发送远程连接
+ *
+ * @param convert Convert
+ * @param message 消息内容
+ * @param userids Serializable[]
+ *
+ * @return 为0表示成功, 其他值表示部分发送异常
+ */
+ public final CompletableFuture sendMessage(final Convert convert, Object message, final Serializable... userids) {
+ return sendMessage(convert, message, true, userids);
}
/**
@@ -212,7 +227,23 @@ public abstract class WebSocketNode {
* @return 为0表示成功, 其他值表示部分发送异常
*/
public final CompletableFuture sendMessage(final Object message, final boolean last, final Serializable... userids) {
+ return sendMessage((Convert) null, message, last, userids);
+ }
+
+ /**
+ * 向指定用户发送消息,先发送本地连接,再发送远程连接
+ * 如果当前WebSocketNode是远程模式,此方法只发送远程连接
+ *
+ * @param convert Convert
+ * @param message0 消息内容
+ * @param last 是否最后一条
+ * @param userids Serializable[]
+ *
+ * @return 为0表示成功, 其他值表示部分发送异常
+ */
+ public final CompletableFuture sendMessage(final Convert convert, final Object message0, final boolean last, final Serializable... userids) {
if (userids == null || userids.length < 1) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
+ final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 : ((convert instanceof TextConvert) ? new WebSocketPacket(((TextConvert) convert).convertTo(message0), last) : new WebSocketPacket(((BinaryConvert) convert).convertTo(message0), last));
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
return this.localEngine.sendMessage(message, last, userids);
}
@@ -232,7 +263,19 @@ public abstract class WebSocketNode {
* @return 为0表示成功, 其他值表示部分发送异常
*/
public final CompletableFuture broadcastMessage(final Object message) {
- return broadcastMessage(message, true);
+ return broadcastMessage((Convert) null, message, true);
+ }
+
+ /**
+ * 广播消息, 给所有人发消息
+ *
+ * @param convert Convert
+ * @param message 消息内容
+ *
+ * @return 为0表示成功, 其他值表示部分发送异常
+ */
+ public final CompletableFuture broadcastMessage(final Convert convert, final Object message) {
+ return broadcastMessage(convert, message, true);
}
/**
@@ -244,6 +287,20 @@ public abstract class WebSocketNode {
* @return 为0表示成功, 其他值表示部分发送异常
*/
public final CompletableFuture broadcastMessage(final Object message, final boolean last) {
+ return broadcastMessage((Convert) null, message, last);
+ }
+
+ /**
+ * 广播消息, 给所有人发消息
+ *
+ * @param convert Convert
+ * @param message0 消息内容
+ * @param last 是否最后一条
+ *
+ * @return 为0表示成功, 其他值表示部分发送异常
+ */
+ public final CompletableFuture broadcastMessage(final Convert convert, final Object message0, final boolean last) {
+ final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 : ((convert instanceof TextConvert) ? new WebSocketPacket(((TextConvert) convert).convertTo(message0), last) : new WebSocketPacket(((BinaryConvert) convert).convertTo(message0), last));
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
return this.localEngine.broadcastMessage(message, last);
}