重载WebSocket的sendMessage和broadcastMessage系列方法,增加Convert参数

This commit is contained in:
Redkale
2017-10-16 10:48:14 +08:00
parent 96c0a9bfe4
commit 84b4eee7b5
7 changed files with 193 additions and 19 deletions

View File

@@ -0,0 +1,34 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.convert;
import java.lang.reflect.Type;
/**
* 二进制序列化/反序列化操作类
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
* @param <R> Reader输入的子类
* @param <W> Writer输出的子类
*/
public abstract class BinaryConvert<R extends Reader, W extends Writer> extends Convert<R, W> {
protected BinaryConvert(ConvertFactory<R, W> factory) {
super(factory);
}
@Override
public final boolean isBinary() {
return true;
}
public abstract byte[] convertTo(final Object value);
public abstract byte[] convertTo(final Type type, final Object value);
}

View File

@@ -0,0 +1,35 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.convert;
import java.lang.reflect.Type;
/**
* 文本序列化/反序列化操作类
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
* @param <R> Reader输入的子类
* @param <W> Writer输出的子类
*/
public abstract class TextConvert<R extends Reader, W extends Writer> extends Convert<R, W> {
protected TextConvert(ConvertFactory<R, W> factory) {
super(factory);
}
@Override
public final boolean isBinary() {
return false;
}
public abstract String convertTo(final Object value);
public abstract String convertTo(final Type type, final Object value);
}

View File

