From 59b08684a80c98bc9fdff47b90bfbbdc4cc8dfd4 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Thu, 25 May 2017 23:51:27 +0800 Subject: [PATCH] --- src/org/redkale/net/http/Rest.java | 3 ++ src/org/redkale/net/http/RestWebSocket.java | 2 +- src/org/redkale/net/http/WebSocketGroup.java | 2 +- src/org/redkale/net/http/WebSocketPacket.java | 31 +++++++++++++++++-- src/org/redkale/net/http/WebSocketRunner.java | 10 +++--- 5 files changed, 38 insertions(+), 10 deletions(-) diff --git a/src/org/redkale/net/http/Rest.java b/src/org/redkale/net/http/Rest.java index 294f753fd..2499fa745 100644 --- a/src/org/redkale/net/http/Rest.java +++ b/src/org/redkale/net/http/Rest.java @@ -130,6 +130,9 @@ public final class Rest { if (field.getAnnotation(Resource.class) == null) continue; if (Modifier.isStatic(webSocketType.getModifiers())) throw new RuntimeException(field + " cannot static on createRestWebSocketServlet"); if (Modifier.isFinal(webSocketType.getModifiers())) throw new RuntimeException(field + " cannot final on createRestWebSocketServlet"); + if (!Modifier.isPublic(webSocketType.getModifiers()) && !Modifier.isProtected(webSocketType.getModifiers())) { + throw new RuntimeException(field + " must be public or protected on createRestWebSocketServlet"); + } resourcesFieldSet.add(field); } } while ((clzz = clzz.getSuperclass()) != Object.class); diff --git a/src/org/redkale/net/http/RestWebSocket.java b/src/org/redkale/net/http/RestWebSocket.java index 8782b4bda..d996fdda0 100644 --- a/src/org/redkale/net/http/RestWebSocket.java +++ b/src/org/redkale/net/http/RestWebSocket.java @@ -11,7 +11,7 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME; /** * 只能依附在WebSocket类上,name默认为Service的类名小写并去掉Service字样及后面的字符串 (如HelloWebSocket/HelloWebSocketImpl,的默认路径为 hello)。
- * 注意: 被标记@RestWebSocket的WebSocket不能被修饰为abstract或final,且必须要有一个protected或public的空参数构造函数。
+ * 注意: 被标记@RestWebSocket的WebSocket不能被修饰为abstract或final,且其内部标记为@Resource的字段只能是protected或public,且必须要有一个protected或public的空参数构造函数。
*

* 详情见: https://redkale.org * diff --git a/src/org/redkale/net/http/WebSocketGroup.java b/src/org/redkale/net/http/WebSocketGroup.java index 176e662c9..ab20f3fe2 100644 --- a/src/org/redkale/net/http/WebSocketGroup.java +++ b/src/org/redkale/net/http/WebSocketGroup.java @@ -117,7 +117,7 @@ public final class WebSocketGroup { future = future.thenCombine(s.sendPacket(packet), (a, b) -> a | (Integer) b); } } - if (more && future != null) future = future.whenComplete((rs, ex) -> context.offerBuffer(packet.sendBuffers)); + if (more && future != null) future.whenComplete((rs, ex) -> context.offerBuffer(packet.sendBuffers)); return future == null ? CompletableFuture.completedFuture(0) : future; } diff --git a/src/org/redkale/net/http/WebSocketPacket.java b/src/org/redkale/net/http/WebSocketPacket.java index 8b5e45bb9..e836a67f9 100644 --- a/src/org/redkale/net/http/WebSocketPacket.java +++ b/src/org/redkale/net/http/WebSocketPacket.java @@ -107,10 +107,15 @@ public final class WebSocketPacket { } void setSendBuffers(ByteBuffer[] sendBuffers) { - this.sendBuffers = new ByteBuffer[sendBuffers.length]; - for (int i = 0; i < sendBuffers.length; i++) { - this.sendBuffers[i] = sendBuffers[i].duplicate(); + this.sendBuffers = sendBuffers; + } + + ByteBuffer[] duplicateSendBuffers() { + ByteBuffer[] rs = new ByteBuffer[this.sendBuffers.length]; + for (int i = 0; i < this.sendBuffers.length; i++) { + rs[i] = this.sendBuffers[i].duplicate(); } + return rs; } public WebSocketPacket(byte[] data) { @@ -356,4 +361,24 @@ public final class WebSocketPacket { return this; } + byte[] getReceiveBytes() { + int count = 0; + for (ByteBuffer buf : this.receiveBuffers) { + count += buf.remaining(); + } + byte[] bs = new byte[count]; + int index = 0; + for (ByteBuffer buf : this.receiveBuffers) { + int r = buf.remaining(); + buf.get(bs, index, r); + index += r; + } + ConvertMask mask = this.receiveMasker; + if (mask != null) { + for (int i = 0; i < bs.length; i++) { + bs[i] = mask.unmask(bs[i]); + } + } + return bs; + } } diff --git a/src/org/redkale/net/http/WebSocketRunner.java b/src/org/redkale/net/http/WebSocketRunner.java index 35887b5b9..b68390ae7 100644 --- a/src/org/redkale/net/http/WebSocketRunner.java +++ b/src/org/redkale/net/http/WebSocketRunner.java @@ -136,7 +136,7 @@ class WebSocketRunner implements Runnable { context.getLogger().log(Level.SEVERE, "WebSocket onTextMessage error (" + packet + ")", e); } } else if (packet.type == FrameType.BINARY) { - byte[] message = convert.convertFrom(byte[].class, packet.receiveMasker, packet.receiveBuffers); + byte[] message =packet.getReceiveBytes(); if (readBuffer != null) { readBuffer.clear(); channel.read(readBuffer, null, this); @@ -147,7 +147,7 @@ class WebSocketRunner implements Runnable { context.getLogger().log(Level.SEVERE, "WebSocket onBinaryMessage error (" + packet + ")", e); } } else if (packet.type == FrameType.PONG) { - byte[] message = convert.convertFrom(byte[].class, packet.receiveMasker, packet.receiveBuffers); + byte[] message = packet.getReceiveBytes(); if (readBuffer != null) { readBuffer.clear(); channel.read(readBuffer, null, this); @@ -158,7 +158,7 @@ class WebSocketRunner implements Runnable { context.getLogger().log(Level.SEVERE, "WebSocket onPong error (" + packet + ")", e); } } else if (packet.type == FrameType.PING) { - byte[] message = convert.convertFrom(byte[].class, packet.receiveMasker, packet.receiveBuffers); + byte[] message = packet.getReceiveBytes(); if (readBuffer != null) { readBuffer.clear(); channel.read(readBuffer, null, this); @@ -217,7 +217,7 @@ class WebSocketRunner implements Runnable { queue.add(new QueueEntry(futureResult, packet)); return futureResult; } - ByteBuffer[] buffers = packet.sendBuffers != null ? packet.sendBuffers : packet.encode(this.context.getBufferSupplier()); + ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encode(this.context.getBufferSupplier()); this.writeBuffers = buffers; try { channel.write(buffers, buffers, new CompletionHandler() { @@ -264,7 +264,7 @@ class WebSocketRunner implements Runnable { QueueEntry entry = queue.poll(); if (entry != null) { future = entry.future; - ByteBuffer[] buffers = packet.sendBuffers != null ? packet.sendBuffers : packet.encode(context.getBufferSupplier()); + ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encode(context.getBufferSupplier()); writeBuffers = buffers; channel.write(buffers, buffers, this); }