diff --git a/client.go b/client.go index 7a75588..1a4576d 100644 --- a/client.go +++ b/client.go @@ -3,22 +3,26 @@ package rtsp import ( "fmt" "net" + "bytes" "io" - "io/ioutil" "strings" "strconv" "bufio" "net/textproto" "net/url" "encoding/hex" + "encoding/binary" "crypto/md5" "github.com/nareix/av" + "github.com/nareix/codec/h264parser" + "github.com/nareix/rtsp/sdp" ) type Client struct { DebugConn bool url *url.URL conn net.Conn + rconn io.Reader requestUri string cseq uint streams []*Stream @@ -27,6 +31,23 @@ type Client struct { body io.Reader } +type Request struct { + Header []string + Uri string + Method string +} + +type Response struct { + BlockLength int + Block []byte + BlockNo int + + StatusCode int + Header textproto.MIMEHeader + ContentLength int + Body []byte +} + func Connect(uri string) (self *Client, err error) { var URL *url.URL if URL, err = url.Parse(uri); err != nil { @@ -44,6 +65,7 @@ func Connect(uri string) (self *Client, err error) { self = &Client{ conn: conn, + rconn: conn, url: URL, requestUri: u2.String(), } @@ -52,20 +74,20 @@ func Connect(uri string) (self *Client, err error) { func (self *Client) writeLine(line string) (err error) { if self.DebugConn { - fmt.Print(line) + fmt.Print("> ", line) } _, err = fmt.Fprint(self.conn, line) return } -func (self *Client) WriteRequest(method string, uri string, headers []string) (err error) { +func (self *Client) WriteRequest(req Request) (err error) { self.cseq++ - headers = append(headers, fmt.Sprintf("CSeq: %d", self.cseq)) - if err = self.writeLine(fmt.Sprintf("%s %s RTSP/1.0\r\n", method, uri)); err != nil { + req.Header = append(req.Header, fmt.Sprintf("CSeq: %d", self.cseq)) + if err = self.writeLine(fmt.Sprintf("%s %s RTSP/1.0\r\n", req.Method, req.Uri)); err != nil { return } - for _, header := range headers { - if err = self.writeLine(header+"\r\n"); err != nil { + for _, v := range req.Header { + if err = self.writeLine(fmt.Sprint(v, "\r\n")); err != nil { return } } @@ -75,8 +97,50 @@ func (self *Client) WriteRequest(method string, uri string, headers []string) (e return } -func (self *Client) ReadResponse() (statusCode int, body io.Reader, err error) { - br := bufio.NewReader(self.conn) +func (self *Client) ReadResponse() (res Response, err error) { + var br *bufio.Reader + + defer func() { + if br != nil { + buf, _ := br.Peek(br.Buffered()) + self.rconn = io.MultiReader(bytes.NewReader(buf), self.rconn) + } + if res.StatusCode == 200 { + if res.ContentLength > 0 { + res.Body = make([]byte, res.ContentLength) + if _, err = io.ReadFull(self.rconn, res.Body); err != nil { + return + } + } + } else if res.BlockLength > 0 { + res.Block = make([]byte, res.BlockLength) + if _, err = io.ReadFull(self.rconn, res.Block); err != nil { + return + } + } + }() + + var h [4]byte + if _, err = io.ReadFull(self.rconn, h[:]); err != nil { + return + } + if h[0] == 36 { + // $ + res.BlockLength = int(h[2])<<8+int(h[3]) + res.BlockNo = int(h[1]) + if self.DebugConn { + fmt.Println("block: len", res.BlockLength, "no", res.BlockNo) + } + return + } else if h[0] == 82 { + // RTSP 200 OK + self.rconn = io.MultiReader(bytes.NewReader(h[:]), self.rconn) + } else { + err = fmt.Errorf("invalid response bytes=%v", h) + return + } + + br = bufio.NewReader(self.rconn) tp := textproto.NewReader(br) var line string @@ -84,29 +148,33 @@ func (self *Client) ReadResponse() (statusCode int, body io.Reader, err error) { return } if self.DebugConn { - fmt.Println(line) + fmt.Println("<", line) } fline := strings.SplitN(line, " ", 3) if len(fline) < 2 { - err = fmt.Errorf("malformed RTSP response") + err = fmt.Errorf("malformed RTSP response line") return } - if statusCode, err = strconv.Atoi(fline[1]); err != nil { + if res.StatusCode, err = strconv.Atoi(fline[1]); err != nil { return } - if statusCode != 200 && statusCode != 401 { - err = fmt.Errorf("statusCode(%d) invalid", statusCode) - return - } - var header textproto.MIMEHeader if header, err = tp.ReadMIMEHeader(); err != nil { return } - if statusCode == 401 { + if self.DebugConn { + fmt.Println("<", header) + } + + if res.StatusCode != 200 && res.StatusCode != 401 { + err = fmt.Errorf("StatusCode=%d invalid", res.StatusCode) + return + } + + if res.StatusCode == 401 { /* RTSP/1.0 401 Unauthorized CSeq: 2 @@ -149,7 +217,7 @@ func (self *Client) ReadResponse() (statusCode int, body io.Reader, err error) { hs2 := md5hash("DESCRIBE:"+self.requestUri) response := md5hash(hs1+":"+nonce+":"+hs2) self.authorization = fmt.Sprintf( - `Authorization: Digest username="%s", realm="%s", nonce="%s", uri="%s", response="%s"`, + `Digest username="%s", realm="%s", nonce="%s", uri="%s", response="%s"`, username, realm, nonce, self.requestUri, response) } } @@ -161,29 +229,25 @@ func (self *Client) ReadResponse() (statusCode int, body io.Reader, err error) { } } - clen, _ := strconv.Atoi(header.Get("Content-Length")) - - if statusCode == 200 { - if clen > 0 { - body = io.LimitReader(br, int64(clen)) - } else { - body = io.MultiReader(io.LimitReader(br, int64(br.Buffered())), self.conn) - } - } + res.ContentLength, _ = strconv.Atoi(header.Get("Content-Length")) return } func (self *Client) Setup(streams []int) (err error) { for _, si := range streams { - reqhdr := []string{fmt.Sprintf("Transport: RTP/AVP/TCP;unicast;interleaved=%d-%d", si*2, si*2+1)} - if self.session != "" { - reqhdr = append(reqhdr, "Session: "+self.session) + req := Request{ + Method: "SETUP", + Uri: self.requestUri+"/"+self.streams[si].Sdp.Control, } - if err = self.WriteRequest("SETUP", self.requestUri+"/"+self.streams[si].control, reqhdr); err != nil { + req.Header = append(req.Header, fmt.Sprintf("Transport: RTP/AVP/TCP;unicast;interleaved=%d-%d", si*2, si*2+1)) + if self.session != "" { + req.Header = append(req.Header, "Session: "+self.session) + } + if err = self.WriteRequest(req); err != nil { return } - if _, _, err = self.ReadResponse(); err != nil { + if _, err = self.ReadResponse(); err != nil { return } } @@ -195,124 +259,356 @@ func md5hash(s string) string { return hex.EncodeToString(h[:]) } -func (self *Client) Describe() (streams []*Stream, err error) { - var body io.Reader - var statusCode int +func (self *Client) Describe() (streams []av.Stream, err error) { + var res Response for i := 0; i < 2; i++ { - reqhdr := []string{} + req := Request{ + Method: "DESCRIBE", + Uri: self.requestUri, + } if self.authorization != "" { - reqhdr = append(reqhdr, self.authorization) + req.Header = append(req.Header, "Authorization: "+self.authorization) } - if err = self.WriteRequest("DESCRIBE", self.requestUri, reqhdr); err != nil { + if err = self.WriteRequest(req); err != nil { return } - if statusCode, body, err = self.ReadResponse(); err != nil { + if res, err = self.ReadResponse(); err != nil { return } - if statusCode == 200 { + if res.StatusCode == 200 { break } } - if body == nil { + if res.ContentLength == 0 { err = fmt.Errorf("Describe failed") return } - br := bufio.NewReader(body) - tp := textproto.NewReader(br) - var stream *Stream + body := string(res.Body) - for { - line, err := tp.ReadLine() - if err != nil { - break - } + if self.DebugConn { + fmt.Println("<", body) + } - if self.DebugConn { - fmt.Println(line) - } + self.streams = []*Stream{} + for _, info := range sdp.Decode(body) { + stream := &Stream{Sdp: info} + stream.SetType(info.Type) - typeval := strings.SplitN(line, "=", 2) - if len(typeval) == 2 { - fields := strings.Split(typeval[1], " ") - switch typeval[0] { - case "m": - if len(fields) > 0 { - switch fields[0] { - case "audio", "video": - stream = &Stream{typestr: fields[0]} - self.streams = append(self.streams, stream) - } - } - - case "a": - if stream != nil { - for _, field := range fields { - keyval := strings.Split(field, ":") - if len(keyval) >= 2 { - key := keyval[0] - val := keyval[1] - if key == "control" { - stream.control = val - } - } + switch info.Type { + case av.H264: + var sps, pps []byte + for _, nalu := range info.SpropParameterSets { + if len(nalu) > 0 { + switch nalu[0]&0x1f { + case 7: + sps = nalu + case 8: + pps = nalu } } } + if len(sps) > 0 && len(pps) > 0 { + codecData, _ := h264parser.CreateCodecDataBySPSAndPPS(sps, pps) + if err = stream.SetCodecData(codecData); err != nil { + err = fmt.Errorf("h264 sps/pps invalid: %s", err) + return + } + } else { + err = fmt.Errorf("h264 sdp sprop-parameter-sets invalid: missing sps or pps") + return + } + + case av.AAC: + if len(info.Config) == 0 { + err = fmt.Errorf("aac sdp config missing") + return + } + if err = stream.SetCodecData(info.Config); err != nil { + err = fmt.Errorf("aac sdp config invalid: %s", err) + return + } } + + self.streams = append(self.streams, stream) } - streams = self.streams + for _, stream := range self.streams { + streams = append(streams, stream) + } return } func (self *Client) Options() (err error) { - if err = self.WriteRequest("OPTIONS", self.requestUri, []string{}); err != nil { + if err = self.WriteRequest(Request{ + Method: "OPTIONS", + Uri: self.requestUri, + }); err != nil { return } - if _, _, err = self.ReadResponse(); err != nil { + if _, err = self.ReadResponse(); err != nil { return } return } -func (self *Client) readBlock() (err error) { - var h [4]byte - for { - if _, err = io.ReadFull(self.body, h[:]); err != nil { - return - } - if h[0] != 36 { - err = fmt.Errorf("block not start with $") - fmt.Println(h) - return - } - length := int(h[2])<<8+int(h[3]) +func (self *Stream) handleH264Payload(naluType byte, timestamp uint32, packet []byte) (err error) { + /* + Table 7-1 – NAL unit type codes + 1 Coded slice of a non-IDR picture + 5 Coded slice of an IDR picture + 6 Supplemental enhancement information (SEI) + 7 Sequence parameter set + 8 Picture parameter set + */ + switch naluType { + case 7,8: + // sps/pps - if self.DebugConn { - fmt.Println("packet", length, h[1]) - } - - if _, err = io.CopyN(ioutil.Discard, self.body, int64(length)); err != nil { - return + default: + if naluType == 5 { + self.pkt.IsKeyFrame = true } + self.gotpkt = true + self.pkt.Data = packet } + + return } -func (self *Client) ReadHeader() (streams []av.Stream, err error) { - if err = self.WriteRequest("PLAY", self.requestUri, []string{"Session: "+self.session}); err != nil { - return - } - if _, self.body, err = self.ReadResponse(); err != nil { - return - } +func (self *Stream) handlePacket(timestamp uint32, packet []byte) (err error) { + switch self.Type() { + case av.AAC: - for { - if err = self.readBlock(); err != nil { + case av.H264: + /* + +---------------+ + |0|1|2|3|4|5|6|7| + +-+-+-+-+-+-+-+-+ + |F|NRI| Type | + +---------------+ + */ + naluType := packet[0]&0x1f + + /* + NAL Unit Packet Packet Type Name Section + Type Type + ------------------------------------------------------------- + 0 reserved - + 1-23 NAL unit Single NAL unit packet 5.6 + 24 STAP-A Single-time aggregation packet 5.7.1 + 25 STAP-B Single-time aggregation packet 5.7.1 + 26 MTAP16 Multi-time aggregation packet 5.7.2 + 27 MTAP24 Multi-time aggregation packet 5.7.2 + 28 FU-A Fragmentation unit 5.8 + 29 FU-B Fragmentation unit 5.8 + 30-31 reserved - + */ + + switch { + case naluType >= 1 && naluType <= 23: + if err = self.handleH264Payload(naluType, timestamp, packet); err != nil { + return + } + + case naluType == 28: + /* + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | FU indicator | FU header | | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | + | | + | FU payload | + | | + | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | :...OPTIONAL RTP padding | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + Figure 14. RTP payload format for FU-A + + The FU indicator octet has the following format: + +---------------+ + |0|1|2|3|4|5|6|7| + +-+-+-+-+-+-+-+-+ + |F|NRI| Type | + +---------------+ + + + The FU header has the following format: + +---------------+ + |0|1|2|3|4|5|6|7| + +-+-+-+-+-+-+-+-+ + |S|E|R| Type | + +---------------+ + + S: 1 bit + When set to one, the Start bit indicates the start of a fragmented + NAL unit. When the following FU payload is not the start of a + fragmented NAL unit payload, the Start bit is set to zero. + + E: 1 bit + When set to one, the End bit indicates the end of a fragmented NAL + unit, i.e., the last byte of the payload is also the last byte of + the fragmented NAL unit. When the following FU payload is not the + last fragment of a fragmented NAL unit, the End bit is set to + zero. + + R: 1 bit + The Reserved bit MUST be equal to 0 and MUST be ignored by the + receiver. + + Type: 5 bits + The NAL unit payload type as defined in table 7-1 of [1]. + */ + fuIndicator := packet[0] + fuHeader := packet[1] + isStart := fuHeader&0x80!=0 + isEnd := fuHeader&0x40!=0 + naluType := fuHeader&0x1f + if isStart { + self.fuBuffer = []byte{fuIndicator&0xe0|fuHeader&0x1f} + } + self.fuBuffer = append(self.fuBuffer, packet[2:]...) + if isEnd { + if err = self.handleH264Payload(naluType, timestamp, self.fuBuffer); err != nil { + return + } + } + + default: + err = fmt.Errorf("unsupported H264 naluType=%d", naluType) return } } return } +func (self *Client) parseBlock(blockNo int, packet []byte) (streamIndex int, err error) { + if blockNo % 2 != 0 { + // rtcp block + return + } + + /* + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + |V=2|P|X| CC |M| PT | sequence number | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | timestamp | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | synchronization source (SSRC) identifier | + +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ + | contributing source (CSRC) identifiers | + | .... | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + */ + + if len(packet) < 8 { + err = fmt.Errorf("rtp packet too short") + return + } + payloadOffset := 12+int(packet[0]&0xf)*4 + if payloadOffset+2 > len(packet) { + err = fmt.Errorf("rtp packet too short") + return + } + + timestamp := binary.BigEndian.Uint32(packet[4:8]) + payload := packet[payloadOffset:] + + /* + PT Encoding Name Audio/Video (A/V) Clock Rate (Hz) Channels Reference + 0 PCMU A 8000 1 [RFC3551] + 1 Reserved + 2 Reserved + 3 GSM A 8000 1 [RFC3551] + 4 G723 A 8000 1 [Vineet_Kumar][RFC3551] + 5 DVI4 A 8000 1 [RFC3551] + 6 DVI4 A 16000 1 [RFC3551] + 7 LPC A 8000 1 [RFC3551] + 8 PCMA A 8000 1 [RFC3551] + 9 G722 A 8000 1 [RFC3551] + 10 L16 A 44100 2 [RFC3551] + 11 L16 A 44100 1 [RFC3551] + 12 QCELP A 8000 1 [RFC3551] + 13 CN A 8000 1 [RFC3389] + 14 MPA A 90000 [RFC3551][RFC2250] + 15 G728 A 8000 1 [RFC3551] + 16 DVI4 A 11025 1 [Joseph_Di_Pol] + 17 DVI4 A 22050 1 [Joseph_Di_Pol] + 18 G729 A 8000 1 [RFC3551] + 19 Reserved A + 20 Unassigned A + 21 Unassigned A + 22 Unassigned A + 23 Unassigned A + 24 Unassigned V + 25 CelB V 90000 [RFC2029] + 26 JPEG V 90000 [RFC2435] + 27 Unassigned V + 28 nv V 90000 [RFC3551] + 29 Unassigned V + 30 Unassigned V + 31 H261 V 90000 [RFC4587] + 32 MPV V 90000 [RFC2250] + 33 MP2T AV 90000 [RFC2250] + 34 H263 V 90000 [Chunrong_Zhu] + 35-71 Unassigned ? + 72-76 Reserved for RTCP conflict avoidance [RFC3551] + 77-95 Unassigned ? + 96-127 dynamic ? [RFC3551] + */ + //payloadType := packet[1]&0x7f + + streamIndex = blockNo/2 + stream := self.streams[streamIndex] + + if self.DebugConn { + //fmt.Println("packet:", stream.Type(), "offset", payloadOffset, "pt", payloadType) + if len(packet)>24 { + fmt.Println(hex.Dump(packet[:24])) + } + } + + if err = stream.handlePacket(timestamp, payload); err != nil { + return + } + + return +} + +func (self *Client) Play() (err error) { + req := Request{ + Method: "PLAY", + Uri: self.requestUri, + } + req.Header = append(req.Header, "Session: "+self.session) + if err = self.WriteRequest(req); err != nil { + return + } + return +} + +func (self *Client) ReadPacket() (streamIndex int, pkt av.Packet, err error) { + for { + var res Response + if res, err = self.ReadResponse(); err != nil { + return + } + if res.BlockLength > 0 { + if streamIndex, err = self.parseBlock(res.BlockNo, res.Block); err != nil { + return + } + stream := self.streams[streamIndex] + if stream.gotpkt { + pkt = stream.pkt + stream.gotpkt = false + return + } + } + } + return +} + diff --git a/sdp/parser.go b/sdp/parser.go new file mode 100644 index 0000000..b149f03 --- /dev/null +++ b/sdp/parser.go @@ -0,0 +1,94 @@ +package sdp + +import ( + "strings" + "strconv" + "encoding/hex" + "encoding/base64" + "github.com/nareix/av" +) + +type Info struct { + AVType string + Type av.CodecType + TimeScale int + Control string + Rtpmap int + Config []byte + SpropParameterSets [][]byte +} + +func Decode(content string) (infos []Info) { + var info *Info + + for _, line := range strings.Split(content, "\n") { + line = strings.Trim(line, "\r") + typeval := strings.SplitN(line, "=", 2) + if len(typeval) == 2 { + fields := strings.Split(typeval[1], " ") + + switch typeval[0] { + case "m": + if len(fields) > 0 { + switch fields[0] { + case "audio", "video": + infos = append(infos, Info{AVType: fields[0]}) + info = &infos[len(infos)-1] + } + } + + case "a": + if info != nil { + for _, field := range fields { + keyval := strings.Split(field, ":") + if len(keyval) >= 2 { + key := keyval[0] + val := keyval[1] + switch key { + case "control": + info.Control = val + case "rtpmap": + info.Rtpmap, _ = strconv.Atoi(val) + } + } + keyval = strings.Split(field, "/") + if len(keyval) >= 2 { + key := keyval[0] + switch key { + case "MPEG4-GENERIC": + info.Type = av.AAC + info.TimeScale, _ = strconv.Atoi(keyval[1]) + case "H264": + info.Type = av.H264 + info.TimeScale, _ = strconv.Atoi(keyval[1]) + } + } + keyval = strings.Split(field, ";") + if len(keyval) > 1 { + for _, field := range keyval { + keyval := strings.SplitN(field, "=", 2) + if len(keyval) == 2 { + key := keyval[0] + val := keyval[1] + switch key { + case "config": + info.Config, _ = hex.DecodeString(val) + case "sprop-parameter-sets": + fields := strings.Split(val, ",") + for _, field := range fields { + val, _ := base64.StdEncoding.DecodeString(field) + info.SpropParameterSets = append(info.SpropParameterSets, val) + } + } + } + } + } + } + } + + } + } + } + return +} + diff --git a/sdp/parser_test.go b/sdp/parser_test.go new file mode 100644 index 0000000..c62d360 --- /dev/null +++ b/sdp/parser_test.go @@ -0,0 +1,36 @@ +package sdp + +import ( + "testing" +) + +func TestParse(t *testing.T) { + infos := Decode(` +v=0 +o=- 1459325504777324 1 IN IP4 192.168.0.123 +s=RTSP/RTP stream from Network Video Server +i=mpeg4cif +t=0 0 +a=tool:LIVE555 Streaming Media v2009.09.28 +a=type:broadcast +a=control:* +a=range:npt=0- +a=x-qt-text-nam:RTSP/RTP stream from Network Video Server +a=x-qt-text-inf:mpeg4cif +m=video 0 RTP/AVP 96 +c=IN IP4 0.0.0.0 +b=AS:300 +a=rtpmap:96 H264/90000 +a=fmtp:96 packetization-mode=1;profile-level-id=640028;sprop-parameter-sets=Z2QAKK2EBUViuKxUdCAqKxXFYqOhAVFYrisVHQgKisVxWKjoQFRWK4rFR0ICorFcVio6ECSFITk8nyfk/k/J8nm5s00IEkKQnJ5Pk/J/J+T5PNzZprQFoe0qQAAAHgAABDgYEABJPAAUmW974XhEI1A=,aO48sA==;config=0000000167640028ad84054562b8ac5474202a2b15c562a3a1015158ae2b151d080a8ac57158a8e84054562b8ac5474202a2b15c562a3a10248521393c9f27e4fe4fc9f279b9b34d081242909c9e4f93f27f27e4f93cdcd9a6b405a1ed2a4000001e00000438181000493c0014996f7be1784423500000000168ee3cb0 +a=x-dimensions: 720, 480 +a=x-framerate: 15 +a=control:track1 +m=audio 0 RTP/AVP 96 +c=IN IP4 0.0.0.0 +b=AS:256 +a=rtpmap:96 MPEG4-GENERIC/16000/2 +a=fmtp:96 streamtype=5;profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=1408 +a=control:track2 +`) + t.Logf("%v", infos) +} diff --git a/stream.go b/stream.go index bd9b1d1..178cf48 100644 --- a/stream.go +++ b/stream.go @@ -2,19 +2,27 @@ package rtsp import ( "github.com/nareix/av" + "github.com/nareix/rtsp/sdp" ) type Stream struct { av.StreamCommon - typestr string - control string + Sdp sdp.Info + + // h264 + fuBuffer []byte + sps []byte + pps []byte + + gotpkt bool + pkt av.Packet } func (self Stream) IsAudio() bool { - return self.typestr == "audio" + return self.Sdp.AVType == "audio" } func (self Stream) IsVideo() bool { - return self.typestr == "video" + return self.Sdp.AVType == "video" }