This commit is contained in:
nareix 2016-05-19 19:40:50 +08:00
parent be1994a49b
commit ab6a068d45
4 changed files with 545 additions and 111 deletions

510
client.go
View File

@ -3,22 +3,26 @@ package rtsp
import (
"fmt"
"net"
"bytes"
"io"
"io/ioutil"
"strings"
"strconv"
"bufio"
"net/textproto"
"net/url"
"encoding/hex"
"encoding/binary"
"crypto/md5"
"github.com/nareix/av"
"github.com/nareix/codec/h264parser"
"github.com/nareix/rtsp/sdp"
)
type Client struct {
DebugConn bool
url *url.URL
conn net.Conn
rconn io.Reader
requestUri string
cseq uint
streams []*Stream
@ -27,6 +31,23 @@ type Client struct {
body io.Reader
}
type Request struct {
Header []string
Uri string
Method string
}
type Response struct {
BlockLength int
Block []byte
BlockNo int
StatusCode int
Header textproto.MIMEHeader
ContentLength int
Body []byte
}
func Connect(uri string) (self *Client, err error) {
var URL *url.URL
if URL, err = url.Parse(uri); err != nil {
@ -44,6 +65,7 @@ func Connect(uri string) (self *Client, err error) {
self = &Client{
conn: conn,
rconn: conn,
url: URL,
requestUri: u2.String(),
}
@ -52,20 +74,20 @@ func Connect(uri string) (self *Client, err error) {
func (self *Client) writeLine(line string) (err error) {
if self.DebugConn {
fmt.Print(line)
fmt.Print("> ", line)
}
_, err = fmt.Fprint(self.conn, line)
return
}
func (self *Client) WriteRequest(method string, uri string, headers []string) (err error) {
func (self *Client) WriteRequest(req Request) (err error) {
self.cseq++
headers = append(headers, fmt.Sprintf("CSeq: %d", self.cseq))
if err = self.writeLine(fmt.Sprintf("%s %s RTSP/1.0\r\n", method, uri)); err != nil {
req.Header = append(req.Header, fmt.Sprintf("CSeq: %d", self.cseq))
if err = self.writeLine(fmt.Sprintf("%s %s RTSP/1.0\r\n", req.Method, req.Uri)); err != nil {
return
}
for _, header := range headers {
if err = self.writeLine(header+"\r\n"); err != nil {
for _, v := range req.Header {
if err = self.writeLine(fmt.Sprint(v, "\r\n")); err != nil {
return
}
}
@ -75,8 +97,50 @@ func (self *Client) WriteRequest(method string, uri string, headers []string) (e
return
}
func (self *Client) ReadResponse() (statusCode int, body io.Reader, err error) {
br := bufio.NewReader(self.conn)
func (self *Client) ReadResponse() (res Response, err error) {
var br *bufio.Reader
defer func() {
if br != nil {
buf, _ := br.Peek(br.Buffered())
self.rconn = io.MultiReader(bytes.NewReader(buf), self.rconn)
}
if res.StatusCode == 200 {
if res.ContentLength > 0 {
res.Body = make([]byte, res.ContentLength)
if _, err = io.ReadFull(self.rconn, res.Body); err != nil {
return
}
}
} else if res.BlockLength > 0 {
res.Block = make([]byte, res.BlockLength)
if _, err = io.ReadFull(self.rconn, res.Block); err != nil {
return
}
}
}()
var h [4]byte
if _, err = io.ReadFull(self.rconn, h[:]); err != nil {
return
}
if h[0] == 36 {
// $
res.BlockLength = int(h[2])<<8+int(h[3])
res.BlockNo = int(h[1])
if self.DebugConn {
fmt.Println("block: len", res.BlockLength, "no", res.BlockNo)
}
return
} else if h[0] == 82 {
// RTSP 200 OK
self.rconn = io.MultiReader(bytes.NewReader(h[:]), self.rconn)
} else {
err = fmt.Errorf("invalid response bytes=%v", h)
return
}
br = bufio.NewReader(self.rconn)
tp := textproto.NewReader(br)
var line string
@ -84,29 +148,33 @@ func (self *Client) ReadResponse() (statusCode int, body io.Reader, err error) {
return
}
if self.DebugConn {
fmt.Println(line)
fmt.Println("<", line)
}
fline := strings.SplitN(line, " ", 3)
if len(fline) < 2 {
err = fmt.Errorf("malformed RTSP response")
err = fmt.Errorf("malformed RTSP response line")
return
}
if statusCode, err = strconv.Atoi(fline[1]); err != nil {
if res.StatusCode, err = strconv.Atoi(fline[1]); err != nil {
return
}
if statusCode != 200 && statusCode != 401 {
err = fmt.Errorf("statusCode(%d) invalid", statusCode)
return
}
var header textproto.MIMEHeader
if header, err = tp.ReadMIMEHeader(); err != nil {
return
}
if statusCode == 401 {
if self.DebugConn {
fmt.Println("<", header)
}
if res.StatusCode != 200 && res.StatusCode != 401 {
err = fmt.Errorf("StatusCode=%d invalid", res.StatusCode)
return
}
if res.StatusCode == 401 {
/*
RTSP/1.0 401 Unauthorized
CSeq: 2
@ -149,7 +217,7 @@ func (self *Client) ReadResponse() (statusCode int, body io.Reader, err error) {
hs2 := md5hash("DESCRIBE:"+self.requestUri)
response := md5hash(hs1+":"+nonce+":"+hs2)
self.authorization = fmt.Sprintf(
`Authorization: Digest username="%s", realm="%s", nonce="%s", uri="%s", response="%s"`,
`Digest username="%s", realm="%s", nonce="%s", uri="%s", response="%s"`,
username, realm, nonce, self.requestUri, response)
}
}
@ -161,29 +229,25 @@ func (self *Client) ReadResponse() (statusCode int, body io.Reader, err error) {
}
}
clen, _ := strconv.Atoi(header.Get("Content-Length"))
if statusCode == 200 {
if clen > 0 {
body = io.LimitReader(br, int64(clen))
} else {
body = io.MultiReader(io.LimitReader(br, int64(br.Buffered())), self.conn)
}
}
res.ContentLength, _ = strconv.Atoi(header.Get("Content-Length"))
return
}
func (self *Client) Setup(streams []int) (err error) {
for _, si := range streams {
reqhdr := []string{fmt.Sprintf("Transport: RTP/AVP/TCP;unicast;interleaved=%d-%d", si*2, si*2+1)}
if self.session != "" {
reqhdr = append(reqhdr, "Session: "+self.session)
req := Request{
Method: "SETUP",
Uri: self.requestUri+"/"+self.streams[si].Sdp.Control,
}
if err = self.WriteRequest("SETUP", self.requestUri+"/"+self.streams[si].control, reqhdr); err != nil {
req.Header = append(req.Header, fmt.Sprintf("Transport: RTP/AVP/TCP;unicast;interleaved=%d-%d", si*2, si*2+1))
if self.session != "" {
req.Header = append(req.Header, "Session: "+self.session)
}
if err = self.WriteRequest(req); err != nil {
return
}
if _, _, err = self.ReadResponse(); err != nil {
if _, err = self.ReadResponse(); err != nil {
return
}
}
@ -195,124 +259,356 @@ func md5hash(s string) string {
return hex.EncodeToString(h[:])
}
func (self *Client) Describe() (streams []*Stream, err error) {
var body io.Reader
var statusCode int
func (self *Client) Describe() (streams []av.Stream, err error) {
var res Response
for i := 0; i < 2; i++ {
reqhdr := []string{}
req := Request{
Method: "DESCRIBE",
Uri: self.requestUri,
}
if self.authorization != "" {
reqhdr = append(reqhdr, self.authorization)
req.Header = append(req.Header, "Authorization: "+self.authorization)
}
if err = self.WriteRequest("DESCRIBE", self.requestUri, reqhdr); err != nil {
if err = self.WriteRequest(req); err != nil {
return
}
if statusCode, body, err = self.ReadResponse(); err != nil {
if res, err = self.ReadResponse(); err != nil {
return
}
if statusCode == 200 {
if res.StatusCode == 200 {
break
}
}
if body == nil {
if res.ContentLength == 0 {
err = fmt.Errorf("Describe failed")
return
}
br := bufio.NewReader(body)
tp := textproto.NewReader(br)
var stream *Stream
body := string(res.Body)
for {
line, err := tp.ReadLine()
if err != nil {
break
}
if self.DebugConn {
fmt.Println("<", body)
}
if self.DebugConn {
fmt.Println(line)
}
self.streams = []*Stream{}
for _, info := range sdp.Decode(body) {
stream := &Stream{Sdp: info}
stream.SetType(info.Type)
typeval := strings.SplitN(line, "=", 2)
if len(typeval) == 2 {
fields := strings.Split(typeval[1], " ")
switch typeval[0] {
case "m":
if len(fields) > 0 {
switch fields[0] {
case "audio", "video":
stream = &Stream{typestr: fields[0]}
self.streams = append(self.streams, stream)
}
}
case "a":
if stream != nil {
for _, field := range fields {
keyval := strings.Split(field, ":")
if len(keyval) >= 2 {
key := keyval[0]
val := keyval[1]
if key == "control" {
stream.control = val
}
}
switch info.Type {
case av.H264:
var sps, pps []byte
for _, nalu := range info.SpropParameterSets {
if len(nalu) > 0 {
switch nalu[0]&0x1f {
case 7:
sps = nalu
case 8:
pps = nalu
}
}
}
if len(sps) > 0 && len(pps) > 0 {
codecData, _ := h264parser.CreateCodecDataBySPSAndPPS(sps, pps)
if err = stream.SetCodecData(codecData); err != nil {
err = fmt.Errorf("h264 sps/pps invalid: %s", err)
return
}
} else {
err = fmt.Errorf("h264 sdp sprop-parameter-sets invalid: missing sps or pps")
return
}
case av.AAC:
if len(info.Config) == 0 {
err = fmt.Errorf("aac sdp config missing")
return
}
if err = stream.SetCodecData(info.Config); err != nil {
err = fmt.Errorf("aac sdp config invalid: %s", err)
return
}
}
self.streams = append(self.streams, stream)
}
streams = self.streams
for _, stream := range self.streams {
streams = append(streams, stream)
}
return
}
func (self *Client) Options() (err error) {
if err = self.WriteRequest("OPTIONS", self.requestUri, []string{}); err != nil {
if err = self.WriteRequest(Request{
Method: "OPTIONS",
Uri: self.requestUri,
}); err != nil {
return
}
if _, _, err = self.ReadResponse(); err != nil {
if _, err = self.ReadResponse(); err != nil {
return
}
return
}
func (self *Client) readBlock() (err error) {
var h [4]byte
for {
if _, err = io.ReadFull(self.body, h[:]); err != nil {
return
}
if h[0] != 36 {
err = fmt.Errorf("block not start with $")
fmt.Println(h)
return
}
length := int(h[2])<<8+int(h[3])
func (self *Stream) handleH264Payload(naluType byte, timestamp uint32, packet []byte) (err error) {
/*
Table 7-1 NAL unit type codes
1 Coded slice of a non-IDR picture
5 Coded slice of an IDR picture
6 Supplemental enhancement information (SEI)
7 Sequence parameter set
8 Picture parameter set
*/
switch naluType {
case 7,8:
// sps/pps
if self.DebugConn {
fmt.Println("packet", length, h[1])
}
if _, err = io.CopyN(ioutil.Discard, self.body, int64(length)); err != nil {
return
default:
if naluType == 5 {
self.pkt.IsKeyFrame = true
}
self.gotpkt = true
self.pkt.Data = packet
}
return
}
func (self *Client) ReadHeader() (streams []av.Stream, err error) {
if err = self.WriteRequest("PLAY", self.requestUri, []string{"Session: "+self.session}); err != nil {
return
}
if _, self.body, err = self.ReadResponse(); err != nil {
return
}
func (self *Stream) handlePacket(timestamp uint32, packet []byte) (err error) {
switch self.Type() {
case av.AAC:
for {
if err = self.readBlock(); err != nil {
case av.H264:
/*
+---------------+
|0|1|2|3|4|5|6|7|
+-+-+-+-+-+-+-+-+
|F|NRI| Type |
+---------------+
*/
naluType := packet[0]&0x1f
/*
NAL Unit Packet Packet Type Name Section
Type Type
-------------------------------------------------------------
0 reserved -
1-23 NAL unit Single NAL unit packet 5.6
24 STAP-A Single-time aggregation packet 5.7.1
25 STAP-B Single-time aggregation packet 5.7.1
26 MTAP16 Multi-time aggregation packet 5.7.2
27 MTAP24 Multi-time aggregation packet 5.7.2
28 FU-A Fragmentation unit 5.8
29 FU-B Fragmentation unit 5.8
30-31 reserved -
*/
switch {
case naluType >= 1 && naluType <= 23:
if err = self.handleH264Payload(naluType, timestamp, packet); err != nil {
return
}
case naluType == 28:
/*
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| FU indicator | FU header | |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| |
| FU payload |
| |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| :...OPTIONAL RTP padding |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
Figure 14. RTP payload format for FU-A
The FU indicator octet has the following format:
+---------------+
|0|1|2|3|4|5|6|7|
+-+-+-+-+-+-+-+-+
|F|NRI| Type |
+---------------+
The FU header has the following format:
+---------------+
|0|1|2|3|4|5|6|7|
+-+-+-+-+-+-+-+-+
|S|E|R| Type |
+---------------+
S: 1 bit
When set to one, the Start bit indicates the start of a fragmented
NAL unit. When the following FU payload is not the start of a
fragmented NAL unit payload, the Start bit is set to zero.
E: 1 bit
When set to one, the End bit indicates the end of a fragmented NAL
unit, i.e., the last byte of the payload is also the last byte of
the fragmented NAL unit. When the following FU payload is not the
last fragment of a fragmented NAL unit, the End bit is set to
zero.
R: 1 bit
The Reserved bit MUST be equal to 0 and MUST be ignored by the
receiver.
Type: 5 bits
The NAL unit payload type as defined in table 7-1 of [1].
*/
fuIndicator := packet[0]
fuHeader := packet[1]
isStart := fuHeader&0x80!=0
isEnd := fuHeader&0x40!=0
naluType := fuHeader&0x1f
if isStart {
self.fuBuffer = []byte{fuIndicator&0xe0|fuHeader&0x1f}
}
self.fuBuffer = append(self.fuBuffer, packet[2:]...)
if isEnd {
if err = self.handleH264Payload(naluType, timestamp, self.fuBuffer); err != nil {
return
}
}
default:
err = fmt.Errorf("unsupported H264 naluType=%d", naluType)
return
}
}
return
}
func (self *Client) parseBlock(blockNo int, packet []byte) (streamIndex int, err error) {
if blockNo % 2 != 0 {
// rtcp block
return
}
/*
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|V=2|P|X| CC |M| PT | sequence number |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| timestamp |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| synchronization source (SSRC) identifier |
+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
| contributing source (CSRC) identifiers |
| .... |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*/
if len(packet) < 8 {
err = fmt.Errorf("rtp packet too short")
return
}
payloadOffset := 12+int(packet[0]&0xf)*4
if payloadOffset+2 > len(packet) {
err = fmt.Errorf("rtp packet too short")
return
}
timestamp := binary.BigEndian.Uint32(packet[4:8])
payload := packet[payloadOffset:]
/*
PT Encoding Name Audio/Video (A/V) Clock Rate (Hz) Channels Reference
0 PCMU A 8000 1 [RFC3551]
1 Reserved
2 Reserved
3 GSM A 8000 1 [RFC3551]
4 G723 A 8000 1 [Vineet_Kumar][RFC3551]
5 DVI4 A 8000 1 [RFC3551]
6 DVI4 A 16000 1 [RFC3551]
7 LPC A 8000 1 [RFC3551]
8 PCMA A 8000 1 [RFC3551]
9 G722 A 8000 1 [RFC3551]
10 L16 A 44100 2 [RFC3551]
11 L16 A 44100 1 [RFC3551]
12 QCELP A 8000 1 [RFC3551]
13 CN A 8000 1 [RFC3389]
14 MPA A 90000 [RFC3551][RFC2250]
15 G728 A 8000 1 [RFC3551]
16 DVI4 A 11025 1 [Joseph_Di_Pol]
17 DVI4 A 22050 1 [Joseph_Di_Pol]
18 G729 A 8000 1 [RFC3551]
19 Reserved A
20 Unassigned A
21 Unassigned A
22 Unassigned A
23 Unassigned A
24 Unassigned V
25 CelB V 90000 [RFC2029]
26 JPEG V 90000 [RFC2435]
27 Unassigned V
28 nv V 90000 [RFC3551]
29 Unassigned V
30 Unassigned V
31 H261 V 90000 [RFC4587]
32 MPV V 90000 [RFC2250]
33 MP2T AV 90000 [RFC2250]
34 H263 V 90000 [Chunrong_Zhu]
35-71 Unassigned ?
72-76 Reserved for RTCP conflict avoidance [RFC3551]
77-95 Unassigned ?
96-127 dynamic ? [RFC3551]
*/
//payloadType := packet[1]&0x7f
streamIndex = blockNo/2
stream := self.streams[streamIndex]
if self.DebugConn {
//fmt.Println("packet:", stream.Type(), "offset", payloadOffset, "pt", payloadType)
if len(packet)>24 {
fmt.Println(hex.Dump(packet[:24]))
}
}
if err = stream.handlePacket(timestamp, payload); err != nil {
return
}
return
}
func (self *Client) Play() (err error) {
req := Request{
Method: "PLAY",
Uri: self.requestUri,
}
req.Header = append(req.Header, "Session: "+self.session)
if err = self.WriteRequest(req); err != nil {
return
}
return
}
func (self *Client) ReadPacket() (streamIndex int, pkt av.Packet, err error) {
for {
var res Response
if res, err = self.ReadResponse(); err != nil {
return
}
if res.BlockLength > 0 {
if streamIndex, err = self.parseBlock(res.BlockNo, res.Block); err != nil {
return
}
stream := self.streams[streamIndex]
if stream.gotpkt {
pkt = stream.pkt
stream.gotpkt = false
return
}
}
}
return
}

94
sdp/parser.go Normal file
View File

@ -0,0 +1,94 @@
package sdp
import (
"strings"
"strconv"
"encoding/hex"
"encoding/base64"
"github.com/nareix/av"
)
type Info struct {
AVType string
Type av.CodecType
TimeScale int
Control string
Rtpmap int
Config []byte
SpropParameterSets [][]byte
}
func Decode(content string) (infos []Info) {
var info *Info
for _, line := range strings.Split(content, "\n") {
line = strings.Trim(line, "\r")
typeval := strings.SplitN(line, "=", 2)
if len(typeval) == 2 {
fields := strings.Split(typeval[1], " ")
switch typeval[0] {
case "m":
if len(fields) > 0 {
switch fields[0] {
case "audio", "video":
infos = append(infos, Info{AVType: fields[0]})
info = &infos[len(infos)-1]
}
}
case "a":
if info != nil {
for _, field := range fields {
keyval := strings.Split(field, ":")
if len(keyval) >= 2 {
key := keyval[0]
val := keyval[1]
switch key {
case "control":
info.Control = val
case "rtpmap":
info.Rtpmap, _ = strconv.Atoi(val)
}
}
keyval = strings.Split(field, "/")
if len(keyval) >= 2 {
key := keyval[0]
switch key {
case "MPEG4-GENERIC":
info.Type = av.AAC
info.TimeScale, _ = strconv.Atoi(keyval[1])
case "H264":
info.Type = av.H264
info.TimeScale, _ = strconv.Atoi(keyval[1])
}
}
keyval = strings.Split(field, ";")
if len(keyval) > 1 {
for _, field := range keyval {
keyval := strings.SplitN(field, "=", 2)
if len(keyval) == 2 {
key := keyval[0]
val := keyval[1]
switch key {
case "config":
info.Config, _ = hex.DecodeString(val)
case "sprop-parameter-sets":
fields := strings.Split(val, ",")
for _, field := range fields {
val, _ := base64.StdEncoding.DecodeString(field)
info.SpropParameterSets = append(info.SpropParameterSets, val)
}
}
}
}
}
}
}
}
}
}
return
}

36
sdp/parser_test.go Normal file
View File

@ -0,0 +1,36 @@
package sdp
import (
"testing"
)
func TestParse(t *testing.T) {
infos := Decode(`
v=0
o=- 1459325504777324 1 IN IP4 192.168.0.123
s=RTSP/RTP stream from Network Video Server
i=mpeg4cif
t=0 0
a=tool:LIVE555 Streaming Media v2009.09.28
a=type:broadcast
a=control:*
a=range:npt=0-
a=x-qt-text-nam:RTSP/RTP stream from Network Video Server
a=x-qt-text-inf:mpeg4cif
m=video 0 RTP/AVP 96
c=IN IP4 0.0.0.0
b=AS:300
a=rtpmap:96 H264/90000
a=fmtp:96 packetization-mode=1;profile-level-id=640028;sprop-parameter-sets=Z2QAKK2EBUViuKxUdCAqKxXFYqOhAVFYrisVHQgKisVxWKjoQFRWK4rFR0ICorFcVio6ECSFITk8nyfk/k/J8nm5s00IEkKQnJ5Pk/J/J+T5PNzZprQFoe0qQAAAHgAABDgYEABJPAAUmW974XhEI1A=,aO48sA==;config=0000000167640028ad84054562b8ac5474202a2b15c562a3a1015158ae2b151d080a8ac57158a8e84054562b8ac5474202a2b15c562a3a10248521393c9f27e4fe4fc9f279b9b34d081242909c9e4f93f27f27e4f93cdcd9a6b405a1ed2a4000001e00000438181000493c0014996f7be1784423500000000168ee3cb0
a=x-dimensions: 720, 480
a=x-framerate: 15
a=control:track1
m=audio 0 RTP/AVP 96
c=IN IP4 0.0.0.0
b=AS:256
a=rtpmap:96 MPEG4-GENERIC/16000/2
a=fmtp:96 streamtype=5;profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=1408
a=control:track2
`)
t.Logf("%v", infos)
}

View File

@ -2,19 +2,27 @@ package rtsp
import (
"github.com/nareix/av"
"github.com/nareix/rtsp/sdp"
)
type Stream struct {
av.StreamCommon
typestr string
control string
Sdp sdp.Info
// h264
fuBuffer []byte
sps []byte
pps []byte
gotpkt bool
pkt av.Packet
}
func (self Stream) IsAudio() bool {
return self.typestr == "audio"
return self.Sdp.AVType == "audio"
}
func (self Stream) IsVideo() bool {
return self.typestr == "video"
return self.Sdp.AVType == "video"
}