From 14a72bf36a45af90adfd0cef74575106e3f60b68 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=9C=B0=E5=B9=B3=E7=BA=BF?= <22250530@qq.com> Date: Thu, 10 Sep 2015 13:49:49 +0800 Subject: [PATCH] --- .../wentch/redkale/net/http/WebSocket.java | 15 ++ .../redkale/net/http/WebSocketPacket.java | 26 +-- .../redkale/net/http/WebSocketRunner.java | 14 +- .../wentch/redkale/net/icep/IcepCoder.java | 20 ++ .../wentch/redkale/net/icep/IcepServer.java | 6 +- .../redkale/net/icep/rtcp/RtcpHeader.java | 133 ++++++++++++ .../redkale/net/icep/rtp/RtpHeader.java | 199 ++++++++++++++++++ .../redkale/net/icep/rtp/RtpPacket.java | 36 ++++ .../redkale/net/icep/stun/StunHeader.java | 3 +- 9 files changed, 432 insertions(+), 20 deletions(-) create mode 100644 src/com/wentch/redkale/net/icep/IcepCoder.java create mode 100644 src/com/wentch/redkale/net/icep/rtcp/RtcpHeader.java create mode 100644 src/com/wentch/redkale/net/icep/rtp/RtpHeader.java create mode 100644 src/com/wentch/redkale/net/icep/rtp/RtpPacket.java diff --git a/src/com/wentch/redkale/net/http/WebSocket.java b/src/com/wentch/redkale/net/http/WebSocket.java index f687edf09..2b4bd908f 100644 --- a/src/com/wentch/redkale/net/http/WebSocket.java +++ b/src/com/wentch/redkale/net/http/WebSocket.java @@ -6,6 +6,7 @@ package com.wentch.redkale.net.http; import com.wentch.redkale.net.*; +import com.wentch.redkale.net.http.WebSocketPacket.FrameType; import java.io.*; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -121,6 +122,14 @@ public abstract class WebSocket { return send(data, true); } + public final int sendPing(byte[] data) { + return send(new WebSocketPacket(FrameType.PING, data)); + } + + public final int sendPong(byte[] data) { + return send(new WebSocketPacket(FrameType.PONG, data)); + } + /** * 发送二进制消息 *

@@ -352,6 +361,12 @@ public abstract class WebSocket { public void onMessage(String text) { } + public void onPing(byte[] bytes) { + } + + public void onPong(byte[] bytes) { + } + public void onMessage(byte[] bytes) { } diff --git a/src/com/wentch/redkale/net/http/WebSocketPacket.java b/src/com/wentch/redkale/net/http/WebSocketPacket.java index 7a7d2b942..43c648590 100644 --- a/src/com/wentch/redkale/net/http/WebSocketPacket.java +++ b/src/com/wentch/redkale/net/http/WebSocketPacket.java @@ -15,13 +15,13 @@ import java.util.*; */ public final class WebSocketPacket { - public static enum PacketType { + public static enum FrameType { TEXT(0x01), BINARY(0x02), CLOSE(0x08), PING(0x09), PONG(0x0A); private final int value; - private PacketType(int v) { + private FrameType(int v) { this.value = v; } @@ -29,7 +29,7 @@ public final class WebSocketPacket { return value; } - public static PacketType valueOf(int v) { + public static FrameType valueOf(int v) { switch (v) { case 0x01: return TEXT; case 0x02: return BINARY; @@ -41,7 +41,7 @@ public final class WebSocketPacket { } } - protected PacketType type; + protected FrameType type; protected String payload; @@ -59,36 +59,36 @@ public final class WebSocketPacket { public WebSocketPacket(Serializable message, boolean fin) { boolean bin = message != null && message.getClass() == byte[].class; if (bin) { - this.type = PacketType.BINARY; + this.type = FrameType.BINARY; this.bytes = (byte[]) message; } else { - this.type = PacketType.TEXT; + this.type = FrameType.TEXT; this.payload = String.valueOf(message); } this.last = fin; } public WebSocketPacket(String payload, boolean fin) { - this.type = PacketType.TEXT; + this.type = FrameType.TEXT; this.payload = payload; this.last = fin; } public WebSocketPacket(byte[] data) { - this(PacketType.BINARY, data, true); + this(FrameType.BINARY, data, true); } public WebSocketPacket(byte[] data, boolean fin) { - this(PacketType.BINARY, data, fin); + this(FrameType.BINARY, data, fin); } - public WebSocketPacket(PacketType type, byte[] data) { + public WebSocketPacket(FrameType type, byte[] data) { this(type, data, true); } - public WebSocketPacket(PacketType type, byte[] data, boolean fin) { + public WebSocketPacket(FrameType type, byte[] data, boolean fin) { this.type = type; - if (type == PacketType.TEXT) { + if (type == FrameType.TEXT) { this.payload = new String(Utility.decodeUTF8(data)); } else { this.bytes = data; @@ -97,7 +97,7 @@ public final class WebSocketPacket { } public byte[] getContent() { - if (this.type == PacketType.TEXT) return Utility.encodeUTF8(getPayload()); + if (this.type == FrameType.TEXT) return Utility.encodeUTF8(getPayload()); if (this.bytes == null) return new byte[0]; return this.bytes; } diff --git a/src/com/wentch/redkale/net/http/WebSocketRunner.java b/src/com/wentch/redkale/net/http/WebSocketRunner.java index 7f20119d4..6112c8050 100644 --- a/src/com/wentch/redkale/net/http/WebSocketRunner.java +++ b/src/com/wentch/redkale/net/http/WebSocketRunner.java @@ -8,7 +8,7 @@ package com.wentch.redkale.net.http; import com.wentch.redkale.net.AsyncConnection; import com.wentch.redkale.net.Context; import static com.wentch.redkale.net.http.WebSocket.*; -import com.wentch.redkale.net.http.WebSocketPacket.PacketType; +import com.wentch.redkale.net.http.WebSocketPacket.FrameType; import java.nio.ByteBuffer; import java.nio.channels.*; import java.security.SecureRandom; @@ -120,10 +120,14 @@ public class WebSocketRunner implements Runnable { } webSocket.group.setRecentWebSocket(webSocket); try { - if (packet.type == PacketType.TEXT) { + if (packet.type == FrameType.TEXT) { webSocket.onMessage(packet.getPayload()); - } else if (packet.type == PacketType.BINARY) { + } else if (packet.type == FrameType.BINARY) { webSocket.onMessage(packet.getBytes()); + } else if (packet.type == FrameType.PONG) { + webSocket.onPong(packet.getBytes()); + } else if (packet.type == FrameType.PING) { + webSocket.onPing(packet.getBytes()); } } catch (Exception e) { context.getLogger().log(Level.INFO, "WebSocket onMessage error (" + packet + ")", e); @@ -414,8 +418,8 @@ public class WebSocketRunner implements Runnable { //0x0B-0F 为以后的控制帧保留 final boolean control = (opcode & 0x08) == 0x08; //是否控制帧 //final boolean continuation = opcode == 0; - PacketType type = PacketType.valueOf(opcode & 0xf); - if (type == PacketType.CLOSE) { + FrameType type = FrameType.valueOf(opcode & 0xf); + if (type == FrameType.CLOSE) { if (debug) logger.log(Level.FINEST, " receive close command from websocket client"); return null; } diff --git a/src/com/wentch/redkale/net/icep/IcepCoder.java b/src/com/wentch/redkale/net/icep/IcepCoder.java new file mode 100644 index 000000000..6cea736f3 --- /dev/null +++ b/src/com/wentch/redkale/net/icep/IcepCoder.java @@ -0,0 +1,20 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package com.wentch.redkale.net.icep; + +import java.nio.*; + +/** + * + * @author zhangjx + * @param + */ +public interface IcepCoder { + + public T decode(final ByteBuffer buffer); + + public ByteBuffer encode(final ByteBuffer buffer); +} diff --git a/src/com/wentch/redkale/net/icep/IcepServer.java b/src/com/wentch/redkale/net/icep/IcepServer.java index 09e34e9ba..43d9e118b 100644 --- a/src/com/wentch/redkale/net/icep/IcepServer.java +++ b/src/com/wentch/redkale/net/icep/IcepServer.java @@ -22,7 +22,11 @@ public final class IcepServer extends Server { public IcepServer() { this(System.currentTimeMillis(), null); } - + /** + "content":"{\"cmd\":\"icecandidate\",\"candidate\":{\"candidate\":\"candidate:3791502225 1 tcp 1518214911 10.28.2.207 0 typ host tcptype active generation 0\",\"sdpMid\":\"video\",\"sdpMLineIndex\":1}}" + @param args + @throws Exception + */ public static void main(String[] args) throws Exception { DefaultAnyValue conf = new DefaultAnyValue(); conf.addValue("host", "10.28.2.207"); diff --git a/src/com/wentch/redkale/net/icep/rtcp/RtcpHeader.java b/src/com/wentch/redkale/net/icep/rtcp/RtcpHeader.java new file mode 100644 index 000000000..2ef136087 --- /dev/null +++ b/src/com/wentch/redkale/net/icep/rtcp/RtcpHeader.java @@ -0,0 +1,133 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package com.wentch.redkale.net.icep.rtcp; + +/** + 1byte = 8bits + * + * @author zhangjx + */ +public class RtcpHeader { + + public static final int RTCP_SR = 200; //发送者报告 描述作为活跃发送者成员的发送和接收统计数字 + + public static final int RTCP_RR = 201; //接收者报告 描述非活跃发送者成员的接收统计数字 + + public static final int RTCP_SDES = 202; //源描述项。 其中包括规范名CNAME + + public static final int RTCP_BYE = 203; //关闭 表明参与者将结束会话 + + public static final int RTCP_APP = 204; //应用描述功能 + + /** + * protocol version + * 占2比特。 表示RTP 的版本号。 + */ + protected int version; + + /** + * padding flag + * 占1比特。 置“1”表示用户数据最后加有填充位,用户数据中最后一个字节是填充位计数,它表示一共加了多少个填充位。在两种情况下可能 + * 需要填充,一是某些加密算法要求数据块大小固定;二是在一个低层协议数据包中装载多个RTP 分组。 + */ + protected boolean padding; // + + /** + * 占1比特。 置“1”表示RTP 报头后紧随一个扩展报头。 + */ + protected boolean extend; + + /** + * varies by packet type + */ + protected int count; + + /** + * RTCP packet type + */ + protected int packetType; //占1个字节 + + /** + * Packet length in words, w/o this word + */ + protected int length; + + protected RtcpHeader() { + this(false, 0); + } + + public RtcpHeader(boolean padding, int pt) { + this.padding = padding; + this.packetType = pt; + this.count = 0; + this.length = 0; + this.version = 2; + } + + protected int decode(byte[] rawData, int offSet) { + int b = rawData[offSet++] & 0xff; + + this.version = (b & 0xC0) >> 6; + this.padding = (b & 0x20) == 0x020; + + this.count = b & 0x1F; + + this.packetType = rawData[offSet++] & 0x000000FF; + + this.length |= rawData[offSet++] & 0xFF; + this.length <<= 8; + this.length |= rawData[offSet++] & 0xFF; + + /** + * The length of this RTCP packet in 32-bit words minus one, including + * the header and any padding. (The offset of one makes zero a valid + * length and avoids a possible infinite loop in scanning a compound + * RTCP packet, while counting 32-bit words avoids a validity check for + * a multiple of 4.) + */ + this.length = (this.length * 4) + 4; + + return offSet; + } + + protected int encode(byte[] rawData, int offSet) { + rawData[offSet] = (byte) (this.version << 6); + if (this.padding) { + rawData[offSet] = (byte) (rawData[offSet] | 0x20); + } + + rawData[offSet] = (byte) (rawData[offSet] | (this.count & 0x1F)); + + offSet++; + + rawData[offSet++] = (byte) (this.packetType & 0x000000FF); + + // Setting length is onus of concrete class. But we increment the offSet + offSet += 2; + + return offSet; + } + + public int getVersion() { + return version; + } + + public boolean isPadding() { + return padding; + } + + public int getCount() { + return count; + } + + public int getPacketType() { + return packetType; + } + + public int getLength() { + return length; + } +} diff --git a/src/com/wentch/redkale/net/icep/rtp/RtpHeader.java b/src/com/wentch/redkale/net/icep/rtp/RtpHeader.java new file mode 100644 index 000000000..081b27e9b --- /dev/null +++ b/src/com/wentch/redkale/net/icep/rtp/RtpHeader.java @@ -0,0 +1,199 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package com.wentch.redkale.net.icep.rtp; + +import com.wentch.redkale.net.icep.*; +import java.nio.*; + +/** + * The RTP header has the following format: + * + * 0 1 2 3 + * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * |V=2|P|X| CC |M| PT | sequence number | + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * | timestamp | + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * | synchronization source (SSRC) identifier | + * +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ + * | contributing source (CSRC) identifiers | + * | | + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * + * The first twelve octets are present in every RTP packet, while the + * list of CSRC identifiers is present only when inserted by a mixer. + * + * The version defined by RFC3550 specification is two. + * + * @author zhangjx + */ +public class RtpHeader implements IcepCoder { + + /** + * 该值占4个字节, 由7种数据组成,按位存储 + * version(2bits) + padding(1bit) + extend(1bit) + crscs(4bits) + marker(1bit) + payloadtype(7bits) + sn(16bits) + * version : 占2比特。 表示RTP 的版本号 0b10。 + * padding : 占1比特。 置“1”表示用户数据最后加有填充位,用户数据中最后一个字节是填充位计数,它表示一共加了多少个填充位。 + * 在两种情况下可能需要填充: 一是某些加密算法要求数据块大小固定;二是在一个低层协议数据包中装载多个RTP 分组。 + * extend : 占1比特。 置“1”表示RTP 报头后紧随一个扩展报头。 + * crscs : 占4比特。 CSRC计数包括紧接在固定头后CSRC标识符个数。 + * marker : 占1比特。 标记解释由设置定义,目的在于允许重要事件在包流中标记出来。设置可定义其他标示位,或通过改变位数量来指定没有标记位。 + * payloadtype : 占7比特。 载荷类型: 记录后面资料使用哪种 Codec , receiver 端找出相应的 decoder 解码出來。 + * sn : 占16比特。 每发送一个RTP数据包该序号增加1。该序号在接收方可用来发现丢失的数据包和对到来的数据包进行排序。 初值为随机数,每发送一个增加1。可供接收方检测分组丢失和恢复分组次序。 + * + */ + private int headval = 0b10_0_0_0000_0_0000000_0000000000000000; + + /** + * 占4个字节。 时间戳 表示RTP分组第一个字节的取样时刻。其初值为随机数,每个采用周期加1。如果每次传送20ms的数据,由于音频的采样频率为8000Hz,即每20ms有160次采样,则每传送20ms的数据,时戳增加160。 + */ + private int timestamp; + + /** + * 同步源标识符(SSRC) 相当于每个数据流的唯一ID + * 占4字节。 用来标识一个同步源。此标识符是随机选择的,但要保证同一RTP会话中的任意两个SSRC各不相同,RTP必须检测并解决冲突。 + */ + private int ssrc; + + /** + * 提供源标识符(CSRC) + * 占0-60个字节。 它可有0~15项标识符,每一项长度固定为32比特,其项数由CC字段(crscs)来确定。如果提供源多于15个,则只有15个被标识。 + */ + private int[] csrc; + + /** + * 占2字节 + */ + private short exttype; + + /** + * 扩展数据 + * 占4*N字节, 必须是4字节的倍数 + */ + private byte[] extdata; + + @Override + public RtpHeader decode(final ByteBuffer buffer) { + this.headval = buffer.getInt(); + this.timestamp = buffer.getInt(); + this.ssrc = buffer.getInt(); + final int csrcs = getCrscs(); + if (csrcs > 0) { + this.csrc = new int[csrcs]; + for (int i = 0; i < csrcs; i++) { + this.csrc[i] = buffer.getInt(); + } + } + if (isExtend()) { + this.exttype = buffer.getShort(); + int extdatalen = (buffer.getShort() & 0xffff) * 4; + this.extdata = new byte[extdatalen]; + buffer.get(extdata); + } + return this; + } + + @Override + public ByteBuffer encode(final ByteBuffer buffer) { + buffer.putInt(this.headval); + buffer.putInt(this.timestamp); + buffer.putInt(this.ssrc); + final int csrcs = getCrscs(); + if (csrcs > 0) { + for (int i = 0; i < csrcs; i++) { + buffer.putInt(this.csrc[i]); + } + } + if (isExtend()) { + buffer.putShort(this.exttype); + buffer.putShort((short) (this.extdata.length / 4)); + buffer.put(this.extdata); + } + return buffer; + } + + public int getVersion() { + return (headval << 30) & 0b11; + } + + public boolean isPadding() { + return (headval & 0b00_1_0_0000_0_0000000_0000000000000000) > 0; + } + + public void setPadding(boolean padding) { + if (padding) { + headval |= 0b00_1_0_0000_0_0000000_0000000000000000; + } else { + headval &= 0b11_0_1_1111_1_1111111_1111111111111111; + } + } + + public boolean isExtend() { + return (headval & 0b00_0_1_0000_0_0000000_0000000000000000) > 0; + } + + public short getExttype() { + return exttype; + } + + public void setExttype(short exttype) { + this.exttype = exttype; + } + + public int[] getCsrc() { + return csrc; + } + + public void setCsrc(int[] csrc) { + this.csrc = csrc != null && csrc.length > 0 ? csrc : null; + if (this.csrc != null) { + this.headval = (headval & 0b11_1_1_0000_1_1111111_1111111111111111) | ((csrc.length << 24) & 0b00_0_0_1111_0_0000000_0000000000000000); + } else { + this.headval &= 0b11_1_1_0000_1_1111111_1111111111111111; + } + } + + public byte[] getExtdata() { + return extdata; + } + + public void setExtdata(byte[] exdata) { + boolean extend = exdata != null || exdata.length > 0; + if (extend) { + if ((exdata.length & 0b100) > 0) throw new RuntimeException("extdata length(" + exdata.length + ") is illegal"); + headval |= 0b00_0_1_0000_0_0000000_0000000000000000; + } else { + headval &= 0b11_1_0_1111_1_1111111_1111111111111111; + } + this.extdata = (exdata != null && exdata.length > 0) ? exdata : null; + } + + public int getCrscs() { + return (headval >> 4 << 28) & 0b1111; + } + + public boolean isMarker() { + return (headval & 0b00_0_0_0000_1_0000000_0000000000000000) > 0; + } + + public int getPayloadtype() { + return (headval >> 9 << 25) & 0b1111111; + } + + public void setPayloadtype(int payloadtype) { + headval = (headval & 0b11_1_1_1111_1_0000000_1111111111111111) | ((payloadtype << 16) & 0b00_0_0_0000_0_1111111_0000000000000000); + } + + public int getSeqnumber() { + return headval & 0xFFFF; + } + + public void setSeqnumber(int seqnumber) { + headval = (headval >> 16 << 16) | (seqnumber & 0x0000FFFF); + } + +} diff --git a/src/com/wentch/redkale/net/icep/rtp/RtpPacket.java b/src/com/wentch/redkale/net/icep/rtp/RtpPacket.java new file mode 100644 index 000000000..1e6601206 --- /dev/null +++ b/src/com/wentch/redkale/net/icep/rtp/RtpPacket.java @@ -0,0 +1,36 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package com.wentch.redkale.net.icep.rtp; + +import com.wentch.redkale.net.icep.*; +import java.nio.*; + +/** + * + * @author zhangjx + */ +public class RtpPacket implements IcepCoder { + + private RtpHeader header; + + private byte[] payload; + + @Override + public RtpPacket decode(final ByteBuffer buffer) { + if (header == null) this.header = new RtpHeader(); + this.header.decode(buffer); + this.payload = new byte[buffer.remaining()]; + buffer.get(payload); + return this; + } + + @Override + public ByteBuffer encode(final ByteBuffer buffer) { + this.header.encode(buffer).put(payload); + return buffer; + } + +} diff --git a/src/com/wentch/redkale/net/icep/stun/StunHeader.java b/src/com/wentch/redkale/net/icep/stun/StunHeader.java index 419fc8977..1c62a9835 100644 --- a/src/com/wentch/redkale/net/icep/stun/StunHeader.java +++ b/src/com/wentch/redkale/net/icep/stun/StunHeader.java @@ -5,6 +5,7 @@ */ package com.wentch.redkale.net.icep.stun; +import com.wentch.redkale.net.icep.*; import com.wentch.redkale.util.*; import java.nio.*; import java.security.*; @@ -13,7 +14,7 @@ import java.security.*; * * @author zhangjx */ -public class StunHeader { +public class StunHeader implements IcepCoder{ public static final int MAGIC_COOKIE = 0x2112A442;