From 6769389f3ac7653cd29ac7a657cd9b8b0b79fa2f Mon Sep 17 00:00:00 2001 From: redkale Date: Wed, 9 Oct 2024 23:45:56 +0800 Subject: [PATCH] WebSocket --- .../org/redkale/net/http/WebSocketEngine.java | 21 +++++------ .../org/redkale/net/http/WebSocketNode.java | 36 +++++++++---------- .../net/http/WebSocketNodeService.java | 7 ++-- .../org/redkale/net/http/WebSocketPacket.java | 6 ---- .../redkale/test/convert/json/TinyTest.java | 31 ++++++++-------- 5 files changed, 46 insertions(+), 55 deletions(-) diff --git a/src/main/java/org/redkale/net/http/WebSocketEngine.java b/src/main/java/org/redkale/net/http/WebSocketEngine.java index 46e8299e3..9a5772a02 100644 --- a/src/main/java/org/redkale/net/http/WebSocketEngine.java +++ b/src/main/java/org/redkale/net/http/WebSocketEngine.java @@ -5,9 +5,6 @@ */ package org.redkale.net.http; -import static org.redkale.net.http.WebSocket.RETCODE_GROUP_EMPTY; -import static org.redkale.net.http.WebSocketServlet.*; - import java.io.Serializable; import java.util.*; import java.util.concurrent.*; @@ -18,6 +15,8 @@ import java.util.stream.Stream; import org.redkale.annotation.Comment; import org.redkale.convert.Convert; import org.redkale.net.Cryptor; +import static org.redkale.net.http.WebSocket.RETCODE_GROUP_EMPTY; +import static org.redkale.net.http.WebSocketServlet.*; import org.redkale.util.AnyValue; /** @@ -262,22 +261,21 @@ public class WebSocketEngine { @Comment("给所有连接用户发送消息") public CompletableFuture broadcastLocalMessage(final Object message, final boolean last) { - return WebSocketEngine.this.broadcastLocalMessage((Predicate) null, message, last); + return broadcastLocalMessage((Predicate) null, message, last); } @Comment("给指定WebSocket连接用户发送消息") public CompletableFuture broadcastLocalMessage( final WebSocketRange wsrange, final Object message, final boolean last) { - Predicate predicate = wsrange == null ? null : (ws) -> ws.predicate(wsrange); - return WebSocketEngine.this.broadcastLocalMessage(predicate, message, last); + Predicate predicate = wsrange == null ? null : ws -> ws.predicate(wsrange); + return broadcastLocalMessage(predicate, message, last); } @Comment("给指定WebSocket连接用户发送消息") public CompletableFuture broadcastLocalMessage( final Predicate predicate, final Object message, final boolean last) { if (message instanceof CompletableFuture) { - return ((CompletableFuture) message) - .thenCompose((json) -> WebSocketEngine.this.broadcastLocalMessage(predicate, json, last)); + return ((CompletableFuture) message).thenCompose(packet -> broadcastLocalMessage(predicate, packet, last)); } // final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers // == null); @@ -361,15 +359,14 @@ public class WebSocketEngine { for (int i = 0; i < array.length; i++) { ss[i] = (Serializable) array[i]; } - return WebSocketEngine.this.sendLocalMessage(message, last, ss); + return sendLocalMessage(message, last, ss); } @Comment("给指定用户组发送消息") public CompletableFuture sendLocalMessage( final Object message, final boolean last, final Serializable... userids) { if (message instanceof CompletableFuture) { - return ((CompletableFuture) message) - .thenCompose((json) -> WebSocketEngine.this.sendLocalMessage(json, last, userids)); + return ((CompletableFuture) message).thenCompose(packet -> sendLocalMessage(packet, last, userids)); } // final boolean more = userids.length > 1; // if (more) { @@ -476,7 +473,7 @@ public class WebSocketEngine { for (int i = 0; i < array.length; i++) { ss[i] = (Serializable) array[i]; } - return WebSocketEngine.this.sendLocalAction(action, ss); + return sendLocalAction(action, ss); } @Comment("给指定用户组发送操作") diff --git a/src/main/java/org/redkale/net/http/WebSocketNode.java b/src/main/java/org/redkale/net/http/WebSocketNode.java index 932836ddb..241f6f0f7 100644 --- a/src/main/java/org/redkale/net/http/WebSocketNode.java +++ b/src/main/java/org/redkale/net/http/WebSocketNode.java @@ -7,6 +7,7 @@ package org.redkale.net.http; import java.io.Serializable; import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.*; import java.util.logging.*; @@ -20,6 +21,7 @@ import org.redkale.convert.json.JsonConvert; import org.redkale.mq.spi.MessageAgent; import org.redkale.net.WorkThread; import static org.redkale.net.http.WebSocket.RETCODE_GROUP_EMPTY; +import org.redkale.net.http.WebSocketPacket.FrameType; import org.redkale.net.sncp.Sncp; import org.redkale.service.*; import org.redkale.source.CacheSource; @@ -131,7 +133,7 @@ public abstract class WebSocketNode implements Service { protected abstract CompletableFuture sendMessage( @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, - Object message, + WebSocketPacket message, boolean last, Serializable... userids); @@ -139,7 +141,7 @@ public abstract class WebSocketNode implements Service { @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, WebSocketRange wsrange, - Object message, + WebSocketPacket message, boolean last); protected abstract CompletableFuture sendAction( @@ -675,16 +677,12 @@ public abstract class WebSocketNode implements Service { final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 : ((convert instanceof TextConvert) - ? new WebSocketPacket( - WebSocketPacket.FrameType.TEXT, ((TextConvert) convert).convertToBytes(message0), last) - : new WebSocketPacket( - WebSocketPacket.FrameType.BINARY, - ((BinaryConvert) convert).convertToBytes(message0), - last)); + ? new WebSocketPacket(FrameType.TEXT, convert.convertToBytes(message0), last) + : new WebSocketPacket(FrameType.BINARY, convert.convertToBytes(message0), last)); if (this.localEngine != null && this.source == null) { // 本地模式且没有分布式 return this.localEngine.sendLocalMessage(message, last, userids); } - final Object remoteMessage = formatRemoteMessage(message); + final WebSocketPacket remoteMessage = formatRemoteMessage(message); CompletableFuture rsfuture; if (userids.length == 1) { rsfuture = sendOneUserMessage(remoteMessage, last, userids[0]); @@ -809,7 +807,7 @@ public abstract class WebSocketNode implements Service { return localFuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localFuture; } // 远程节点发送消息 - final Object remoteMessage = formatRemoteMessage(message); + final WebSocketPacket remoteMessage = formatRemoteMessage(message); tryAcquireSemaphore(); CompletableFuture> addrsFuture = source.smembersAsync(WS_SOURCE_KEY_USERID_PREFIX + userid, WebSocketAddress.class); @@ -872,7 +870,7 @@ public abstract class WebSocketNode implements Service { // 没有CacheSource就不会有分布式节点 return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); } - final Object remoteMessage = formatRemoteMessage(message); + final WebSocketPacket remoteMessage = formatRemoteMessage(message); return remoteNode.sendMessage(addr.getTopic(), addr.getAddr(), remoteMessage, last, userids); } @@ -1018,7 +1016,7 @@ public abstract class WebSocketNode implements Service { if (this.localEngine != null && this.source == null) { // 本地模式且没有分布式 return this.localEngine.broadcastLocalMessage(wsrange, message, last); } - final Object remoteMessage = formatRemoteMessage(message); + final WebSocketPacket remoteMessage = formatRemoteMessage(message); CompletableFuture localFuture = this.localEngine == null ? null : this.localEngine.broadcastLocalMessage(wsrange, message, last); tryAcquireSemaphore(); @@ -1274,23 +1272,23 @@ public abstract class WebSocketNode implements Service { return remoteNode.sendAction(addr.getTopic(), addr.getAddr(), action, userids); } - protected Object formatRemoteMessage(Object message) { + protected WebSocketPacket formatRemoteMessage(Object message) { if (message instanceof WebSocketPacket) { - return message; + return (WebSocketPacket) message; } if (message instanceof byte[]) { - return message; + return new WebSocketPacket(FrameType.BINARY, (byte[]) message); } if (message instanceof CharSequence) { - return message; + return new WebSocketPacket(FrameType.TEXT, message.toString().getBytes(StandardCharsets.UTF_8)); } if (sendConvert instanceof TextConvert) { - ((TextConvert) sendConvert).convertTo(message); + return new WebSocketPacket(FrameType.TEXT, ((TextConvert) sendConvert).convertToBytes(message)); } if (sendConvert instanceof BinaryConvert) { - ((BinaryConvert) sendConvert).convertTo(message); + return new WebSocketPacket(FrameType.BINARY, ((BinaryConvert) sendConvert).convertToBytes(message)); } - return JsonConvert.root().convertTo(message); + return new WebSocketPacket(FrameType.TEXT, JsonConvert.root().convertToBytes(message)); } protected boolean tryAcquireSemaphore() { diff --git a/src/main/java/org/redkale/net/http/WebSocketNodeService.java b/src/main/java/org/redkale/net/http/WebSocketNodeService.java index 2dca3e1a2..4a2b6135b 100644 --- a/src/main/java/org/redkale/net/http/WebSocketNodeService.java +++ b/src/main/java/org/redkale/net/http/WebSocketNodeService.java @@ -1,13 +1,12 @@ package org.redkale.net.http; -import static org.redkale.net.http.WebSocket.RETCODE_GROUP_EMPTY; - import java.io.Serializable; import java.net.InetSocketAddress; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.logging.Level; import org.redkale.annotation.*; +import static org.redkale.net.http.WebSocket.RETCODE_GROUP_EMPTY; import org.redkale.service.RpcTargetAddress; import org.redkale.service.RpcTargetTopic; import org.redkale.service.Service; @@ -57,7 +56,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service { public CompletableFuture sendMessage( @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, - Object message, + WebSocketPacket message, boolean last, Serializable... userids) { if (this.localEngine == null) { @@ -71,7 +70,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service { @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, final WebSocketRange wsrange, - Object message, + WebSocketPacket message, boolean last) { if (this.localEngine == null) { return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); diff --git a/src/main/java/org/redkale/net/http/WebSocketPacket.java b/src/main/java/org/redkale/net/http/WebSocketPacket.java index 3a3636bcc..b3deed192 100644 --- a/src/main/java/org/redkale/net/http/WebSocketPacket.java +++ b/src/main/java/org/redkale/net/http/WebSocketPacket.java @@ -22,12 +22,6 @@ public final class WebSocketPacket { public static final WebSocketPacket DEFAULT_PING_PACKET = new WebSocketPacket(FrameType.PING, new byte[0]); - public enum MessageType { - STRING, - BYTES, - OBJECT; - } - public enum FrameType { SERIES(0x00), TEXT(0x01), diff --git a/src/test/java/org/redkale/test/convert/json/TinyTest.java b/src/test/java/org/redkale/test/convert/json/TinyTest.java index 04f014576..5e0a2be33 100644 --- a/src/test/java/org/redkale/test/convert/json/TinyTest.java +++ b/src/test/java/org/redkale/test/convert/json/TinyTest.java @@ -16,30 +16,33 @@ public class TinyTest { TinyTest test = new TinyTest(); test.run1(); test.run2(); + test.run3(); } @Test public void run1() throws Exception { TinyRecord record = new TinyRecord(); record.id = 5; - { - JsonFactory factory = JsonFactory.create().withFeatures(Convert.FEATURE_TINY); - JsonConvert convert = factory.getConvert(); - String json = "{\"id\":5}"; - Assertions.assertEquals(json, convert.convertTo(record)); - System.out.println(convert.convertTo(record)); - } - { - JsonFactory factory = JsonFactory.create().withFeatures(0); - JsonConvert convert = factory.getConvert(); - String json = "{\"id\":5,\"name\":\"\"}"; - Assertions.assertEquals(json, convert.convertTo(record)); - System.out.println(convert.convertTo(record)); - } + JsonFactory factory = JsonFactory.create().withFeatures(Convert.FEATURE_TINY); + JsonConvert convert = factory.getConvert(); + String json = "{\"id\":5}"; + Assertions.assertEquals(json, convert.convertTo(record)); + System.out.println(convert.convertTo(record)); } @Test public void run2() throws Exception { + TinyRecord record = new TinyRecord(); + record.id = 5; + JsonFactory factory = JsonFactory.create().withFeatures(0); + JsonConvert convert = factory.getConvert(); + String json = "{\"id\":5,\"name\":\"\"}"; + Assertions.assertEquals(json, convert.convertTo(record)); + System.out.println(convert.convertTo(record)); + } + + @Test + public void run3() throws Exception { String json = "{\"id\":5,\"name\":\"\", \"status\":2}"; JsonConvert.root().convertFrom(TinyRecord.class, json); }