diff --git a/client.go b/client.go index 1a4576d..435a085 100644 --- a/client.go +++ b/client.go @@ -15,7 +15,9 @@ import ( "crypto/md5" "github.com/nareix/av" "github.com/nareix/codec/h264parser" + "github.com/nareix/codec/aacparser" "github.com/nareix/rtsp/sdp" + "github.com/nareix/av/pktqueue" ) type Client struct { @@ -29,6 +31,7 @@ type Client struct { session string authorization string body io.Reader + pktque *pktqueue.Queue } type Request struct { @@ -72,6 +75,13 @@ func Connect(uri string) (self *Client, err error) { return } +func (self *Client) Streams() (streams []av.CodecData) { + for _, stream := range self.streams { + streams = append(streams, stream.CodecData) + } + return +} + func (self *Client) writeLine(line string) (err error) { if self.DebugConn { fmt.Print("> ", line) @@ -124,6 +134,7 @@ func (self *Client) ReadResponse() (res Response, err error) { if _, err = io.ReadFull(self.rconn, h[:]); err != nil { return } + if h[0] == 36 { // $ res.BlockLength = int(h[2])<<8+int(h[3]) @@ -132,11 +143,35 @@ func (self *Client) ReadResponse() (res Response, err error) { fmt.Println("block: len", res.BlockLength, "no", res.BlockNo) } return - } else if h[0] == 82 { + } else if h[0] == 82 && h[1] == 84 && h[2] == 83 && h[3] == 80 { // RTSP 200 OK self.rconn = io.MultiReader(bytes.NewReader(h[:]), self.rconn) } else { - err = fmt.Errorf("invalid response bytes=%v", h) + for { + for { + var b [1]byte + if _, err = self.rconn.Read(b[:]); err != nil { + return + } + if b[0] == 36 { + break + } + } + if self.DebugConn { + fmt.Println("block: relocate") + } + if _, err = io.ReadFull(self.rconn, h[1:4]); err != nil { + return + } + res.BlockLength = int(h[2])<<8+int(h[3]) + res.BlockNo = int(h[1]) + if res.BlockNo/2 < len(self.streams) { + break + } + } + if self.DebugConn { + fmt.Println("block: len", res.BlockLength, "no", res.BlockNo) + } return } @@ -236,10 +271,14 @@ func (self *Client) ReadResponse() (res Response, err error) { func (self *Client) Setup(streams []int) (err error) { for _, si := range streams { - req := Request{ - Method: "SETUP", - Uri: self.requestUri+"/"+self.streams[si].Sdp.Control, + uri := "" + control := self.streams[si].Sdp.Control + if strings.HasPrefix(control, "rtsp://") { + uri = control + } else { + uri = self.requestUri+"/"+control } + req := Request{Method: "SETUP", Uri: uri} req.Header = append(req.Header, fmt.Sprintf("Transport: RTP/AVP/TCP;unicast;interleaved=%d-%d", si*2, si*2+1)) if self.session != "" { req.Header = append(req.Header, "Session: "+self.session) @@ -259,7 +298,7 @@ func md5hash(s string) string { return hex.EncodeToString(h[:]) } -func (self *Client) Describe() (streams []av.Stream, err error) { +func (self *Client) Describe() (streams []av.CodecData, err error) { var res Response for i := 0; i < 2; i++ { @@ -294,39 +333,48 @@ func (self *Client) Describe() (streams []av.Stream, err error) { self.streams = []*Stream{} for _, info := range sdp.Decode(body) { stream := &Stream{Sdp: info} - stream.SetType(info.Type) - switch info.Type { - case av.H264: - var sps, pps []byte - for _, nalu := range info.SpropParameterSets { - if len(nalu) > 0 { - switch nalu[0]&0x1f { - case 7: - sps = nalu - case 8: - pps = nalu + if info.PayloadType >= 96 && info.PayloadType <= 127 { + switch info.Type { + case av.H264: + var sps, pps []byte + for _, nalu := range info.SpropParameterSets { + if len(nalu) > 0 { + switch nalu[0]&0x1f { + case 7: + sps = nalu + case 8: + pps = nalu + } } } - } - if len(sps) > 0 && len(pps) > 0 { - codecData, _ := h264parser.CreateCodecDataBySPSAndPPS(sps, pps) - if err = stream.SetCodecData(codecData); err != nil { - err = fmt.Errorf("h264 sps/pps invalid: %s", err) + if len(sps) > 0 && len(pps) > 0 { + if stream.CodecData, err = h264parser.NewCodecDataFromSPSAndPPS(sps, pps); err != nil { + err = fmt.Errorf("h264 sps/pps invalid: %s", err) + return + } + } else { + err = fmt.Errorf("h264 sdp sprop-parameter-sets invalid: missing sps or pps") return } - } else { - err = fmt.Errorf("h264 sdp sprop-parameter-sets invalid: missing sps or pps") - return - } - case av.AAC: - if len(info.Config) == 0 { - err = fmt.Errorf("aac sdp config missing") - return + case av.AAC: + if len(info.Config) == 0 { + err = fmt.Errorf("aac sdp config missing") + return + } + if stream.CodecData, err = aacparser.NewCodecDataFromMPEG4AudioConfigBytes(info.Config); err != nil { + err = fmt.Errorf("aac sdp config invalid: %s", err) + return + } } - if err = stream.SetCodecData(info.Config); err != nil { - err = fmt.Errorf("aac sdp config invalid: %s", err) + } else { + switch info.PayloadType { + case 0: + stream.CodecData = av.NewPCMMulawCodecData() + + default: + err = fmt.Errorf("PayloadType=%d unsupported", info.PayloadType) return } } @@ -337,6 +385,11 @@ func (self *Client) Describe() (streams []av.Stream, err error) { for _, stream := range self.streams { streams = append(streams, stream) } + self.pktque = &pktqueue.Queue{ + Poll: self.poll, + } + self.pktque.Alloc(streams) + return } @@ -372,6 +425,7 @@ func (self *Stream) handleH264Payload(naluType byte, timestamp uint32, packet [] } self.gotpkt = true self.pkt.Data = packet + self.timestamp = timestamp } return @@ -379,8 +433,6 @@ func (self *Stream) handleH264Payload(naluType byte, timestamp uint32, packet [] func (self *Stream) handlePacket(timestamp uint32, packet []byte) (err error) { switch self.Type() { - case av.AAC: - case av.H264: /* +---------------+ @@ -480,6 +532,11 @@ func (self *Stream) handlePacket(timestamp uint32, packet []byte) (err error) { err = fmt.Errorf("unsupported H264 naluType=%d", naluType) return } + + default: + self.gotpkt = true + self.pkt.Data = packet + self.timestamp = timestamp } return } @@ -490,6 +547,9 @@ func (self *Client) parseBlock(blockNo int, packet []byte) (streamIndex int, err return } + streamIndex = blockNo/2 + stream := self.streams[streamIndex] + /* 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 @@ -562,9 +622,6 @@ func (self *Client) parseBlock(blockNo int, packet []byte) (streamIndex int, err */ //payloadType := packet[1]&0x7f - streamIndex = blockNo/2 - stream := self.streams[streamIndex] - if self.DebugConn { //fmt.Println("packet:", stream.Type(), "offset", payloadOffset, "pt", payloadType) if len(packet)>24 { @@ -591,19 +648,22 @@ func (self *Client) Play() (err error) { return } -func (self *Client) ReadPacket() (streamIndex int, pkt av.Packet, err error) { +func (self *Client) poll() (err error) { for { var res Response if res, err = self.ReadResponse(); err != nil { return } if res.BlockLength > 0 { - if streamIndex, err = self.parseBlock(res.BlockNo, res.Block); err != nil { + var i int + if i, err = self.parseBlock(res.BlockNo, res.Block); err != nil { return } - stream := self.streams[streamIndex] + stream := self.streams[i] if stream.gotpkt { - pkt = stream.pkt + time := float64(stream.timestamp)/float64(stream.Sdp.TimeScale) + self.pktque.WriteTimePacket(i, time, stream.pkt) + stream.pkt = av.Packet{} stream.gotpkt = false return } @@ -612,3 +672,37 @@ func (self *Client) ReadPacket() (streamIndex int, pkt av.Packet, err error) { return } +func (self *Client) ReadPacket() (i int, pkt av.Packet, err error) { + return self.pktque.ReadPacket() +} + +func Open(uri string) (cli *Client, err error) { + _cli, err := Connect(uri) + if err != nil { + return + } + + streams, err := _cli.Describe() + if err != nil { + return + } + + setup := []int{} + for i := range streams { + setup = append(setup, i) + } + + err = _cli.Setup(setup) + if err != nil { + return + } + + err = _cli.Play() + if err != nil { + return + } + + cli = _cli + return +} +