This commit is contained in:
地平线
2015-09-10 13:49:49 +08:00
parent 05c22c60a8
commit 14a72bf36a
9 changed files with 432 additions and 20 deletions

View File

@@ -6,6 +6,7 @@
package com.wentch.redkale.net.http;
import com.wentch.redkale.net.*;
import com.wentch.redkale.net.http.WebSocketPacket.FrameType;
import java.io.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@@ -121,6 +122,14 @@ public abstract class WebSocket {
return send(data, true);
}
public final int sendPing(byte[] data) {
return send(new WebSocketPacket(FrameType.PING, data));
}
public final int sendPong(byte[] data) {
return send(new WebSocketPacket(FrameType.PONG, data));
}
/**
* 发送二进制消息
* <p>
@@ -352,6 +361,12 @@ public abstract class WebSocket {
public void onMessage(String text) {
}
public void onPing(byte[] bytes) {
}
public void onPong(byte[] bytes) {
}
public void onMessage(byte[] bytes) {
}

View File

@@ -15,13 +15,13 @@ import java.util.*;
*/
public final class WebSocketPacket {
public static enum PacketType {
public static enum FrameType {
TEXT(0x01), BINARY(0x02), CLOSE(0x08), PING(0x09), PONG(0x0A);
private final int value;
private PacketType(int v) {
private FrameType(int v) {
this.value = v;
}
@@ -29,7 +29,7 @@ public final class WebSocketPacket {
return value;
}
public static PacketType valueOf(int v) {
public static FrameType valueOf(int v) {
switch (v) {
case 0x01: return TEXT;
case 0x02: return BINARY;
@@ -41,7 +41,7 @@ public final class WebSocketPacket {
}
}
protected PacketType type;
protected FrameType type;
protected String payload;
@@ -59,36 +59,36 @@ public final class WebSocketPacket {
public WebSocketPacket(Serializable message, boolean fin) {
boolean bin = message != null && message.getClass() == byte[].class;
if (bin) {
this.type = PacketType.BINARY;
this.type = FrameType.BINARY;
this.bytes = (byte[]) message;
} else {
this.type = PacketType.TEXT;
this.type = FrameType.TEXT;
this.payload = String.valueOf(message);
}
this.last = fin;
}
public WebSocketPacket(String payload, boolean fin) {
this.type = PacketType.TEXT;
this.type = FrameType.TEXT;
this.payload = payload;
this.last = fin;
}
public WebSocketPacket(byte[] data) {
this(PacketType.BINARY, data, true);
this(FrameType.BINARY, data, true);
}
public WebSocketPacket(byte[] data, boolean fin) {
this(PacketType.BINARY, data, fin);
this(FrameType.BINARY, data, fin);
}
public WebSocketPacket(PacketType type, byte[] data) {
public WebSocketPacket(FrameType type, byte[] data) {
this(type, data, true);
}
public WebSocketPacket(PacketType type, byte[] data, boolean fin) {
public WebSocketPacket(FrameType type, byte[] data, boolean fin) {
this.type = type;
if (type == PacketType.TEXT) {
if (type == FrameType.TEXT) {
this.payload = new String(Utility.decodeUTF8(data));
} else {
this.bytes = data;
@@ -97,7 +97,7 @@ public final class WebSocketPacket {
}
public byte[] getContent() {
if (this.type == PacketType.TEXT) return Utility.encodeUTF8(getPayload());
if (this.type == FrameType.TEXT) return Utility.encodeUTF8(getPayload());
if (this.bytes == null) return new byte[0];
return this.bytes;
}

View File

@@ -8,7 +8,7 @@ package com.wentch.redkale.net.http;
import com.wentch.redkale.net.AsyncConnection;
import com.wentch.redkale.net.Context;
import static com.wentch.redkale.net.http.WebSocket.*;
import com.wentch.redkale.net.http.WebSocketPacket.PacketType;
import com.wentch.redkale.net.http.WebSocketPacket.FrameType;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.security.SecureRandom;
@@ -120,10 +120,14 @@ public class WebSocketRunner implements Runnable {
}
webSocket.group.setRecentWebSocket(webSocket);
try {
if (packet.type == PacketType.TEXT) {
if (packet.type == FrameType.TEXT) {
webSocket.onMessage(packet.getPayload());
} else if (packet.type == PacketType.BINARY) {
} else if (packet.type == FrameType.BINARY) {
webSocket.onMessage(packet.getBytes());
} else if (packet.type == FrameType.PONG) {
webSocket.onPong(packet.getBytes());
} else if (packet.type == FrameType.PING) {
webSocket.onPing(packet.getBytes());
}
} catch (Exception e) {
context.getLogger().log(Level.INFO, "WebSocket onMessage error (" + packet + ")", e);
@@ -414,8 +418,8 @@ public class WebSocketRunner implements Runnable {
//0x0B-0F 为以后的控制帧保留
final boolean control = (opcode & 0x08) == 0x08; //是否控制帧
//final boolean continuation = opcode == 0;
PacketType type = PacketType.valueOf(opcode & 0xf);
if (type == PacketType.CLOSE) {
FrameType type = FrameType.valueOf(opcode & 0xf);
if (type == FrameType.CLOSE) {
if (debug) logger.log(Level.FINEST, " receive close command from websocket client");
return null;
}

View File

@@ -0,0 +1,20 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package com.wentch.redkale.net.icep;
import java.nio.*;
/**
*
* @author zhangjx
* @param <T>
*/
public interface IcepCoder<T> {
public T decode(final ByteBuffer buffer);
public ByteBuffer encode(final ByteBuffer buffer);
}

View File

@@ -22,7 +22,11 @@ public final class IcepServer extends Server {
public IcepServer() {
this(System.currentTimeMillis(), null);
}
/**
"content":"{\"cmd\":\"icecandidate\",\"candidate\":{\"candidate\":\"candidate:3791502225 1 tcp 1518214911 10.28.2.207 0 typ host tcptype active generation 0\",\"sdpMid\":\"video\",\"sdpMLineIndex\":1}}"
@param args
@throws Exception
*/
public static void main(String[] args) throws Exception {
DefaultAnyValue conf = new DefaultAnyValue();
conf.addValue("host", "10.28.2.207");

View File

@@ -0,0 +1,133 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package com.wentch.redkale.net.icep.rtcp;
/**
1byte = 8bits
*
* @author zhangjx
*/
public class RtcpHeader {
public static final int RTCP_SR = 200; //发送者报告 描述作为活跃发送者成员的发送和接收统计数字
public static final int RTCP_RR = 201; //接收者报告 描述非活跃发送者成员的接收统计数字
public static final int RTCP_SDES = 202; //源描述项。 其中包括规范名CNAME
public static final int RTCP_BYE = 203; //关闭 表明参与者将结束会话
public static final int RTCP_APP = 204; //应用描述功能
/**
* protocol version
* 占2比特。 表示RTP 的版本号。
*/
protected int version;
/**
* padding flag
* 占1比特。 置“1”表示用户数据最后加有填充位用户数据中最后一个字节是填充位计数它表示一共加了多少个填充位。在两种情况下可能
* 需要填充一是某些加密算法要求数据块大小固定二是在一个低层协议数据包中装载多个RTP 分组。
*/
protected boolean padding; //
/**
* 占1比特。 置“1”表示RTP 报头后紧随一个扩展报头。
*/
protected boolean extend;
/**
* varies by packet type
*/
protected int count;
/**
* RTCP packet type
*/
protected int packetType; //占1个字节
/**
* Packet length in words, w/o this word
*/
protected int length;
protected RtcpHeader() {
this(false, 0);
}
public RtcpHeader(boolean padding, int pt) {
this.padding = padding;
this.packetType = pt;
this.count = 0;
this.length = 0;
this.version = 2;
}
protected int decode(byte[] rawData, int offSet) {
int b = rawData[offSet++] & 0xff;
this.version = (b & 0xC0) >> 6;
this.padding = (b & 0x20) == 0x020;
this.count = b & 0x1F;
this.packetType = rawData[offSet++] & 0x000000FF;
this.length |= rawData[offSet++] & 0xFF;
this.length <<= 8;
this.length |= rawData[offSet++] & 0xFF;
/**
* The length of this RTCP packet in 32-bit words minus one, including
* the header and any padding. (The offset of one makes zero a valid
* length and avoids a possible infinite loop in scanning a compound
* RTCP packet, while counting 32-bit words avoids a validity check for
* a multiple of 4.)
*/
this.length = (this.length * 4) + 4;
return offSet;
}
protected int encode(byte[] rawData, int offSet) {
rawData[offSet] = (byte) (this.version << 6);
if (this.padding) {
rawData[offSet] = (byte) (rawData[offSet] | 0x20);
}
rawData[offSet] = (byte) (rawData[offSet] | (this.count & 0x1F));
offSet++;
rawData[offSet++] = (byte) (this.packetType & 0x000000FF);
// Setting length is onus of concrete class. But we increment the offSet
offSet += 2;
return offSet;
}
public int getVersion() {
return version;
}
public boolean isPadding() {
return padding;
}
public int getCount() {
return count;
}
public int getPacketType() {
return packetType;
}
public int getLength() {
return length;
}
}

View File

@@ -0,0 +1,199 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package com.wentch.redkale.net.icep.rtp;
import com.wentch.redkale.net.icep.*;
import java.nio.*;
/**
* The RTP header has the following format:
*
* 0 1 2 3
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* |V=2|P|X| CC |M| PT | sequence number |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* | timestamp |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* | synchronization source (SSRC) identifier |
* +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
* | contributing source (CSRC) identifiers |
* | |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*
* The first twelve octets are present in every RTP packet, while the
* list of CSRC identifiers is present only when inserted by a mixer.
*
* The version defined by RFC3550 specification is two.
*
* @author zhangjx
*/
public class RtpHeader implements IcepCoder<RtpHeader> {
/**
* 该值占4个字节 由7种数据组成按位存储
* version(2bits) + padding(1bit) + extend(1bit) + crscs(4bits) + marker(1bit) + payloadtype(7bits) + sn(16bits)
* version : 占2比特。 表示RTP 的版本号 0b10。
* padding : 占1比特。 置“1”表示用户数据最后加有填充位用户数据中最后一个字节是填充位计数它表示一共加了多少个填充位。
* 在两种情况下可能需要填充: 一是某些加密算法要求数据块大小固定二是在一个低层协议数据包中装载多个RTP 分组。
* extend : 占1比特。 置“1”表示RTP 报头后紧随一个扩展报头。
* crscs : 占4比特。 CSRC计数包括紧接在固定头后CSRC标识符个数。
* marker : 占1比特。 标记解释由设置定义,目的在于允许重要事件在包流中标记出来。设置可定义其他标示位,或通过改变位数量来指定没有标记位。
* payloadtype : 占7比特。 载荷类型: 记录后面资料使用哪种 Codec receiver 端找出相应的 decoder 解码出來。
* sn : 占16比特。 每发送一个RTP数据包该序号增加1。该序号在接收方可用来发现丢失的数据包和对到来的数据包进行排序。 初值为随机数每发送一个增加1。可供接收方检测分组丢失和恢复分组次序。
*
*/
private int headval = 0b10_0_0_0000_0_0000000_0000000000000000;
/**
* 占4个字节。 时间戳 表示RTP分组第一个字节的取样时刻。其初值为随机数每个采用周期加1。如果每次传送20ms的数据由于音频的采样频率为8000Hz即每20ms有160次采样则每传送20ms的数据时戳增加160。
*/
private int timestamp;
/**
* 同步源标识符SSRC 相当于每个数据流的唯一ID
* 占4字节。 用来标识一个同步源。此标识符是随机选择的但要保证同一RTP会话中的任意两个SSRC各不相同RTP必须检测并解决冲突。
*/
private int ssrc;
/**
* 提供源标识符CSRC
* 占0-60个字节。 它可有0~15项标识符每一项长度固定为32比特其项数由CC字段crscs来确定。如果提供源多于15个则只有15个被标识。
*/
private int[] csrc;
/**
* 占2字节
*/
private short exttype;
/**
* 扩展数据
* 占4*N字节 必须是4字节的倍数
*/
private byte[] extdata;
@Override
public RtpHeader decode(final ByteBuffer buffer) {
this.headval = buffer.getInt();
this.timestamp = buffer.getInt();
this.ssrc = buffer.getInt();
final int csrcs = getCrscs();
if (csrcs > 0) {
this.csrc = new int[csrcs];
for (int i = 0; i < csrcs; i++) {
this.csrc[i] = buffer.getInt();
}
}
if (isExtend()) {
this.exttype = buffer.getShort();
int extdatalen = (buffer.getShort() & 0xffff) * 4;
this.extdata = new byte[extdatalen];
buffer.get(extdata);
}
return this;
}
@Override
public ByteBuffer encode(final ByteBuffer buffer) {
buffer.putInt(this.headval);
buffer.putInt(this.timestamp);
buffer.putInt(this.ssrc);
final int csrcs = getCrscs();
if (csrcs > 0) {
for (int i = 0; i < csrcs; i++) {
buffer.putInt(this.csrc[i]);
}
}
if (isExtend()) {
buffer.putShort(this.exttype);
buffer.putShort((short) (this.extdata.length / 4));
buffer.put(this.extdata);
}
return buffer;
}
public int getVersion() {
return (headval << 30) & 0b11;
}
public boolean isPadding() {
return (headval & 0b00_1_0_0000_0_0000000_0000000000000000) > 0;
}
public void setPadding(boolean padding) {
if (padding) {
headval |= 0b00_1_0_0000_0_0000000_0000000000000000;
} else {
headval &= 0b11_0_1_1111_1_1111111_1111111111111111;
}
}
public boolean isExtend() {
return (headval & 0b00_0_1_0000_0_0000000_0000000000000000) > 0;
}
public short getExttype() {
return exttype;
}
public void setExttype(short exttype) {
this.exttype = exttype;
}
public int[] getCsrc() {
return csrc;
}
public void setCsrc(int[] csrc) {
this.csrc = csrc != null && csrc.length > 0 ? csrc : null;
if (this.csrc != null) {
this.headval = (headval & 0b11_1_1_0000_1_1111111_1111111111111111) | ((csrc.length << 24) & 0b00_0_0_1111_0_0000000_0000000000000000);
} else {
this.headval &= 0b11_1_1_0000_1_1111111_1111111111111111;
}
}
public byte[] getExtdata() {
return extdata;
}
public void setExtdata(byte[] exdata) {
boolean extend = exdata != null || exdata.length > 0;
if (extend) {
if ((exdata.length & 0b100) > 0) throw new RuntimeException("extdata length(" + exdata.length + ") is illegal");
headval |= 0b00_0_1_0000_0_0000000_0000000000000000;
} else {
headval &= 0b11_1_0_1111_1_1111111_1111111111111111;
}
this.extdata = (exdata != null && exdata.length > 0) ? exdata : null;
}
public int getCrscs() {
return (headval >> 4 << 28) & 0b1111;
}
public boolean isMarker() {
return (headval & 0b00_0_0_0000_1_0000000_0000000000000000) > 0;
}
public int getPayloadtype() {
return (headval >> 9 << 25) & 0b1111111;
}
public void setPayloadtype(int payloadtype) {
headval = (headval & 0b11_1_1_1111_1_0000000_1111111111111111) | ((payloadtype << 16) & 0b00_0_0_0000_0_1111111_0000000000000000);
}
public int getSeqnumber() {
return headval & 0xFFFF;
}
public void setSeqnumber(int seqnumber) {
headval = (headval >> 16 << 16) | (seqnumber & 0x0000FFFF);
}
}

View File

@@ -0,0 +1,36 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package com.wentch.redkale.net.icep.rtp;
import com.wentch.redkale.net.icep.*;
import java.nio.*;
/**
*
* @author zhangjx
*/
public class RtpPacket implements IcepCoder<RtpPacket> {
private RtpHeader header;
private byte[] payload;
@Override
public RtpPacket decode(final ByteBuffer buffer) {
if (header == null) this.header = new RtpHeader();
this.header.decode(buffer);
this.payload = new byte[buffer.remaining()];
buffer.get(payload);
return this;
}
@Override
public ByteBuffer encode(final ByteBuffer buffer) {
this.header.encode(buffer).put(payload);
return buffer;
}
}

View File

@@ -5,6 +5,7 @@
*/
package com.wentch.redkale.net.icep.stun;
import com.wentch.redkale.net.icep.*;
import com.wentch.redkale.util.*;
import java.nio.*;
import java.security.*;
@@ -13,7 +14,7 @@ import java.security.*;
*
* @author zhangjx
*/
public class StunHeader {
public class StunHeader implements IcepCoder<StunHeader>{
public static final int MAGIC_COOKIE = 0x2112A442;