WebSocket

This commit is contained in:
redkale
2024-10-09 23:45:56 +08:00
parent 1bcc35cbcf
commit 6769389f3a
5 changed files with 46 additions and 55 deletions

View File

@@ -5,9 +5,6 @@
*/ */
package org.redkale.net.http; package org.redkale.net.http;
import static org.redkale.net.http.WebSocket.RETCODE_GROUP_EMPTY;
import static org.redkale.net.http.WebSocketServlet.*;
import java.io.Serializable; import java.io.Serializable;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
@@ -18,6 +15,8 @@ import java.util.stream.Stream;
import org.redkale.annotation.Comment; import org.redkale.annotation.Comment;
import org.redkale.convert.Convert; import org.redkale.convert.Convert;
import org.redkale.net.Cryptor; import org.redkale.net.Cryptor;
import static org.redkale.net.http.WebSocket.RETCODE_GROUP_EMPTY;
import static org.redkale.net.http.WebSocketServlet.*;
import org.redkale.util.AnyValue; import org.redkale.util.AnyValue;
/** /**
@@ -262,22 +261,21 @@ public class WebSocketEngine {
@Comment("给所有连接用户发送消息") @Comment("给所有连接用户发送消息")
public CompletableFuture<Integer> broadcastLocalMessage(final Object message, final boolean last) { public CompletableFuture<Integer> broadcastLocalMessage(final Object message, final boolean last) {
return WebSocketEngine.this.broadcastLocalMessage((Predicate) null, message, last); return broadcastLocalMessage((Predicate) null, message, last);
} }
@Comment("给指定WebSocket连接用户发送消息") @Comment("给指定WebSocket连接用户发送消息")
public CompletableFuture<Integer> broadcastLocalMessage( public CompletableFuture<Integer> broadcastLocalMessage(
final WebSocketRange wsrange, final Object message, final boolean last) { final WebSocketRange wsrange, final Object message, final boolean last) {
Predicate<WebSocket> predicate = wsrange == null ? null : (ws) -> ws.predicate(wsrange); Predicate<WebSocket> predicate = wsrange == null ? null : ws -> ws.predicate(wsrange);
return WebSocketEngine.this.broadcastLocalMessage(predicate, message, last); return broadcastLocalMessage(predicate, message, last);
} }
@Comment("给指定WebSocket连接用户发送消息") @Comment("给指定WebSocket连接用户发送消息")
public CompletableFuture<Integer> broadcastLocalMessage( public CompletableFuture<Integer> broadcastLocalMessage(
final Predicate<WebSocket> predicate, final Object message, final boolean last) { final Predicate<WebSocket> predicate, final Object message, final boolean last) {
if (message instanceof CompletableFuture) { if (message instanceof CompletableFuture) {
return ((CompletableFuture) message) return ((CompletableFuture) message).thenCompose(packet -> broadcastLocalMessage(predicate, packet, last));
.thenCompose((json) -> WebSocketEngine.this.broadcastLocalMessage(predicate, json, last));
} }
// final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers // final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers
// == null); // == null);
@@ -361,15 +359,14 @@ public class WebSocketEngine {
for (int i = 0; i < array.length; i++) { for (int i = 0; i < array.length; i++) {
ss[i] = (Serializable) array[i]; ss[i] = (Serializable) array[i];
} }
return WebSocketEngine.this.sendLocalMessage(message, last, ss); return sendLocalMessage(message, last, ss);
} }
@Comment("给指定用户组发送消息") @Comment("给指定用户组发送消息")
public CompletableFuture<Integer> sendLocalMessage( public CompletableFuture<Integer> sendLocalMessage(
final Object message, final boolean last, final Serializable... userids) { final Object message, final boolean last, final Serializable... userids) {
if (message instanceof CompletableFuture) { if (message instanceof CompletableFuture) {
return ((CompletableFuture) message) return ((CompletableFuture) message).thenCompose(packet -> sendLocalMessage(packet, last, userids));
.thenCompose((json) -> WebSocketEngine.this.sendLocalMessage(json, last, userids));
} }
// final boolean more = userids.length > 1; // final boolean more = userids.length > 1;
// if (more) { // if (more) {
@@ -476,7 +473,7 @@ public class WebSocketEngine {
for (int i = 0; i < array.length; i++) { for (int i = 0; i < array.length; i++) {
ss[i] = (Serializable) array[i]; ss[i] = (Serializable) array[i];
} }
return WebSocketEngine.this.sendLocalAction(action, ss); return sendLocalAction(action, ss);
} }
@Comment("给指定用户组发送操作") @Comment("给指定用户组发送操作")

View File

@@ -7,6 +7,7 @@ package org.redkale.net.http;
import java.io.Serializable; import java.io.Serializable;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.logging.*; import java.util.logging.*;
@@ -20,6 +21,7 @@ import org.redkale.convert.json.JsonConvert;
import org.redkale.mq.spi.MessageAgent; import org.redkale.mq.spi.MessageAgent;
import org.redkale.net.WorkThread; import org.redkale.net.WorkThread;
import static org.redkale.net.http.WebSocket.RETCODE_GROUP_EMPTY; import static org.redkale.net.http.WebSocket.RETCODE_GROUP_EMPTY;
import org.redkale.net.http.WebSocketPacket.FrameType;
import org.redkale.net.sncp.Sncp; import org.redkale.net.sncp.Sncp;
import org.redkale.service.*; import org.redkale.service.*;
import org.redkale.source.CacheSource; import org.redkale.source.CacheSource;
@@ -131,7 +133,7 @@ public abstract class WebSocketNode implements Service {
protected abstract CompletableFuture<Integer> sendMessage( protected abstract CompletableFuture<Integer> sendMessage(
@RpcTargetTopic String topic, @RpcTargetTopic String topic,
@RpcTargetAddress InetSocketAddress targetAddress, @RpcTargetAddress InetSocketAddress targetAddress,
Object message, WebSocketPacket message,
boolean last, boolean last,
Serializable... userids); Serializable... userids);
@@ -139,7 +141,7 @@ public abstract class WebSocketNode implements Service {
@RpcTargetTopic String topic, @RpcTargetTopic String topic,
@RpcTargetAddress InetSocketAddress targetAddress, @RpcTargetAddress InetSocketAddress targetAddress,
WebSocketRange wsrange, WebSocketRange wsrange,
Object message, WebSocketPacket message,
boolean last); boolean last);
protected abstract CompletableFuture<Integer> sendAction( protected abstract CompletableFuture<Integer> sendAction(
@@ -675,16 +677,12 @@ public abstract class WebSocketNode implements Service {
final Object message = (convert == null || message0 instanceof WebSocketPacket) final Object message = (convert == null || message0 instanceof WebSocketPacket)
? message0 ? message0
: ((convert instanceof TextConvert) : ((convert instanceof TextConvert)
? new WebSocketPacket( ? new WebSocketPacket(FrameType.TEXT, convert.convertToBytes(message0), last)
WebSocketPacket.FrameType.TEXT, ((TextConvert) convert).convertToBytes(message0), last) : new WebSocketPacket(FrameType.BINARY, convert.convertToBytes(message0), last));
: new WebSocketPacket(
WebSocketPacket.FrameType.BINARY,
((BinaryConvert) convert).convertToBytes(message0),
last));
if (this.localEngine != null && this.source == null) { // 本地模式且没有分布式 if (this.localEngine != null && this.source == null) { // 本地模式且没有分布式
return this.localEngine.sendLocalMessage(message, last, userids); return this.localEngine.sendLocalMessage(message, last, userids);
} }
final Object remoteMessage = formatRemoteMessage(message); final WebSocketPacket remoteMessage = formatRemoteMessage(message);
CompletableFuture<Integer> rsfuture; CompletableFuture<Integer> rsfuture;
if (userids.length == 1) { if (userids.length == 1) {
rsfuture = sendOneUserMessage(remoteMessage, last, userids[0]); rsfuture = sendOneUserMessage(remoteMessage, last, userids[0]);
@@ -809,7 +807,7 @@ public abstract class WebSocketNode implements Service {
return localFuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localFuture; return localFuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localFuture;
} }
// 远程节点发送消息 // 远程节点发送消息
final Object remoteMessage = formatRemoteMessage(message); final WebSocketPacket remoteMessage = formatRemoteMessage(message);
tryAcquireSemaphore(); tryAcquireSemaphore();
CompletableFuture<Set<WebSocketAddress>> addrsFuture = CompletableFuture<Set<WebSocketAddress>> addrsFuture =
source.smembersAsync(WS_SOURCE_KEY_USERID_PREFIX + userid, WebSocketAddress.class); source.smembersAsync(WS_SOURCE_KEY_USERID_PREFIX + userid, WebSocketAddress.class);
@@ -872,7 +870,7 @@ public abstract class WebSocketNode implements Service {
// 没有CacheSource就不会有分布式节点 // 没有CacheSource就不会有分布式节点
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
} }
final Object remoteMessage = formatRemoteMessage(message); final WebSocketPacket remoteMessage = formatRemoteMessage(message);
return remoteNode.sendMessage(addr.getTopic(), addr.getAddr(), remoteMessage, last, userids); return remoteNode.sendMessage(addr.getTopic(), addr.getAddr(), remoteMessage, last, userids);
} }
@@ -1018,7 +1016,7 @@ public abstract class WebSocketNode implements Service {
if (this.localEngine != null && this.source == null) { // 本地模式且没有分布式 if (this.localEngine != null && this.source == null) { // 本地模式且没有分布式
return this.localEngine.broadcastLocalMessage(wsrange, message, last); return this.localEngine.broadcastLocalMessage(wsrange, message, last);
} }
final Object remoteMessage = formatRemoteMessage(message); final WebSocketPacket remoteMessage = formatRemoteMessage(message);
CompletableFuture<Integer> localFuture = CompletableFuture<Integer> localFuture =
this.localEngine == null ? null : this.localEngine.broadcastLocalMessage(wsrange, message, last); this.localEngine == null ? null : this.localEngine.broadcastLocalMessage(wsrange, message, last);
tryAcquireSemaphore(); tryAcquireSemaphore();
@@ -1274,23 +1272,23 @@ public abstract class WebSocketNode implements Service {
return remoteNode.sendAction(addr.getTopic(), addr.getAddr(), action, userids); return remoteNode.sendAction(addr.getTopic(), addr.getAddr(), action, userids);
} }
protected Object formatRemoteMessage(Object message) { protected WebSocketPacket formatRemoteMessage(Object message) {
if (message instanceof WebSocketPacket) { if (message instanceof WebSocketPacket) {
return message; return (WebSocketPacket) message;
} }
if (message instanceof byte[]) { if (message instanceof byte[]) {
return message; return new WebSocketPacket(FrameType.BINARY, (byte[]) message);
} }
if (message instanceof CharSequence) { if (message instanceof CharSequence) {
return message; return new WebSocketPacket(FrameType.TEXT, message.toString().getBytes(StandardCharsets.UTF_8));
} }
if (sendConvert instanceof TextConvert) { if (sendConvert instanceof TextConvert) {
((TextConvert) sendConvert).convertTo(message); return new WebSocketPacket(FrameType.TEXT, ((TextConvert) sendConvert).convertToBytes(message));
} }
if (sendConvert instanceof BinaryConvert) { if (sendConvert instanceof BinaryConvert) {
((BinaryConvert) sendConvert).convertTo(message); return new WebSocketPacket(FrameType.BINARY, ((BinaryConvert) sendConvert).convertToBytes(message));
} }
return JsonConvert.root().convertTo(message); return new WebSocketPacket(FrameType.TEXT, JsonConvert.root().convertToBytes(message));
} }
protected boolean tryAcquireSemaphore() { protected boolean tryAcquireSemaphore() {

View File

@@ -1,13 +1,12 @@
package org.redkale.net.http; package org.redkale.net.http;
import static org.redkale.net.http.WebSocket.RETCODE_GROUP_EMPTY;
import java.io.Serializable; import java.io.Serializable;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.logging.Level; import java.util.logging.Level;
import org.redkale.annotation.*; import org.redkale.annotation.*;
import static org.redkale.net.http.WebSocket.RETCODE_GROUP_EMPTY;
import org.redkale.service.RpcTargetAddress; import org.redkale.service.RpcTargetAddress;
import org.redkale.service.RpcTargetTopic; import org.redkale.service.RpcTargetTopic;
import org.redkale.service.Service; import org.redkale.service.Service;
@@ -57,7 +56,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
public CompletableFuture<Integer> sendMessage( public CompletableFuture<Integer> sendMessage(
@RpcTargetTopic String topic, @RpcTargetTopic String topic,
@RpcTargetAddress InetSocketAddress targetAddress, @RpcTargetAddress InetSocketAddress targetAddress,
Object message, WebSocketPacket message,
boolean last, boolean last,
Serializable... userids) { Serializable... userids) {
if (this.localEngine == null) { if (this.localEngine == null) {
@@ -71,7 +70,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
@RpcTargetTopic String topic, @RpcTargetTopic String topic,
@RpcTargetAddress InetSocketAddress targetAddress, @RpcTargetAddress InetSocketAddress targetAddress,
final WebSocketRange wsrange, final WebSocketRange wsrange,
Object message, WebSocketPacket message,
boolean last) { boolean last) {
if (this.localEngine == null) { if (this.localEngine == null) {
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);

View File

@@ -22,12 +22,6 @@ public final class WebSocketPacket {
public static final WebSocketPacket DEFAULT_PING_PACKET = new WebSocketPacket(FrameType.PING, new byte[0]); public static final WebSocketPacket DEFAULT_PING_PACKET = new WebSocketPacket(FrameType.PING, new byte[0]);
public enum MessageType {
STRING,
BYTES,
OBJECT;
}
public enum FrameType { public enum FrameType {
SERIES(0x00), SERIES(0x00),
TEXT(0x01), TEXT(0x01),

View File

@@ -16,30 +16,33 @@ public class TinyTest {
TinyTest test = new TinyTest(); TinyTest test = new TinyTest();
test.run1(); test.run1();
test.run2(); test.run2();
test.run3();
} }
@Test @Test
public void run1() throws Exception { public void run1() throws Exception {
TinyRecord record = new TinyRecord(); TinyRecord record = new TinyRecord();
record.id = 5; record.id = 5;
{ JsonFactory factory = JsonFactory.create().withFeatures(Convert.FEATURE_TINY);
JsonFactory factory = JsonFactory.create().withFeatures(Convert.FEATURE_TINY); JsonConvert convert = factory.getConvert();
JsonConvert convert = factory.getConvert(); String json = "{\"id\":5}";
String json = "{\"id\":5}"; Assertions.assertEquals(json, convert.convertTo(record));
Assertions.assertEquals(json, convert.convertTo(record)); System.out.println(convert.convertTo(record));
System.out.println(convert.convertTo(record));
}
{
JsonFactory factory = JsonFactory.create().withFeatures(0);
JsonConvert convert = factory.getConvert();
String json = "{\"id\":5,\"name\":\"\"}";
Assertions.assertEquals(json, convert.convertTo(record));
System.out.println(convert.convertTo(record));
}
} }
@Test @Test
public void run2() throws Exception { public void run2() throws Exception {
TinyRecord record = new TinyRecord();
record.id = 5;
JsonFactory factory = JsonFactory.create().withFeatures(0);
JsonConvert convert = factory.getConvert();
String json = "{\"id\":5,\"name\":\"\"}";
Assertions.assertEquals(json, convert.convertTo(record));
System.out.println(convert.convertTo(record));
}
@Test
public void run3() throws Exception {
String json = "{\"id\":5,\"name\":\"\", \"status\":2}"; String json = "{\"id\":5,\"name\":\"\", \"status\":2}";
JsonConvert.root().convertFrom(TinyRecord.class, json); JsonConvert.root().convertFrom(TinyRecord.class, json);
} }