This commit is contained in:
Redkale
2017-05-22 13:05:50 +08:00
parent dee2002cf3
commit 33da94960c
5 changed files with 82 additions and 253 deletions

View File

@@ -94,67 +94,21 @@ public abstract class WebSocket<T> {
//---------------------------------------------------------------- //----------------------------------------------------------------
public final CompletableFuture<Integer> sendPing() { public final CompletableFuture<Integer> sendPing() {
//if (_engine.finest) _engine.logger.finest(this + " on "+_engine.getEngineid()+" ping..."); //if (_engine.finest) _engine.logger.finest(this + " on "+_engine.getEngineid()+" ping...");
return send(WebSocketPacket.DEFAULT_PING_PACKET); return sendPacket(WebSocketPacket.DEFAULT_PING_PACKET);
} }
public final CompletableFuture<Integer> sendPing(byte[] data) { public final CompletableFuture<Integer> sendPing(byte[] data) {
return send(new WebSocketPacket(FrameType.PING, data)); return sendPacket(new WebSocketPacket(FrameType.PING, data));
} }
public final CompletableFuture<Integer> sendPong(byte[] data) { public final CompletableFuture<Integer> sendPong(byte[] data) {
return send(new WebSocketPacket(FrameType.PONG, data)); return sendPacket(new WebSocketPacket(FrameType.PONG, data));
} }
public final long getCreatetime() { public final long getCreatetime() {
return createtime; return createtime;
} }
/**
* 给自身发送单一的文本消息
*
* @param text 不可为空
*
* @return 0表示成功 非0表示错误码
*/
public final CompletableFuture<Integer> send(String text) {
return send(text, true);
}
/**
* 给自身发送文本消息
*
* @param text 不可为空
* @param last 是否最后一条
*
* @return 0表示成功 非0表示错误码
*/
public final CompletableFuture<Integer> send(String text, boolean last) {
return sendPacket(new WebSocketPacket(text, last));
}
/**
* 给自身发送单一的二进制消息
*
* @param data byte[]
*
* @return 0表示成功 非0表示错误码
*/
public final CompletableFuture<Integer> send(byte[] data) {
return send(data, true);
}
/**
* 给自身发送二进制消息
*
* @param data 不可为空
* @param last 是否最后一条
*
* @return 0表示成功 非0表示错误码
*/
public final CompletableFuture<Integer> send(byte[] data, boolean last) {
return sendPacket(new WebSocketPacket(data, last));
}
/** /**
* 给自身发送消息, 消息类型是String或byte[]或可JavaBean对象 * 给自身发送消息, 消息类型是String或byte[]或可JavaBean对象
* *
@@ -215,39 +169,14 @@ public abstract class WebSocket<T> {
* @return 0表示成功 非0表示错误码 * @return 0表示成功 非0表示错误码
*/ */
CompletableFuture<Integer> sendPacket(WebSocketPacket packet) { CompletableFuture<Integer> sendPacket(WebSocketPacket packet) {
CompletableFuture<Integer> rs = null; CompletableFuture<Integer> rs = this._runner.sendMessage(packet);
if (this._runner != null) rs = this._runner.sendMessage(packet);
if (_engine.finest) _engine.logger.finest("wsgroupid:" + getGroupid() + " send websocket result is " + rs + " on " + this + " by message(" + packet + ")"); if (_engine.finest) _engine.logger.finest("wsgroupid:" + getGroupid() + " send websocket result is " + rs + " on " + this + " by message(" + packet + ")");
return rs == null ? CompletableFuture.completedFuture(RETCODE_WSOCKET_CLOSED) : rs; return rs == null ? CompletableFuture.completedFuture(RETCODE_WSOCKET_CLOSED) : rs;
} }
//---------------------------------------------------------------- //----------------------------------------------------------------
/** /**
* 给指定groupid的WebSocketGroup下所有WebSocket节点发送文本消息 * 给指定groupid的WebSocketGroup下所有WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息
*
* @param groupid groupid
* @param text 不可为空
*
* @return 为0表示成功 其他值表示异常
*/
public final CompletableFuture<Integer> sendEachMessage(Serializable groupid, String text) {
return sendEachMessage(groupid, text, true);
}
/**
* 给指定groupid的WebSocketGroup下所有WebSocket节点发送二进制消息
*
* @param groupid groupid
* @param data 不可为空
*
* @return 为0表示成功 其他值表示异常
*/
public final CompletableFuture<Integer> sendEachMessage(Serializable groupid, byte[] data) {
return sendEachMessage(groupid, data, true);
}
/**
* 给指定groupid的WebSocketGroup下所有WebSocket节点发送可JavaBean对象消息
* *
* @param groupid groupid * @param groupid groupid
* @param message 不可为空 * @param message 不可为空
@@ -259,33 +188,7 @@ public abstract class WebSocket<T> {
} }
/** /**
* 给指定groupid的WebSocketGroup下所有WebSocket节点发送文本消息 * 给指定groupid的WebSocketGroup下所有WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息
*
* @param groupid groupid
* @param text 不可为空
* @param last 是否最后一条
*
* @return 为0表示成功 其他值表示异常
*/
public final CompletableFuture<Integer> sendEachMessage(Serializable groupid, String text, boolean last) {
return sendMessage(groupid, false, text, last);
}
/**
* 给指定groupid的WebSocketGroup下所有WebSocket节点发送二进制消息
*
* @param groupid groupid
* @param data 不可为空
* @param last 是否最后一条
*
* @return 为0表示成功 其他值表示异常
*/
public final CompletableFuture<Integer> sendEachMessage(Serializable groupid, byte[] data, boolean last) {
return sendMessage(groupid, false, data, last);
}
/**
* 给指定groupid的WebSocketGroup下所有WebSocket节点发送可JavaBean对象消息
* *
* @param groupid groupid * @param groupid groupid
* @param message 不可为空 * @param message 不可为空
@@ -298,31 +201,7 @@ public abstract class WebSocket<T> {
} }
/** /**
* 给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送文本消息 * 给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息
*
* @param groupid groupid
* @param text 不可为空
*
* @return 为0表示成功 其他值表示异常
*/
public final CompletableFuture<Integer> sendRecentMessage(Serializable groupid, String text) {
return sendRecentMessage(groupid, text, true);
}
/**
* 给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送二进制消息
*
* @param groupid groupid
* @param data 不可为空
*
* @return 为0表示成功 其他值表示异常
*/
public final CompletableFuture<Integer> sendRecentMessage(Serializable groupid, byte[] data) {
return sendRecentMessage(groupid, data, true);
}
/**
* 给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送可JavaBean对象消息
* *
* @param groupid groupid * @param groupid groupid
* @param message 不可为空 * @param message 不可为空
@@ -334,33 +213,7 @@ public abstract class WebSocket<T> {
} }
/** /**
* 给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送文本消息 * 给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息
*
* @param groupid groupid
* @param text 不可为空
* @param last 是否最后一条
*
* @return 为0表示成功 其他值表示异常
*/
public final CompletableFuture<Integer> sendRecentMessage(Serializable groupid, String text, boolean last) {
return sendMessage(groupid, true, text, last);
}
/**
* 给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送二进制消息
*
* @param groupid groupid
* @param data 不可为空
* @param last 是否最后一条
*
* @return 为0表示成功 其他值表示异常
*/
public final CompletableFuture<Integer> sendRecentMessage(Serializable groupid, byte[] data, boolean last) {
return sendMessage(groupid, true, data, last);
}
/**
* 给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送可JavaBean对象消息
* *
* @param groupid groupid * @param groupid groupid
* @param message 不可为空 * @param message 不可为空
@@ -372,21 +225,17 @@ public abstract class WebSocket<T> {
return sendMessage(groupid, true, message, last); return sendMessage(groupid, true, message, last);
} }
private CompletableFuture<Integer> sendMessage(Serializable groupid, boolean recent, String text, boolean last) { /**
if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL); * 给指定groupid的WebSocketGroup下WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息
CompletableFuture<Integer> rs = _engine.node.sendMessage(groupid, recent, text, last); *
if (_engine.finest) _engine.logger.finest("wsgroupid:" + groupid + " " + (recent ? "recent " : "") + "send websocket result is " + rs + " on " + this + " by message(" + text + ")"); * @param groupid groupid
return rs; * @param recent 是否只发最近接入的WebSocket
} * @param message 不可为空
* @param last 是否最后一条
private CompletableFuture<Integer> sendMessage(Serializable groupid, boolean recent, byte[] data, boolean last) { *
if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL); * @return 为0表示成功 其他值表示异常
CompletableFuture<Integer> rs = _engine.node.sendMessage(groupid, recent, data, last); */
if (_engine.finest) _engine.logger.finest("wsgroupid:" + groupid + " " + (recent ? "recent " : "") + "send websocket result is " + rs + " on " + this + " by message(byte[" + data.length + "])"); public final CompletableFuture<Integer> sendMessage(Serializable groupid, boolean recent, Object message, boolean last) {
return rs;
}
private CompletableFuture<Integer> sendMessage(Serializable groupid, boolean recent, Object message, boolean last) {
if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL); if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL);
CompletableFuture<Integer> rs = _engine.node.sendMessage(groupid, recent, message, last); CompletableFuture<Integer> rs = _engine.node.sendMessage(groupid, recent, message, last);
if (_engine.finest) _engine.logger.finest("wsgroupid:" + groupid + " " + (recent ? "recent " : "") + "send websocket result is " + rs + " on " + this + " by message(" + _jsonConvert.convertTo(message) + ")"); if (_engine.finest) _engine.logger.finest("wsgroupid:" + groupid + " " + (recent ? "recent " : "") + "send websocket result is " + rs + " on " + this + " by message(" + _jsonConvert.convertTo(message) + ")");
@@ -522,30 +371,60 @@ public abstract class WebSocket<T> {
public void onRead(AsyncConnection channel) { public void onRead(AsyncConnection channel) {
} }
/**
* WebSokcet连接成功后的回调方法
*/
public void onConnected() { public void onConnected() {
} }
/**
* ping后的回调方法
*
* @param bytes 数据
*/
public void onPing(byte[] bytes) { public void onPing(byte[] bytes) {
} }
/**
* pong后的回调方法
*
* @param bytes 数据
*/
public void onPong(byte[] bytes) { public void onPong(byte[] bytes) {
} }
public void onMessage(T message) { /**
* 接收到消息的回调方法
*
* @param message 消息
* @param last 是否最后一条
*/
public void onMessage(T message, boolean last) {
} }
public void onMessage(byte[] bytes) { /**
} * 接收到二进制消息的回调方法
*
public void onFragment(T message, boolean last) { * @param bytes 消息
} * @param last 是否最后一条
*/
public void onFragment(byte[] bytes, boolean last) { public void onMessage(byte[] bytes, boolean last) {
} }
/**
* 关闭的回调方法调用此方法时WebSocket已经被关闭
*
* @param code 结果码非0表示非正常关闭
* @param reason 关闭原因
*/
public void onClose(int code, String reason) { public void onClose(int code, String reason) {
} }
/**
* 获取最后一次发送消息的时间
*
* @return long
*/
public long getLastSendTime() { public long getLastSendTime() {
return this._runner == null ? 0 : this._runner.lastSendTime; return this._runner == null ? 0 : this._runner.lastSendTime;
} }

View File

@@ -141,6 +141,27 @@ public abstract class WebSocketNode {
return rs; return rs;
} }
//--------------------------------------------------------------------------------
public final CompletableFuture<Integer> sendEachMessage(Serializable groupid, Object message) {
return sendMessage(groupid, false, message, true);
}
public final CompletableFuture<Integer> sendEachMessage(Serializable groupid, Object message, boolean last) {
return sendMessage(groupid, false, message, last);
}
public final CompletableFuture<Integer> sendRecentMessage(Serializable groupid, Object message) {
return sendMessage(groupid, true, message, true);
}
public final CompletableFuture<Integer> sendRecentMessage(Serializable groupid, Object message, boolean last) {
return sendMessage(groupid, true, message, last);
}
public final CompletableFuture<Integer> sendMessage(Serializable groupid, boolean recent, Object message) {
return sendMessage(groupid, recent, message, true);
}
/** /**
* 向指定用户发送消息,先发送本地连接,再发送远程连接 <br> * 向指定用户发送消息,先发送本地连接,再发送远程连接 <br>
* 如果当前WebSocketNode是远程模式此方法只发送远程连接 * 如果当前WebSocketNode是远程模式此方法只发送远程连接
@@ -192,75 +213,4 @@ public abstract class WebSocketNode {
}); });
} }
//--------------------------------------------------------------------------------
public final CompletableFuture<Integer> sendEachMessage(Serializable groupid, String text) {
return sendMessage(groupid, false, (Object) text, true);
}
public final CompletableFuture<Integer> sendEachMessage(Serializable groupid, String text, boolean last) {
return sendMessage(groupid, false, (Object) text, last);
}
public final CompletableFuture<Integer> sendRecentMessage(Serializable groupid, String text) {
return sendMessage(groupid, true, (Object) text, true);
}
public final CompletableFuture<Integer> sendRecentMessage(Serializable groupid, String text, boolean last) {
return sendMessage(groupid, true, (Object) text, last);
}
public final CompletableFuture<Integer> sendMessage(Serializable groupid, boolean recent, String text) {
return sendMessage(groupid, recent, (Object) text, true);
}
public final CompletableFuture<Integer> sendMessage(Serializable groupid, boolean recent, String text, boolean last) {
return sendMessage(groupid, recent, (Object) text, last);
}
//--------------------------------------------------------------------------------
public final CompletableFuture<Integer> sendEachMessage(Serializable groupid, byte[] data) {
return sendMessage(groupid, false, (Object) data, true);
}
public final CompletableFuture<Integer> sendEachMessage(Serializable groupid, byte[] data, boolean last) {
return sendMessage(groupid, false, (Object) data, last);
}
public final CompletableFuture<Integer> sendRecentMessage(Serializable groupid, byte[] data) {
return sendMessage(groupid, true, (Object) data, true);
}
public final CompletableFuture<Integer> sendRecentMessage(Serializable groupid, byte[] data, boolean last) {
return sendMessage(groupid, true, (Object) data, last);
}
public final CompletableFuture<Integer> sendMessage(Serializable groupid, boolean recent, byte[] data) {
return sendMessage(groupid, recent, data, true);
}
public final CompletableFuture<Integer> sendMessage(Serializable groupid, boolean recent, byte[] data, boolean last) {
return sendMessage(groupid, recent, (Object) data, last);
}
//--------------------------------------------------------------------------------
public final CompletableFuture<Integer> sendEachMessage(Serializable groupid, Object message) {
return sendMessage(groupid, false, message, true);
}
public final CompletableFuture<Integer> sendEachMessage(Serializable groupid, Object message, boolean last) {
return sendMessage(groupid, false, message, last);
}
public final CompletableFuture<Integer> sendRecentMessage(Serializable groupid, Object message) {
return sendMessage(groupid, true, message, true);
}
public final CompletableFuture<Integer> sendRecentMessage(Serializable groupid, Object message, boolean last) {
return sendMessage(groupid, true, message, last);
}
public final CompletableFuture<Integer> sendMessage(Serializable groupid, boolean recent, Object message) {
return sendMessage(groupid, recent, message, true);
}
} }

View File

@@ -122,14 +122,14 @@ public class WebSocketRunner implements Runnable {
readBuffer.clear(); readBuffer.clear();
channel.read(readBuffer, null, this); channel.read(readBuffer, null, this);
} }
webSocket.onMessage(message); webSocket.onMessage(message, packet.last);
} else if (packet.type == FrameType.BINARY) { } else if (packet.type == FrameType.BINARY) {
byte[] message = convert.convertFrom(byte[].class, packet.receiveMasker, packet.receiveBuffers); byte[] message = convert.convertFrom(byte[].class, packet.receiveMasker, packet.receiveBuffers);
if (readBuffer != null) { if (readBuffer != null) {
readBuffer.clear(); readBuffer.clear();
channel.read(readBuffer, null, this); channel.read(readBuffer, null, this);
} }
webSocket.onMessage(message); webSocket.onMessage(message, packet.last);
} else if (packet.type == FrameType.PONG) { } else if (packet.type == FrameType.PONG) {
byte[] message = convert.convertFrom(byte[].class, packet.receiveMasker, packet.receiveBuffers); byte[] message = convert.convertFrom(byte[].class, packet.receiveMasker, packet.receiveBuffers);
if (readBuffer != null) { if (readBuffer != null) {

View File

@@ -56,7 +56,7 @@ public class ChatWebSocketServlet extends WebSocketServlet {
return new WebSocket<ChatMessage>() { return new WebSocket<ChatMessage>() {
@Override @Override
public void onMessage(ChatMessage message) { public void onMessage(ChatMessage message, boolean last) {
icounter.incrementAndGet(); icounter.incrementAndGet();
counter.incrementAndGet(); counter.incrementAndGet();
if (debug) System.out.println("收到消息: " + message); if (debug) System.out.println("收到消息: " + message);

View File

@@ -87,7 +87,7 @@ public class VideoWebSocketServlet extends WebSocketServlet {
} }
@Override @Override
public void onMessage(Object text) { public void onMessage(Object text, boolean last) {
//System.out.println("接收到消息: " + text); //System.out.println("接收到消息: " + text);
super.getWebSocketGroup().getWebSockets().filter(x -> x != this).forEach(x -> { super.getWebSocketGroup().getWebSockets().filter(x -> x != this).forEach(x -> {
x.send(text); x.send(text);