add SkipErrRtpBlock, improve parseBlockHeader

This commit is contained in:
nareix 2016-06-23 08:20:04 +08:00
parent a2be4cf878
commit c7d51e38e6
2 changed files with 56 additions and 23 deletions

View File

@ -30,6 +30,8 @@ type Client struct {
DebugRtp bool DebugRtp bool
Headers []string Headers []string
SkipErrRtpBlock bool
RtspTimeout time.Duration RtspTimeout time.Duration
RtpTimeout time.Duration RtpTimeout time.Duration
RtpKeepAliveTimeout time.Duration RtpKeepAliveTimeout time.Duration
@ -179,13 +181,34 @@ func (self *Client) parseBlockHeader(h []byte) (length int, no int, valid bool)
if no/2 >= len(self.streams) { if no/2 >= len(self.streams) {
return return
} }
if length < 8 {
return if no%2 == 0 { // rtp
} if length < 8 {
stream := self.streams[no/2] return
if int(h[5]&0x7f) != stream.Sdp.PayloadType { }
return
// V=2
if h[4]&0xc0 != 0x80 {
return
}
stream := self.streams[no/2]
if int(h[5]&0x7f) != stream.Sdp.PayloadType {
return
}
timestamp := binary.BigEndian.Uint32(h[8:12])
if stream.firsttimestamp != 0 {
timestamp -= stream.firsttimestamp
if timestamp < stream.timestamp {
return
} else if timestamp - stream.timestamp > uint32(stream.timeScale*60*60) {
return
}
}
} else { // rtcp
} }
valid = true valid = true
return return
} }
@ -205,9 +228,7 @@ func (self *Client) parseHeaders(b []byte) (statusCode int, headers textproto.MI
} }
} }
if headers, _ = r.ReadMIMEHeader(); err != nil { headers, _ = r.ReadMIMEHeader()
return
}
return return
} }
@ -298,6 +319,7 @@ func (self *Client) findRTSP() (block []byte, data []byte, err error) {
for { for {
var b byte var b byte
if b, err = self.brconn.ReadByte(); err != nil { if b, err = self.brconn.ReadByte(); err != nil {
err = fmt.Errorf("rtsp: find rtsp header failed: %v", err)
return return
} }
switch b { switch b {
@ -335,11 +357,12 @@ func (self *Client) findRTSP() (block []byte, data []byte, err error) {
return return
} }
if stat == Dollar && len(peek) >= 8 { if stat == Dollar && len(peek) >= 12 {
if blocklen, _, ok := self.parseBlockHeader(peek); ok { if blocklen, _, ok := self.parseBlockHeader(peek); ok {
left := blocklen+4-len(peek) left := blocklen+4-len(peek)
block = append(peek, make([]byte, left)...) block = append(peek, make([]byte, left)...)
if _, err = io.ReadFull(self.brconn, block[len(peek):]); err != nil { if _, err = io.ReadFull(self.brconn, block[len(peek):]); err != nil {
err = fmt.Errorf("rtsp: read block failed: %v", err)
return return
} }
return return
@ -366,6 +389,7 @@ func (self *Client) readLFLF() (block []byte, data []byte, err error) {
for { for {
var b byte var b byte
if b, err = self.brconn.ReadByte(); err != nil { if b, err = self.brconn.ReadByte(); err != nil {
err = fmt.Errorf("rtsp: read LFLF failed: %v", err)
return return
} }
switch b { switch b {
@ -388,12 +412,13 @@ func (self *Client) readLFLF() (block []byte, data []byte, err error) {
if stat == LFLF { if stat == LFLF {
data = peek data = peek
return return
} else if dollarpos != -1 && dollarpos - pos >= 8 { } else if dollarpos != -1 && dollarpos - pos >= 12 {
hdrlen := dollarpos-pos hdrlen := dollarpos-pos
start := len(peek)-hdrlen start := len(peek)-hdrlen
if blocklen, _, ok := self.parseBlockHeader(peek[start:]); ok { if blocklen, _, ok := self.parseBlockHeader(peek[start:]); ok {
block = append(peek[start:], make([]byte, blocklen+4-hdrlen)...) block = append(peek[start:], make([]byte, blocklen+4-hdrlen)...)
if _, err = io.ReadFull(self.brconn, block[hdrlen:]); err != nil { if _, err = io.ReadFull(self.brconn, block[hdrlen:]); err != nil {
err = fmt.Errorf("rtsp: read block failed: %v", err)
return return
} }
return return
@ -415,6 +440,7 @@ func (self *Client) readResp(b []byte) (res Response, err error) {
if res.ContentLength > 0 { if res.ContentLength > 0 {
res.Body = make([]byte, res.ContentLength) res.Body = make([]byte, res.ContentLength)
if _, err = io.ReadFull(self.brconn, res.Body); err != nil { if _, err = io.ReadFull(self.brconn, res.Body); err != nil {
err = fmt.Errorf("rtsp: read body failed: %v", err)
return return
} }
} }
@ -688,6 +714,12 @@ func (self *Stream) makeCodecData() (err error) {
} }
} }
self.timeScale = media.TimeScale
if self.timeScale == 0 {
// https://tools.ietf.org/html/rfc5391
self.timeScale = 8000
}
return return
} }
@ -1038,15 +1070,18 @@ func (self *Client) handleBlock(block []byte) (pkt av.Packet, ok bool, err error
} }
stream := self.streams[i] stream := self.streams[i]
if err = stream.handleRtpPacket(block[4:]); err != nil { herr := stream.handleRtpPacket(block[4:])
return if herr != nil {
if !self.SkipErrRtpBlock {
err = herr
return
}
} }
if stream.gotpkt { if stream.gotpkt {
timeScale := stream.Sdp.TimeScale
/* /*
TODO: sync AV by rtcp NTP timestamp TODO: sync AV by rtcp NTP timestamp
TODO: handle timestamp overflow
https://tools.ietf.org/html/rfc3550 https://tools.ietf.org/html/rfc3550
A receiver can then synchronize presentation of the audio and video packets by relating A receiver can then synchronize presentation of the audio and video packets by relating
their RTP timestamps using the timestamp pairs in RTCP SR packets. their RTP timestamps using the timestamp pairs in RTCP SR packets.
@ -1056,18 +1091,14 @@ func (self *Client) handleBlock(block []byte) (pkt av.Packet, ok bool, err error
} }
stream.timestamp -= stream.firsttimestamp stream.timestamp -= stream.firsttimestamp
if timeScale == 0 {
// https://tools.ietf.org/html/rfc5391
timeScale = 8000
}
ok = true ok = true
pkt = stream.pkt pkt = stream.pkt
pkt.Time = time.Duration(stream.timestamp)*time.Second / time.Duration(timeScale) pkt.Time = time.Duration(stream.timestamp)*time.Second / time.Duration(stream.timeScale)
pkt.Idx = int8(self.setupMap[i]) pkt.Idx = int8(self.setupMap[i])
if pkt.Time < stream.lasttime { if pkt.Time < stream.lasttime || pkt.Time - stream.lasttime > time.Minute*30 {
err = fmt.Errorf("rtp: stream#%d time=%v < lasttime=%v", pkt.Time, stream.lasttime) err = fmt.Errorf("rtp: time invalid stream#%d time=%v lasttime=%v", pkt.Idx, pkt.Time, stream.lasttime)
return
} }
stream.lasttime = pkt.Time stream.lasttime = pkt.Time

View File

@ -11,6 +11,8 @@ type Stream struct {
Sdp sdp.Media Sdp sdp.Media
client *Client client *Client
timeScale int
// h264 // h264
fuStarted bool fuStarted bool
fuBuffer []byte fuBuffer []byte