This commit is contained in:
Redkale
2017-03-31 08:11:46 +08:00
parent a57574dd10
commit b27bbb7836
6 changed files with 132 additions and 25 deletions

View File

@@ -10,6 +10,7 @@ import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import org.redkale.convert.json.JsonConvert;
import org.redkale.net.*;
import org.redkale.util.Comment;
@@ -76,6 +77,8 @@ public abstract class WebSocket {
String _remoteAddr;//不可能为空
JsonConvert _jsonConvert; //不可能为空
private final long createtime = System.currentTimeMillis();
private final Map<String, Object> attributes = new ConcurrentHashMap<>();
@@ -162,15 +165,30 @@ public abstract class WebSocket {
}
/**
* 给自身发送消息, 消息类型是String或byte[]
* 给自身发送消息, 消息类型是String或byte[]或可JSON化对象
*
* @param message 不可为空, 只能是String或byte[]
* @param message 不可为空, 只能是String或byte[]或可JSON化对象
*
* @return 0表示成功 非0表示错误码
*/
public final int send(Object message) {
return send(message, true);
}
/**
* 给自身发送消息, 消息类型是String或byte[]或可JSON化对象
*
* @param message 不可为空, 只能是String或byte[]或可JSON化对象
* @param last 是否最后一条
*
* @return 0表示成功 非0表示错误码
*/
public final int send(Serializable message, boolean last) {
return send(new WebSocketPacket(message, last));
public final int send(Object message, boolean last) {
if (message == null || message instanceof CharSequence || message instanceof byte[]) {
return send(new WebSocketPacket((Serializable) message, last));
} else {
return send(new WebSocketPacket(_jsonConvert.convertTo(message), last));
}
}
//----------------------------------------------------------------
@@ -195,7 +213,19 @@ public abstract class WebSocket {
* @return 为0表示成功 其他值表示异常
*/
public final int sendEachMessage(Serializable groupid, byte[] data) {
return WebSocket.this.sendEachMessage(groupid, data, true);
return sendEachMessage(groupid, data, true);
}
/**
* 给指定groupid的WebSocketGroup下所有WebSocket节点发送可JSON化对象消息
*
* @param groupid groupid
* @param message 不可为空
*
* @return 为0表示成功 其他值表示异常
*/
public final int sendEachMessage(Serializable groupid, Object message) {
return sendEachMessage(groupid, message, true);
}
/**
@@ -224,6 +254,19 @@ public abstract class WebSocket {
return sendMessage(groupid, false, data, last);
}
/**
* 给指定groupid的WebSocketGroup下所有WebSocket节点发送可JSON化对象消息
*
* @param groupid groupid
* @param message 不可为空
* @param last 是否最后一条
*
* @return 为0表示成功 其他值表示异常
*/
public final int sendEachMessage(Serializable groupid, Object message, boolean last) {
return sendMessage(groupid, false, message, last);
}
/**
* 给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送文本消息
*
@@ -248,6 +291,18 @@ public abstract class WebSocket {
return sendRecentMessage(groupid, data, true);
}
/**
* 给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送可JSON化对象消息
*
* @param groupid groupid
* @param message 不可为空
*
* @return 为0表示成功 其他值表示异常
*/
public final int sendRecentMessage(Serializable groupid, Object message) {
return sendMessage(groupid, true, message, true);
}
/**
* 给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送文本消息
*
@@ -274,6 +329,19 @@ public abstract class WebSocket {
return sendMessage(groupid, true, data, last);
}
/**
* 给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送可JSON化对象消息
*
* @param groupid groupid
* @param message 不可为空
* @param last 是否最后一条
*
* @return 为0表示成功 其他值表示异常
*/
public final int sendRecentMessage(Serializable groupid, Object message, boolean last) {
return sendMessage(groupid, true, message, last);
}
private int sendMessage(Serializable groupid, boolean recent, String text, boolean last) {
if (_engine.node == null) return RETCODE_NODESERVICE_NULL;
int rs = _engine.node.sendMessage(groupid, recent, text, last);
@@ -288,6 +356,13 @@ public abstract class WebSocket {
return rs;
}
private int sendMessage(Serializable groupid, boolean recent, Object message, boolean last) {
if (_engine.node == null) return RETCODE_NODESERVICE_NULL;
int rs = _engine.node.sendMessage(groupid, recent, message, last);
if (_engine.finest) _engine.logger.finest("wsgroupid:" + groupid + " " + (recent ? "recent " : "") + "send websocket result is " + rs + " on " + this + " by message(" + _jsonConvert.convertTo(message) + ")");
return rs;
}
/**
* 获取指定groupid在线用户的节点地址列表
*

View File

@@ -15,7 +15,9 @@ import org.redkale.util.*;
/**
*
* <p> 详情见: https://redkale.org
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
public final class WebSocketEngine {

View File

@@ -83,7 +83,7 @@ public final class WebSocketGroup {
attributes.put(name, value);
}
public final int send(boolean recent, Serializable message, boolean last) {
public final int send(boolean recent, Object message, boolean last) {
if (recent) {
return recentWebSocket.send(message, last);
} else {
@@ -91,7 +91,7 @@ public final class WebSocketGroup {
}
}
public final int sendEach(Serializable message) {
public final int sendEach(Object message) {
return sendEach(message, true);
}
@@ -111,7 +111,7 @@ public final class WebSocketGroup {
return rs;
}
public final int sendRecent(Serializable message) {
public final int sendRecent(Object message) {
return sendRecent(message, true);
}
@@ -119,7 +119,10 @@ public final class WebSocketGroup {
return recentWebSocket.send(packet);
}
public final int sendEach(Serializable message, boolean last) {
public final int sendEach(Object message, boolean last) {
if (message != null && !(message instanceof byte[]) && !(message instanceof CharSequence)) {
message = recentWebSocket._jsonConvert.convertTo(message);
}
int rs = 0;
for (WebSocket s : list) {
rs |= s.send(message, last);
@@ -127,7 +130,7 @@ public final class WebSocketGroup {
return rs;
}
public final int sendRecent(Serializable message, boolean last) {
public final int sendRecent(Object message, boolean last) {
return recentWebSocket.send(message, last);
}

View File

@@ -60,7 +60,7 @@ public abstract class WebSocketNode {
protected abstract List<String> getOnlineRemoteAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid);
protected abstract int sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid, boolean recent, Serializable message, boolean last);
protected abstract int sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid, boolean recent, Object message, boolean last);
protected abstract void connect(Serializable groupid, InetSocketAddress addr);
@@ -133,7 +133,7 @@ public abstract class WebSocketNode {
engines.put(engine.getEngineid(), engine);
}
public final int sendMessage(Serializable groupid, boolean recent, Serializable message, boolean last) {
public final int sendMessage(Serializable groupid, boolean recent, Object message, boolean last) {
final Set<String> engineids = localNodes.get(groupid);
if (finest) logger.finest("websocket want send message {groupid:" + groupid + ", content:'" + message + "'} from locale node to " + engineids);
int rscode = RETCODE_GROUP_EMPTY;
@@ -187,44 +187,44 @@ public abstract class WebSocketNode {
//--------------------------------------------------------------------------------
public final int sendEachMessage(Serializable groupid, String text) {
return sendMessage(groupid, false, text);
return sendMessage(groupid, false, (Object) text, true);
}
public final int sendEachMessage(Serializable groupid, String text, boolean last) {
return sendMessage(groupid, false, text, last);
return sendMessage(groupid, false, (Object) text, last);
}
public final int sendRecentMessage(Serializable groupid, String text) {
return sendMessage(groupid, true, text);
return sendMessage(groupid, true, (Object) text, true);
}
public final int sendRecentMessage(Serializable groupid, String text, boolean last) {
return sendMessage(groupid, true, text, last);
return sendMessage(groupid, true, (Object) text, last);
}
public final int sendMessage(Serializable groupid, boolean recent, String text) {
return sendMessage(groupid, recent, text, true);
return sendMessage(groupid, recent, (Object) text, true);
}
public final int sendMessage(Serializable groupid, boolean recent, String text, boolean last) {
return sendMessage(groupid, recent, (Serializable) text, last);
return sendMessage(groupid, recent, (Object) text, last);
}
//--------------------------------------------------------------------------------
public final int sendEachMessage(Serializable groupid, byte[] data) {
return sendMessage(groupid, false, data);
return sendMessage(groupid, false, (Object) data, true);
}
public final int sendEachMessage(Serializable groupid, byte[] data, boolean last) {
return sendMessage(groupid, false, data, last);
return sendMessage(groupid, false, (Object) data, last);
}
public final int sendRecentMessage(Serializable groupid, byte[] data) {
return sendMessage(groupid, true, data);
return sendMessage(groupid, true, (Object) data, true);
}
public final int sendRecentMessage(Serializable groupid, byte[] data, boolean last) {
return sendMessage(groupid, true, data, last);
return sendMessage(groupid, true, (Object) data, last);
}
public final int sendMessage(Serializable groupid, boolean recent, byte[] data) {
@@ -232,6 +232,28 @@ public abstract class WebSocketNode {
}
public final int sendMessage(Serializable groupid, boolean recent, byte[] data, boolean last) {
return sendMessage(groupid, recent, (Serializable) data, last);
return sendMessage(groupid, recent, (Object) data, last);
}
//--------------------------------------------------------------------------------
public final int sendEachMessage(Serializable groupid, Object message) {
return sendMessage(groupid, false, message, true);
}
public final int sendEachMessage(Serializable groupid, Object message, boolean last) {
return sendMessage(groupid, false, message, last);
}
public final int sendRecentMessage(Serializable groupid, Object message) {
return sendMessage(groupid, true, message, true);
}
public final int sendRecentMessage(Serializable groupid, Object message, boolean last) {
return sendMessage(groupid, true, message, last);
}
public final int sendMessage(Serializable groupid, boolean recent, Object message) {
return sendMessage(groupid, recent, message, true);
}
}

View File

@@ -12,6 +12,7 @@ import java.security.*;
import java.util.*;
import java.util.logging.*;
import javax.annotation.*;
import org.redkale.convert.json.JsonConvert;
import org.redkale.service.WebSocketNodeService;
import org.redkale.util.*;
@@ -62,6 +63,9 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
@Comment("是否用于二进制流传输")
protected final boolean wsbinary = getClass().getAnnotation(WebSocketBinary.class) != null;
@Resource
protected JsonConvert jsonConvert;
@Resource(name = "$")
protected WebSocketNode node;
@@ -109,6 +113,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
}
final WebSocket webSocket = this.createWebSocket();
webSocket._engine = engine;
webSocket._jsonConvert = jsonConvert;
webSocket._remoteAddress = request.getRemoteAddress();
webSocket._remoteAddr = request.getRemoteAddr();
Serializable sessionid = webSocket.onOpen(request);

View File

@@ -49,7 +49,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
}
@Override
public int sendMessage(@RpcTargetAddress InetSocketAddress addr, Serializable groupid, boolean recent, Serializable message, boolean last) {
public int sendMessage(@RpcTargetAddress InetSocketAddress addr, Serializable groupid, boolean recent, Object message, boolean last) {
final Set<String> engineids = localNodes.get(groupid);
if (engineids == null || engineids.isEmpty()) return RETCODE_GROUP_EMPTY;
int code = RETCODE_GROUP_EMPTY;