@@ -37,7 +37,7 @@ import org.redkale.util.*;
* *
* @author zhangjx * @author zhangjx
*/ */
public final class BsonConvert extends Convert<BsonReader, BsonWriter> { public final class BsonConvert extends BinaryConvert<BsonReader, BsonWriter> {
private static final ObjectPool<BsonReader> readerPool = BsonReader.createPool(Integer.getInteger("convert.bson.pool.size", 16)); private static final ObjectPool<BsonReader> readerPool = BsonReader.createPool(Integer.getInteger("convert.bson.pool.size", 16));
@@ -59,11 +59,6 @@ public final class BsonConvert extends Convert<BsonReader, BsonWriter> {
return BsonFactory.root().getConvert(); return BsonFactory.root().getConvert();
} }
@Override
public boolean isBinary() {
return true;
}
//------------------------------ reader ----------------------------------------------------------- //------------------------------ reader -----------------------------------------------------------
public BsonReader pollBsonReader(final ByteBuffer... buffers) { public BsonReader pollBsonReader(final ByteBuffer... buffers) {
return new BsonByteBufferReader((ConvertMask) null, buffers); return new BsonByteBufferReader((ConvertMask) null, buffers);
@@ -144,6 +139,7 @@ public final class BsonConvert extends Convert<BsonReader, BsonWriter> {
} }
//------------------------------ convertTo ----------------------------------------------------------- //------------------------------ convertTo -----------------------------------------------------------
@Override
public byte[] convertTo(final Object value) { public byte[] convertTo(final Object value) {
if (value == null) { if (value == null) {
final BsonWriter out = writerPool.get().tiny(tiny); final BsonWriter out = writerPool.get().tiny(tiny);
@@ -155,6 +151,7 @@ public final class BsonConvert extends Convert<BsonReader, BsonWriter> {
return convertTo(value.getClass(), value); return convertTo(value.getClass(), value);
} }
@Override
public byte[] convertTo(final Type type, final Object value) { public byte[] convertTo(final Type type, final Object value) {
if (type == null) return null; if (type == null) return null;
final BsonWriter out = writerPool.get().tiny(tiny); final BsonWriter out = writerPool.get().tiny(tiny);

View File

@@ -21,7 +21,7 @@ import org.redkale.util.*;
* @author zhangjx * @author zhangjx
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public final class JsonConvert extends Convert<JsonReader, JsonWriter> { public final class JsonConvert extends TextConvert<JsonReader, JsonWriter> {
public static final Type TYPE_MAP_STRING_STRING = new TypeToken<java.util.LinkedHashMap<String, String>>() { public static final Type TYPE_MAP_STRING_STRING = new TypeToken<java.util.LinkedHashMap<String, String>>() {
}.getType(); }.getType();
@@ -46,11 +46,6 @@ public final class JsonConvert extends Convert<JsonReader, JsonWriter> {
return JsonFactory.root().getConvert(); return JsonFactory.root().getConvert();
} }
@Override
public boolean isBinary() {
return false;
}
//------------------------------ reader ----------------------------------------------------------- //------------------------------ reader -----------------------------------------------------------
public JsonReader pollJsonReader(final ByteBuffer... buffers) { public JsonReader pollJsonReader(final ByteBuffer... buffers) {
return new JsonByteBufferReader((ConvertMask) null, buffers); return new JsonByteBufferReader((ConvertMask) null, buffers);
@@ -134,11 +129,13 @@ public final class JsonConvert extends Convert<JsonReader, JsonWriter> {
} }
//------------------------------ convertTo ----------------------------------------------------------- //------------------------------ convertTo -----------------------------------------------------------
@Override
public String convertTo(final Object value) { public String convertTo(final Object value) {
if (value == null) return "null"; if (value == null) return "null";
return convertTo(value.getClass(), value); return convertTo(value.getClass(), value);
} }
@Override
public String convertTo(final Type type, final Object value) { public String convertTo(final Type type, final Object value) {
if (type == null) return null; if (type == null) return null;
if (value == null) return "null"; if (value == null) return "null";

View File

@@ -204,6 +204,19 @@ public abstract class WebSocket<G extends Serializable, T> {
return sendMessage(message, true, userids); return sendMessage(message, true, userids);
} }
/**
* 给指定userid的WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息
*
* @param convert Convert
* @param message 不可为空
* @param userids Serializable[]
*
* @return 为0表示成功 其他值表示异常
*/
public final CompletableFuture<Integer> sendMessage(final Convert convert, Object message, G... userids) {
return sendMessage(convert, message, true, userids);
}
/** /**
* 给指定userid的WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息 * 给指定userid的WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息
* *
@@ -214,11 +227,25 @@ public abstract class WebSocket<G extends Serializable, T> {
* @return 为0表示成功 其他值表示异常 * @return 为0表示成功 其他值表示异常
*/ */
public final CompletableFuture<Integer> sendMessage(Object message, boolean last, G... userids) { public final CompletableFuture<Integer> sendMessage(Object message, boolean last, G... userids) {
return sendMessage((Convert) null, message, last, userids);
}
/**
* 给指定userid的WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息
*
* @param convert Convert
* @param message 不可为空
* @param last 是否最后一条
* @param userids Serializable[]
*
* @return 为0表示成功 其他值表示异常
*/
public final CompletableFuture<Integer> sendMessage(final Convert convert, Object message, boolean last, G... userids) {
if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL); if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL);
if (message instanceof CompletableFuture) { if (message instanceof CompletableFuture) {
return ((CompletableFuture) message).thenCompose((json) -> _engine.node.sendMessage(json, last, userids)); return ((CompletableFuture) message).thenCompose((json) -> _engine.node.sendMessage(convert, json, last, userids));
} }
CompletableFuture<Integer> rs = _engine.node.sendMessage(message, last, userids); CompletableFuture<Integer> rs = _engine.node.sendMessage(convert, message, last, userids);
if (_engine.finest) _engine.logger.finest("userids:" + Arrays.toString(userids) + " send websocket message(" + message + ")"); if (_engine.finest) _engine.logger.finest("userids:" + Arrays.toString(userids) + " send websocket message(" + message + ")");
return rs; return rs;
} }
@@ -231,7 +258,19 @@ public abstract class WebSocket<G extends Serializable, T> {
* @return 为0表示成功 其他值表示部分发送异常 * @return 为0表示成功 其他值表示部分发送异常
*/ */
public final CompletableFuture<Integer> broadcastMessage(final Object message) { public final CompletableFuture<Integer> broadcastMessage(final Object message) {
return broadcastMessage(message, true); return broadcastMessage((Convert) null, message, true);
}
/**
* 广播消息, 给所有人发消息
*
* @param convert Convert
* @param message 消息内容
*
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> broadcastMessage(final Convert convert, final Object message) {
return broadcastMessage(convert, message, true);
} }
/** /**
@@ -243,11 +282,24 @@ public abstract class WebSocket<G extends Serializable, T> {
* @return 为0表示成功 其他值表示部分发送异常 * @return 为0表示成功 其他值表示部分发送异常
*/ */
public final CompletableFuture<Integer> broadcastMessage(final Object message, final boolean last) { public final CompletableFuture<Integer> broadcastMessage(final Object message, final boolean last) {
return broadcastMessage((Convert) null, message, last);
}
/**
* 广播消息, 给所有人发消息
*
* @param convert Convert
* @param message 消息内容
* @param last 是否最后一条
*
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> broadcastMessage(final Convert convert, final Object message, final boolean last) {
if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL); if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL);
if (message instanceof CompletableFuture) { if (message instanceof CompletableFuture) {
return ((CompletableFuture) message).thenCompose((json) -> _engine.node.broadcastMessage(json, last)); return ((CompletableFuture) message).thenCompose((json) -> _engine.node.broadcastMessage(convert, json, last));
} }
CompletableFuture<Integer> rs = _engine.node.broadcastMessage(message, last); CompletableFuture<Integer> rs = _engine.node.broadcastMessage(convert, message, last);
if (_engine.finest) _engine.logger.finest("broadcast send websocket message(" + message + ")"); if (_engine.finest) _engine.logger.finest("broadcast send websocket message(" + message + ")");
return rs; return rs;
} }

View File

@@ -144,6 +144,7 @@ public class WebSocketEngine {
} }
final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null); final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null);
if (more) { if (more) {
//此处的WebSocketPacket只能是包含payload或bytes内容的不能包含sendConvert、sendJson、sendBuffers
final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message
: ((message == null || message instanceof CharSequence || message instanceof byte[]) : ((message == null || message instanceof CharSequence || message instanceof byte[])
? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.sendConvert, message, last)); ? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.sendConvert, message, last));
@@ -190,6 +191,7 @@ public class WebSocketEngine {
} }
final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null) && userids.length > 1; final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null) && userids.length > 1;
if (more) { if (more) {
//此处的WebSocketPacket只能是包含payload或bytes内容的不能包含sendConvert、sendJson、sendBuffers
final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message
: ((message == null || message instanceof CharSequence || message instanceof byte[]) : ((message == null || message instanceof CharSequence || message instanceof byte[])
? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.sendConvert, message, last)); ? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.sendConvert, message, last));

View File

@@ -13,6 +13,7 @@ import java.util.concurrent.*;
import java.util.logging.*; import java.util.logging.*;
import javax.annotation.*; import javax.annotation.*;
import org.redkale.boot.*; import org.redkale.boot.*;
import org.redkale.convert.*;
import org.redkale.service.*; import org.redkale.service.*;
import org.redkale.source.*; import org.redkale.source.*;
import org.redkale.util.*; import org.redkale.util.*;
@@ -198,7 +199,21 @@ public abstract class WebSocketNode {
* @return 为0表示成功 其他值表示部分发送异常 * @return 为0表示成功 其他值表示部分发送异常
*/ */
public final CompletableFuture<Integer> sendMessage(Object message, final Serializable... userids) { public final CompletableFuture<Integer> sendMessage(Object message, final Serializable... userids) {
return sendMessage(message, true, userids); return sendMessage((Convert) null, message, true, userids);
}
/**
* 向指定用户发送消息,先发送本地连接,再发送远程连接 <br>
* 如果当前WebSocketNode是远程模式此方法只发送远程连接
*
* @param convert Convert
* @param message 消息内容
* @param userids Serializable[]
*
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> sendMessage(final Convert convert, Object message, final Serializable... userids) {
return sendMessage(convert, message, true, userids);
} }
/** /**
@@ -212,7 +227,23 @@ public abstract class WebSocketNode {
* @return 为0表示成功 其他值表示部分发送异常 * @return 为0表示成功 其他值表示部分发送异常
*/ */
public final CompletableFuture<Integer> sendMessage(final Object message, final boolean last, final Serializable... userids) { public final CompletableFuture<Integer> sendMessage(final Object message, final boolean last, final Serializable... userids) {
return sendMessage((Convert) null, message, last, userids);
}
/**
* 向指定用户发送消息,先发送本地连接,再发送远程连接 <br>
* 如果当前WebSocketNode是远程模式此方法只发送远程连接
*
* @param convert Convert
* @param message0 消息内容
* @param last 是否最后一条
* @param userids Serializable[]
*
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> sendMessage(final Convert convert, final Object message0, final boolean last, final Serializable... userids) {
if (userids == null || userids.length < 1) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); if (userids == null || userids.length < 1) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 : ((convert instanceof TextConvert) ? new WebSocketPacket(((TextConvert) convert).convertTo(message0), last) : new WebSocketPacket(((BinaryConvert) convert).convertTo(message0), last));
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式 if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
return this.localEngine.sendMessage(message, last, userids); return this.localEngine.sendMessage(message, last, userids);
} }
@@ -232,7 +263,19 @@ public abstract class WebSocketNode {
* @return 为0表示成功 其他值表示部分发送异常 * @return 为0表示成功 其他值表示部分发送异常
*/ */
public final CompletableFuture<Integer> broadcastMessage(final Object message) { public final CompletableFuture<Integer> broadcastMessage(final Object message) {
return broadcastMessage(message, true); return broadcastMessage((Convert) null, message, true);
}
/**
* 广播消息, 给所有人发消息
*
* @param convert Convert
* @param message 消息内容
*
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> broadcastMessage(final Convert convert, final Object message) {
return broadcastMessage(convert, message, true);
} }
/** /**
@@ -244,6 +287,20 @@ public abstract class WebSocketNode {
* @return 为0表示成功 其他值表示部分发送异常 * @return 为0表示成功 其他值表示部分发送异常
*/ */
public final CompletableFuture<Integer> broadcastMessage(final Object message, final boolean last) { public final CompletableFuture<Integer> broadcastMessage(final Object message, final boolean last) {
return broadcastMessage((Convert) null, message, last);
}
/**
* 广播消息, 给所有人发消息
*
* @param convert Convert
* @param message0 消息内容
* @param last 是否最后一条
*
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> broadcastMessage(final Convert convert, final Object message0, final boolean last) {
final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 : ((convert instanceof TextConvert) ? new WebSocketPacket(((TextConvert) convert).convertTo(message0), last) : new WebSocketPacket(((BinaryConvert) convert).convertTo(message0), last));
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式 if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
return this.localEngine.broadcastMessage(message, last); return this.localEngine.broadcastMessage(message, last);
} }