From bf88c4da0602a6aa4da6f62dd143312c2f482ec0 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Wed, 15 Nov 2017 19:51:49 +0800 Subject: [PATCH] --- src/org/redkale/net/http/Rest.java | 4 + src/org/redkale/net/http/RestWebSocket.java | 7 ++ src/org/redkale/net/http/WebSocketEngine.java | 16 ++- src/org/redkale/net/http/WebSocketPacket.java | 103 ++++++++++++++++++ src/org/redkale/net/http/WebSocketRunner.java | 3 + .../redkale/net/http/WebSocketServlet.java | 8 +- 6 files changed, 135 insertions(+), 6 deletions(-) diff --git a/src/org/redkale/net/http/Rest.java b/src/org/redkale/net/http/Rest.java index 0c0ad0db8..c1b1d34a8 100644 --- a/src/org/redkale/net/http/Rest.java +++ b/src/org/redkale/net/http/Rest.java @@ -320,6 +320,10 @@ public final class Rest { pushInt(mv, rws.wsmaxconns()); mv.visitFieldInsn(PUTFIELD, newDynName, "wsmaxconns", "I"); + mv.visitVarInsn(ALOAD, 0); + pushInt(mv, rws.wsmaxbody()); + mv.visitFieldInsn(PUTFIELD, newDynName, "wsmaxbody", "I"); + mv.visitVarInsn(ALOAD, 0); mv.visitInsn(rws.single() ? ICONST_1 : ICONST_0); mv.visitFieldInsn(PUTFIELD, newDynName, "single", "Z"); diff --git a/src/org/redkale/net/http/RestWebSocket.java b/src/org/redkale/net/http/RestWebSocket.java index d40bf0834..34a63c76d 100644 --- a/src/org/redkale/net/http/RestWebSocket.java +++ b/src/org/redkale/net/http/RestWebSocket.java @@ -66,6 +66,13 @@ public @interface RestWebSocket { */ int wsmaxconns() default 0; + /** + * 最大消息体长度, 小于1表示无限制 + * + * @return 最大消息体长度 + */ + int wsmaxbody() default 16 * 1024; + /** * 是否屏蔽该类的转换 * diff --git a/src/org/redkale/net/http/WebSocketEngine.java b/src/org/redkale/net/http/WebSocketEngine.java index add6eb4ed..0c1b6deb6 100644 --- a/src/org/redkale/net/http/WebSocketEngine.java +++ b/src/org/redkale/net/http/WebSocketEngine.java @@ -55,7 +55,7 @@ public class WebSocketEngine { private final Map> websockets2 = new ConcurrentHashMap<>(); @Comment("当前连接数") - private final AtomicInteger currconns = new AtomicInteger(); + protected final AtomicInteger currconns = new AtomicInteger(); @Comment("用于PING的定时器") private ScheduledThreadPoolExecutor scheduler; @@ -64,12 +64,16 @@ public class WebSocketEngine { protected final Logger logger; @Comment("PING的间隔秒数") - private int liveinterval; + protected int liveinterval; @Comment("最大连接数, 为0表示无限制") - private int wsmaxconns; + protected int wsmaxconns; - protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveinterval, int wsmaxconns, WebSocketNode node, Convert sendConvert, Logger logger) { + @Comment("最大消息体长度, 小于1表示无限制") + protected int wsmaxbody; + + protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveinterval, + int wsmaxconns, int wsmaxbody, WebSocketNode node, Convert sendConvert, Logger logger) { this.engineid = engineid; this.single = single; this.context = context; @@ -77,6 +81,7 @@ public class WebSocketEngine { this.node = node; this.liveinterval = liveinterval; this.wsmaxconns = wsmaxconns; + this.wsmaxbody = wsmaxbody; this.logger = logger; this.index = sequence.getAndIncrement(); } @@ -86,7 +91,8 @@ public class WebSocketEngine { if (conf != null && conf.getAnyValue("properties") != null) props = conf.getAnyValue("properties"); this.liveinterval = props == null ? (liveinterval < 0 ? DEFAILT_LIVEINTERVAL : liveinterval) : props.getIntValue(WEBPARAM__LIVEINTERVAL, (liveinterval < 0 ? DEFAILT_LIVEINTERVAL : liveinterval)); if (liveinterval <= 0) return; - this.wsmaxconns = props == null ? this.wsmaxconns : props.getIntValue(WEBPARAM__WSMAXCONNS, this.wsmaxconns); + if (props != null) this.wsmaxconns = props.getIntValue(WEBPARAM__WSMAXCONNS, this.wsmaxconns); + if (props != null) this.wsmaxbody = props.getIntValue(WEBPARAM__WSMAXBODY, this.wsmaxbody); if (scheduler != null) return; this.scheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> { final Thread t = new Thread(r, engineid + "-WebSocket-LiveInterval-Thread"); diff --git a/src/org/redkale/net/http/WebSocketPacket.java b/src/org/redkale/net/http/WebSocketPacket.java index d680aff87..e899dc4f8 100644 --- a/src/org/redkale/net/http/WebSocketPacket.java +++ b/src/org/redkale/net/http/WebSocketPacket.java @@ -290,6 +290,109 @@ public final class WebSocketPacket { // String rs = JsonConvert.root().convertFrom(String.class, masker, buffer); // System.out.println(rs); // } + /** + * 消息解码
+ * + * 0 1 2 3 + * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + * +-+-+-+-+-------+-+-------------+-------------------------------+ + * |F|R|R|R| opcode|M| Payload len | Extended payload length | + * |I|S|S|S| (4) |A| (7) | (16/64) | + * |N|V|V|V| |S| | (if payload len==126/127) | + * | |1|2|3| |K| | | + * +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - + + * | Extended payload length continued, if payload len == 127 | + * + - - - - - - - - - - - - - - - +-------------------------------+ + * | |Masking-key, if MASK set to 1 | + * +-------------------------------+-------------------------------+ + * | Masking-key (continued) | Payload Data | + * +-------------------------------- - - - - - - - - - - - - - - - + + * : Payload Data continued : + * + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + * | Payload Data continued | + * +-----------------------------------------------------------------------+ + * + * @param buffer + * @param exbuffers + * + * @return 1:表示解析成功且能继续解析;0:表示解析成功; -1:表示解析失败 + */ + /* + static int decode(final Logger logger, final List packets, + final int wsmaxbody, final ByteBuffer buffer, ByteBuffer... exbuffers) { + final boolean debug = false; //调试开关 + if (debug) { + int remain = buffer.remaining(); + if (exbuffers != null) { + for (ByteBuffer b : exbuffers) { + remain += b == null ? 0 : b.remaining(); + } + } + logger.log(Level.FINEST, "read websocket message's length = " + remain); + } + if (buffer.remaining() < 2) return -1; + final byte opcode = buffer.get(); + final WebSocketPacket packet = new WebSocketPacket(); + packet.last = (opcode & 0b1000_0000) != 0; + packet.type = FrameType.valueOf(opcode & 0xF); + if (packet.type == FrameType.CLOSE) { + if (debug) logger.log(Level.FINEST, " receive close command from websocket client"); + packets.add(packet); + return 0; + } + final boolean checkrsv = false;//暂时不校验 + if (checkrsv && (opcode & 0b0111_0000) != 0) { + if (debug) logger.log(Level.FINE, "rsv1 rsv2 rsv3 must be 0, but not (" + opcode + ")"); + return -1; //rsv1 rsv2 rsv3 must be 0 + } + //0x00 表示一个后续帧 + //0x01 表示一个文本帧 + //0x02 表示一个二进制帧 + //0x03-07 为以后的非控制帧保留 + //0x8 表示一个连接关闭 + //0x9 表示一个ping + //0xA 表示一个pong + //0x0B-0F 为以后的控制帧保留 + final boolean control = (opcode & 0b0000_1000) != 0; //是否控制帧 + byte lengthCode = buffer.get(); + final boolean masked = (lengthCode & 0x80) == 0x80; + if (masked) lengthCode ^= 0x80; //mask + int length; + if (lengthCode <= 0x7D) { //125 + length = lengthCode; + } else { + if (control) { + if (debug) logger.log(Level.FINE, " receive control command from websocket client"); + return -1; + } + if (lengthCode == 0x7E) {//0x7E=126 + length = (int) buffer.getChar(); + } else { + length = buffer.getInt(); + } + } + if (length > wsmaxbody && wsmaxbody > 0) { + if (debug) logger.log(Level.FINE, "message body(" + length + ") is too big, but must less " + wsmaxbody); + return -1; + } + ConvertMask masker = null; + if (masked) { + final byte[] masks = new byte[4]; + buffer.get(masks); + masker = new ConvertMask() { + + private int index = 0; + + @Override + public byte unmask(byte value) { + return (byte) (value ^ masks[index++ % 4]); + } + }; + } + this.receiveBuffers = Utility.append(new ByteBuffer[]{buffer}, exbuffers); + return this; + } +*/ /** * 消息解码
* diff --git a/src/org/redkale/net/http/WebSocketRunner.java b/src/org/redkale/net/http/WebSocketRunner.java index 40fb1807d..bbdf35111 100644 --- a/src/org/redkale/net/http/WebSocketRunner.java +++ b/src/org/redkale/net/http/WebSocketRunner.java @@ -62,12 +62,15 @@ class WebSocketRunner implements Runnable { public void run() { final boolean debug = true; try { + final int wsmaxbody = webSocket._engine.wsmaxbody; webSocket.onConnected(); channel.setReadTimeoutSecond(300); //读取超时5分钟 if (channel.isOpen()) { channel.read(readBuffer, null, new CompletionHandler() { private ByteBuffer recentExBuffer; + + private final List packets = new ArrayList<>(); //当接收的数据流长度大于ByteBuffer长度时, 则需要额外的ByteBuffer 辅助; private final List readBuffers = new ArrayList<>(); diff --git a/src/org/redkale/net/http/WebSocketServlet.java b/src/org/redkale/net/http/WebSocketServlet.java index c887d1042..e4102a220 100644 --- a/src/org/redkale/net/http/WebSocketServlet.java +++ b/src/org/redkale/net/http/WebSocketServlet.java @@ -48,6 +48,9 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl @Comment("WebScoket服务器最大连接数,为0表示无限制") public static final String WEBPARAM__WSMAXCONNS = "wsmaxconns"; + @Comment("最大消息体长度, 小于1表示无限制") + public static final String WEBPARAM__WSMAXBODY = "wsmaxbody"; + @Comment("WebScoket服务器给客户端进行ping操作的默认间隔时间, 单位: 秒") public static final int DEFAILT_LIVEINTERVAL = 15; @@ -65,6 +68,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl protected int wsmaxconns = 0; + protected int wsmaxbody = 0; + @Resource(name = "jsonconvert") protected Convert jsonConvert; @@ -112,7 +117,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl } //存在WebSocketServlet,则此WebSocketNode必须是本地模式Service - this.node.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", this.single, context, liveinterval, wsmaxconns, this.node, this.sendConvert, logger); + this.node.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", + this.single, context, liveinterval, wsmaxconns, wsmaxbody, this.node, this.sendConvert, logger); this.node.init(conf); this.node.localEngine.init(conf); }