From c7d51e38e6eedbc2b466d15b4bea96be7833fd82 Mon Sep 17 00:00:00 2001 From: nareix Date: Thu, 23 Jun 2016 08:20:04 +0800 Subject: [PATCH] add SkipErrRtpBlock, improve parseBlockHeader --- client.go | 77 ++++++++++++++++++++++++++++++++++++++----------------- stream.go | 2 ++ 2 files changed, 56 insertions(+), 23 deletions(-) diff --git a/client.go b/client.go index d09f62b..ea7323f 100644 --- a/client.go +++ b/client.go @@ -30,6 +30,8 @@ type Client struct { DebugRtp bool Headers []string + SkipErrRtpBlock bool + RtspTimeout time.Duration RtpTimeout time.Duration RtpKeepAliveTimeout time.Duration @@ -179,13 +181,34 @@ func (self *Client) parseBlockHeader(h []byte) (length int, no int, valid bool) if no/2 >= len(self.streams) { return } - if length < 8 { - return - } - stream := self.streams[no/2] - if int(h[5]&0x7f) != stream.Sdp.PayloadType { - return + + if no%2 == 0 { // rtp + if length < 8 { + return + } + + // V=2 + if h[4]&0xc0 != 0x80 { + return + } + + stream := self.streams[no/2] + if int(h[5]&0x7f) != stream.Sdp.PayloadType { + return + } + + timestamp := binary.BigEndian.Uint32(h[8:12]) + if stream.firsttimestamp != 0 { + timestamp -= stream.firsttimestamp + if timestamp < stream.timestamp { + return + } else if timestamp - stream.timestamp > uint32(stream.timeScale*60*60) { + return + } + } + } else { // rtcp } + valid = true return } @@ -205,9 +228,7 @@ func (self *Client) parseHeaders(b []byte) (statusCode int, headers textproto.MI } } - if headers, _ = r.ReadMIMEHeader(); err != nil { - return - } + headers, _ = r.ReadMIMEHeader() return } @@ -298,6 +319,7 @@ func (self *Client) findRTSP() (block []byte, data []byte, err error) { for { var b byte if b, err = self.brconn.ReadByte(); err != nil { + err = fmt.Errorf("rtsp: find rtsp header failed: %v", err) return } switch b { @@ -335,11 +357,12 @@ func (self *Client) findRTSP() (block []byte, data []byte, err error) { return } - if stat == Dollar && len(peek) >= 8 { + if stat == Dollar && len(peek) >= 12 { if blocklen, _, ok := self.parseBlockHeader(peek); ok { left := blocklen+4-len(peek) block = append(peek, make([]byte, left)...) if _, err = io.ReadFull(self.brconn, block[len(peek):]); err != nil { + err = fmt.Errorf("rtsp: read block failed: %v", err) return } return @@ -366,6 +389,7 @@ func (self *Client) readLFLF() (block []byte, data []byte, err error) { for { var b byte if b, err = self.brconn.ReadByte(); err != nil { + err = fmt.Errorf("rtsp: read LFLF failed: %v", err) return } switch b { @@ -388,12 +412,13 @@ func (self *Client) readLFLF() (block []byte, data []byte, err error) { if stat == LFLF { data = peek return - } else if dollarpos != -1 && dollarpos - pos >= 8 { + } else if dollarpos != -1 && dollarpos - pos >= 12 { hdrlen := dollarpos-pos start := len(peek)-hdrlen if blocklen, _, ok := self.parseBlockHeader(peek[start:]); ok { block = append(peek[start:], make([]byte, blocklen+4-hdrlen)...) if _, err = io.ReadFull(self.brconn, block[hdrlen:]); err != nil { + err = fmt.Errorf("rtsp: read block failed: %v", err) return } return @@ -415,6 +440,7 @@ func (self *Client) readResp(b []byte) (res Response, err error) { if res.ContentLength > 0 { res.Body = make([]byte, res.ContentLength) if _, err = io.ReadFull(self.brconn, res.Body); err != nil { + err = fmt.Errorf("rtsp: read body failed: %v", err) return } } @@ -688,6 +714,12 @@ func (self *Stream) makeCodecData() (err error) { } } + self.timeScale = media.TimeScale + if self.timeScale == 0 { + // https://tools.ietf.org/html/rfc5391 + self.timeScale = 8000 + } + return } @@ -1038,15 +1070,18 @@ func (self *Client) handleBlock(block []byte) (pkt av.Packet, ok bool, err error } stream := self.streams[i] - if err = stream.handleRtpPacket(block[4:]); err != nil { - return + herr := stream.handleRtpPacket(block[4:]) + if herr != nil { + if !self.SkipErrRtpBlock { + err = herr + return + } } if stream.gotpkt { - timeScale := stream.Sdp.TimeScale - /* TODO: sync AV by rtcp NTP timestamp + TODO: handle timestamp overflow https://tools.ietf.org/html/rfc3550 A receiver can then synchronize presentation of the audio and video packets by relating their RTP timestamps using the timestamp pairs in RTCP SR packets. @@ -1056,18 +1091,14 @@ func (self *Client) handleBlock(block []byte) (pkt av.Packet, ok bool, err error } stream.timestamp -= stream.firsttimestamp - if timeScale == 0 { - // https://tools.ietf.org/html/rfc5391 - timeScale = 8000 - } - ok = true pkt = stream.pkt - pkt.Time = time.Duration(stream.timestamp)*time.Second / time.Duration(timeScale) + pkt.Time = time.Duration(stream.timestamp)*time.Second / time.Duration(stream.timeScale) pkt.Idx = int8(self.setupMap[i]) - if pkt.Time < stream.lasttime { - err = fmt.Errorf("rtp: stream#%d time=%v < lasttime=%v", pkt.Time, stream.lasttime) + if pkt.Time < stream.lasttime || pkt.Time - stream.lasttime > time.Minute*30 { + err = fmt.Errorf("rtp: time invalid stream#%d time=%v lasttime=%v", pkt.Idx, pkt.Time, stream.lasttime) + return } stream.lasttime = pkt.Time diff --git a/stream.go b/stream.go index b4d3e31..2462295 100644 --- a/stream.go +++ b/stream.go @@ -11,6 +11,8 @@ type Stream struct { Sdp sdp.Media client *Client + timeScale int + // h264 fuStarted bool fuBuffer []byte