use new time corrector

This commit is contained in:
nareix 2016-06-21 00:45:26 +08:00
parent 9d464e5c67
commit ded940976a

View File

@ -9,7 +9,7 @@ import (
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"github.com/nareix/av" "github.com/nareix/av"
"github.com/nareix/av/pktqueue" "github.com/nareix/av/pktque"
"github.com/nareix/codec" "github.com/nareix/codec"
"github.com/nareix/codec/aacparser" "github.com/nareix/codec/aacparser"
"github.com/nareix/codec/h264parser" "github.com/nareix/codec/h264parser"
@ -50,9 +50,10 @@ type Client struct {
requestUri string requestUri string
cseq uint cseq uint
streams []*Stream streams []*Stream
streamsintf []av.CodecData
session string session string
body io.Reader body io.Reader
pktque *pktqueue.Queue corrector *pktque.TimeCorrector
} }
type Request struct { type Request struct {
@ -108,9 +109,7 @@ func Dial(uri string) (self *Client, err error) {
func (self *Client) Streams() (streams []av.CodecData, err error) { func (self *Client) Streams() (streams []av.CodecData, err error) {
if self.setupCalled { if self.setupCalled {
for _, i := range self.setupIdx { streams = self.streamsintf
streams = append(streams, self.streams[i].CodecData)
}
} else { } else {
err = fmt.Errorf("rtsp: no streams") err = fmt.Errorf("rtsp: no streams")
return return
@ -409,6 +408,7 @@ func (self *Client) Setup(idx []int) (err error) {
return return
} }
} }
self.initstructs()
self.setupCalled = true self.setupCalled = true
return return
@ -419,6 +419,14 @@ func md5hash(s string) string {
return hex.EncodeToString(h[:]) return hex.EncodeToString(h[:])
} }
func (self *Client) initstructs() {
self.streamsintf = make([]av.CodecData, len(self.setupIdx))
for i := range self.setupIdx {
self.streamsintf[i] = self.streams[self.setupIdx[i]].CodecData
}
self.corrector = pktque.NewTimeCorrector(self.streamsintf)
}
func (self *Client) Describe() (streams []av.CodecData, err error) { func (self *Client) Describe() (streams []av.CodecData, err error) {
var res Response var res Response
@ -449,9 +457,9 @@ func (self *Client) Describe() (streams []av.CodecData, err error) {
fmt.Println("<", body) fmt.Println("<", body)
} }
self.streams = []*Stream{}
_, medias := sdp.Parse(body) _, medias := sdp.Parse(body)
self.streams = []*Stream{}
for _, media := range medias { for _, media := range medias {
stream := &Stream{Sdp: media, client: self} stream := &Stream{Sdp: media, client: self}
if err = stream.makeCodecData(); err != nil { if err = stream.makeCodecData(); err != nil {
@ -459,14 +467,9 @@ func (self *Client) Describe() (streams []av.CodecData, err error) {
} }
self.streams = append(self.streams, stream) self.streams = append(self.streams, stream)
} }
for _, stream := range self.streams { for _, stream := range self.streams {
streams = append(streams, stream) streams = append(streams, stream)
} }
self.pktque = &pktqueue.Queue{
Poll: self.poll,
}
self.pktque.Alloc(streams)
return return
} }
@ -506,6 +509,7 @@ func (self *Client) HandleCodecDataChange() (_newcli *Client, err error) {
} }
newcli.streams = append(newcli.streams, newstream) newcli.streams = append(newcli.streams, newstream)
} }
newcli.initstructs()
_newcli = newcli _newcli = newcli
return return
@ -923,7 +927,18 @@ func (self *Client) Close() (err error) {
return self.conn.Conn.Close() return self.conn.Conn.Close()
} }
func (self *Client) poll() (err error) { func (self *Client) ReadPacket() (pkt av.Packet, err error) {
if !self.setupCalled {
if err = self.setupAll(); err != nil {
return
}
}
if !self.playCalled {
if err = self.Play(); err != nil {
return
}
}
for { for {
var res Response var res Response
if res, err = self.ReadResponse(); err != nil { if res, err = self.ReadResponse(); err != nil {
@ -946,33 +961,23 @@ func (self *Client) poll() (err error) {
timeScale = 16000 timeScale = 16000
} }
tm := time.Duration(stream.timestamp)*time.Second / time.Duration(timeScale) pkt = stream.pkt
pkt.Time = time.Duration(stream.timestamp)*time.Second / time.Duration(timeScale)
if false { if false {
fmt.Printf("rtsp: #%d %d/%d %d\n", i, stream.timestamp, timeScale, len(stream.pkt.Data)) fmt.Printf("rtsp: #%d %d/%d %d\n", i, stream.timestamp, timeScale, len(pkt.Data))
} }
self.pktque.WriteTimePacket(self.setupMap[i], tm, stream.pkt) pkt.Idx = int8(self.setupMap[i])
self.corrector.Correct(&pkt)
stream.pkt = av.Packet{} stream.pkt = av.Packet{}
stream.gotpkt = false stream.gotpkt = false
return return
} }
} }
} }
return
}
func (self *Client) ReadPacket() (i int, pkt av.Packet, err error) {
if !self.setupCalled {
if err = self.setupAll(); err != nil {
return return
} }
}
if !self.playCalled {
if err = self.Play(); err != nil {
return
}
}
return self.pktque.ReadPacket()
}
func (self *Client) ReadHeader() (err error) { func (self *Client) ReadHeader() (err error) {
if err = self.Options(); err != nil { if err = self.Options(); err != nil {