This commit is contained in:
@@ -320,6 +320,10 @@ public final class Rest {
|
||||
pushInt(mv, rws.wsmaxconns());
|
||||
mv.visitFieldInsn(PUTFIELD, newDynName, "wsmaxconns", "I");
|
||||
|
||||
mv.visitVarInsn(ALOAD, 0);
|
||||
pushInt(mv, rws.wsmaxbody());
|
||||
mv.visitFieldInsn(PUTFIELD, newDynName, "wsmaxbody", "I");
|
||||
|
||||
mv.visitVarInsn(ALOAD, 0);
|
||||
mv.visitInsn(rws.single() ? ICONST_1 : ICONST_0);
|
||||
mv.visitFieldInsn(PUTFIELD, newDynName, "single", "Z");
|
||||
|
||||
@@ -66,6 +66,13 @@ public @interface RestWebSocket {
|
||||
*/
|
||||
int wsmaxconns() default 0;
|
||||
|
||||
/**
|
||||
* 最大消息体长度, 小于1表示无限制
|
||||
*
|
||||
* @return 最大消息体长度
|
||||
*/
|
||||
int wsmaxbody() default 16 * 1024;
|
||||
|
||||
/**
|
||||
* 是否屏蔽该类的转换
|
||||
*
|
||||
|
||||
@@ -55,7 +55,7 @@ public class WebSocketEngine {
|
||||
private final Map<Serializable, List<WebSocket>> websockets2 = new ConcurrentHashMap<>();
|
||||
|
||||
@Comment("当前连接数")
|
||||
private final AtomicInteger currconns = new AtomicInteger();
|
||||
protected final AtomicInteger currconns = new AtomicInteger();
|
||||
|
||||
@Comment("用于PING的定时器")
|
||||
private ScheduledThreadPoolExecutor scheduler;
|
||||
@@ -64,12 +64,16 @@ public class WebSocketEngine {
|
||||
protected final Logger logger;
|
||||
|
||||
@Comment("PING的间隔秒数")
|
||||
private int liveinterval;
|
||||
protected int liveinterval;
|
||||
|
||||
@Comment("最大连接数, 为0表示无限制")
|
||||
private int wsmaxconns;
|
||||
protected int wsmaxconns;
|
||||
|
||||
protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveinterval, int wsmaxconns, WebSocketNode node, Convert sendConvert, Logger logger) {
|
||||
@Comment("最大消息体长度, 小于1表示无限制")
|
||||
protected int wsmaxbody;
|
||||
|
||||
protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveinterval,
|
||||
int wsmaxconns, int wsmaxbody, WebSocketNode node, Convert sendConvert, Logger logger) {
|
||||
this.engineid = engineid;
|
||||
this.single = single;
|
||||
this.context = context;
|
||||
@@ -77,6 +81,7 @@ public class WebSocketEngine {
|
||||
this.node = node;
|
||||
this.liveinterval = liveinterval;
|
||||
this.wsmaxconns = wsmaxconns;
|
||||
this.wsmaxbody = wsmaxbody;
|
||||
this.logger = logger;
|
||||
this.index = sequence.getAndIncrement();
|
||||
}
|
||||
@@ -86,7 +91,8 @@ public class WebSocketEngine {
|
||||
if (conf != null && conf.getAnyValue("properties") != null) props = conf.getAnyValue("properties");
|
||||
this.liveinterval = props == null ? (liveinterval < 0 ? DEFAILT_LIVEINTERVAL : liveinterval) : props.getIntValue(WEBPARAM__LIVEINTERVAL, (liveinterval < 0 ? DEFAILT_LIVEINTERVAL : liveinterval));
|
||||
if (liveinterval <= 0) return;
|
||||
this.wsmaxconns = props == null ? this.wsmaxconns : props.getIntValue(WEBPARAM__WSMAXCONNS, this.wsmaxconns);
|
||||
if (props != null) this.wsmaxconns = props.getIntValue(WEBPARAM__WSMAXCONNS, this.wsmaxconns);
|
||||
if (props != null) this.wsmaxbody = props.getIntValue(WEBPARAM__WSMAXBODY, this.wsmaxbody);
|
||||
if (scheduler != null) return;
|
||||
this.scheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> {
|
||||
final Thread t = new Thread(r, engineid + "-WebSocket-LiveInterval-Thread");
|
||||
|
||||
@@ -290,6 +290,109 @@ public final class WebSocketPacket {
|
||||
// String rs = JsonConvert.root().convertFrom(String.class, masker, buffer);
|
||||
// System.out.println(rs);
|
||||
// }
|
||||
/**
|
||||
* 消息解码 <br>
|
||||
*
|
||||
* 0 1 2 3
|
||||
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
|
||||
* +-+-+-+-+-------+-+-------------+-------------------------------+
|
||||
* |F|R|R|R| opcode|M| Payload len | Extended payload length |
|
||||
* |I|S|S|S| (4) |A| (7) | (16/64) |
|
||||
* |N|V|V|V| |S| | (if payload len==126/127) |
|
||||
* | |1|2|3| |K| | |
|
||||
* +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
|
||||
* | Extended payload length continued, if payload len == 127 |
|
||||
* + - - - - - - - - - - - - - - - +-------------------------------+
|
||||
* | |Masking-key, if MASK set to 1 |
|
||||
* +-------------------------------+-------------------------------+
|
||||
* | Masking-key (continued) | Payload Data |
|
||||
* +-------------------------------- - - - - - - - - - - - - - - - +
|
||||
* : Payload Data continued :
|
||||
* + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
|
||||
* | Payload Data continued |
|
||||
* +-----------------------------------------------------------------------+
|
||||
*
|
||||
* @param buffer
|
||||
* @param exbuffers
|
||||
*
|
||||
* @return 1:表示解析成功且能继续解析;0:表示解析成功; -1:表示解析失败
|
||||
*/
|
||||
/*
|
||||
static int decode(final Logger logger, final List<WebSocketPacket> packets,
|
||||
final int wsmaxbody, final ByteBuffer buffer, ByteBuffer... exbuffers) {
|
||||
final boolean debug = false; //调试开关
|
||||
if (debug) {
|
||||
int remain = buffer.remaining();
|
||||
if (exbuffers != null) {
|
||||
for (ByteBuffer b : exbuffers) {
|
||||
remain += b == null ? 0 : b.remaining();
|
||||
}
|
||||
}
|
||||
logger.log(Level.FINEST, "read websocket message's length = " + remain);
|
||||
}
|
||||
if (buffer.remaining() < 2) return -1;
|
||||
final byte opcode = buffer.get();
|
||||
final WebSocketPacket packet = new WebSocketPacket();
|
||||
packet.last = (opcode & 0b1000_0000) != 0;
|
||||
packet.type = FrameType.valueOf(opcode & 0xF);
|
||||
if (packet.type == FrameType.CLOSE) {
|
||||
if (debug) logger.log(Level.FINEST, " receive close command from websocket client");
|
||||
packets.add(packet);
|
||||
return 0;
|
||||
}
|
||||
final boolean checkrsv = false;//暂时不校验
|
||||
if (checkrsv && (opcode & 0b0111_0000) != 0) {
|
||||
if (debug) logger.log(Level.FINE, "rsv1 rsv2 rsv3 must be 0, but not (" + opcode + ")");
|
||||
return -1; //rsv1 rsv2 rsv3 must be 0
|
||||
}
|
||||
//0x00 表示一个后续帧
|
||||
//0x01 表示一个文本帧
|
||||
//0x02 表示一个二进制帧
|
||||
//0x03-07 为以后的非控制帧保留
|
||||
//0x8 表示一个连接关闭
|
||||
//0x9 表示一个ping
|
||||
//0xA 表示一个pong
|
||||
//0x0B-0F 为以后的控制帧保留
|
||||
final boolean control = (opcode & 0b0000_1000) != 0; //是否控制帧
|
||||
byte lengthCode = buffer.get();
|
||||
final boolean masked = (lengthCode & 0x80) == 0x80;
|
||||
if (masked) lengthCode ^= 0x80; //mask
|
||||
int length;
|
||||
if (lengthCode <= 0x7D) { //125
|
||||
length = lengthCode;
|
||||
} else {
|
||||
if (control) {
|
||||
if (debug) logger.log(Level.FINE, " receive control command from websocket client");
|
||||
return -1;
|
||||
}
|
||||
if (lengthCode == 0x7E) {//0x7E=126
|
||||
length = (int) buffer.getChar();
|
||||
} else {
|
||||
length = buffer.getInt();
|
||||
}
|
||||
}
|
||||
if (length > wsmaxbody && wsmaxbody > 0) {
|
||||
if (debug) logger.log(Level.FINE, "message body(" + length + ") is too big, but must less " + wsmaxbody);
|
||||
return -1;
|
||||
}
|
||||
ConvertMask masker = null;
|
||||
if (masked) {
|
||||
final byte[] masks = new byte[4];
|
||||
buffer.get(masks);
|
||||
masker = new ConvertMask() {
|
||||
|
||||
private int index = 0;
|
||||
|
||||
@Override
|
||||
public byte unmask(byte value) {
|
||||
return (byte) (value ^ masks[index++ % 4]);
|
||||
}
|
||||
};
|
||||
}
|
||||
this.receiveBuffers = Utility.append(new ByteBuffer[]{buffer}, exbuffers);
|
||||
return this;
|
||||
}
|
||||
*/
|
||||
/**
|
||||
* 消息解码 <br>
|
||||
*
|
||||
|
||||
@@ -62,12 +62,15 @@ class WebSocketRunner implements Runnable {
|
||||
public void run() {
|
||||
final boolean debug = true;
|
||||
try {
|
||||
final int wsmaxbody = webSocket._engine.wsmaxbody;
|
||||
webSocket.onConnected();
|
||||
channel.setReadTimeoutSecond(300); //读取超时5分钟
|
||||
if (channel.isOpen()) {
|
||||
channel.read(readBuffer, null, new CompletionHandler<Integer, Void>() {
|
||||
|
||||
private ByteBuffer recentExBuffer;
|
||||
|
||||
private final List<WebSocketPacket> packets = new ArrayList<>();
|
||||
|
||||
//当接收的数据流长度大于ByteBuffer长度时, 则需要额外的ByteBuffer 辅助;
|
||||
private final List<ByteBuffer> readBuffers = new ArrayList<>();
|
||||
|
||||
@@ -48,6 +48,9 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
|
||||
@Comment("WebScoket服务器最大连接数,为0表示无限制")
|
||||
public static final String WEBPARAM__WSMAXCONNS = "wsmaxconns";
|
||||
|
||||
@Comment("最大消息体长度, 小于1表示无限制")
|
||||
public static final String WEBPARAM__WSMAXBODY = "wsmaxbody";
|
||||
|
||||
@Comment("WebScoket服务器给客户端进行ping操作的默认间隔时间, 单位: 秒")
|
||||
public static final int DEFAILT_LIVEINTERVAL = 15;
|
||||
|
||||
@@ -65,6 +68,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
|
||||
|
||||
protected int wsmaxconns = 0;
|
||||
|
||||
protected int wsmaxbody = 0;
|
||||
|
||||
@Resource(name = "jsonconvert")
|
||||
protected Convert jsonConvert;
|
||||
|
||||
@@ -112,7 +117,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
|
||||
}
|
||||
|
||||
//存在WebSocketServlet,则此WebSocketNode必须是本地模式Service
|
||||
this.node.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", this.single, context, liveinterval, wsmaxconns, this.node, this.sendConvert, logger);
|
||||
this.node.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]",
|
||||
this.single, context, liveinterval, wsmaxconns, wsmaxbody, this.node, this.sendConvert, logger);
|
||||
this.node.init(conf);
|
||||
this.node.localEngine.init(conf);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user