diff --git a/client.go b/client.go index 00a385a..581ec23 100644 --- a/client.go +++ b/client.go @@ -9,7 +9,7 @@ import ( "encoding/hex" "fmt" "github.com/nareix/av" - "github.com/nareix/av/pktqueue" + "github.com/nareix/av/pktque" "github.com/nareix/codec" "github.com/nareix/codec/aacparser" "github.com/nareix/codec/h264parser" @@ -50,9 +50,10 @@ type Client struct { requestUri string cseq uint streams []*Stream + streamsintf []av.CodecData session string body io.Reader - pktque *pktqueue.Queue + corrector *pktque.TimeCorrector } type Request struct { @@ -108,9 +109,7 @@ func Dial(uri string) (self *Client, err error) { func (self *Client) Streams() (streams []av.CodecData, err error) { if self.setupCalled { - for _, i := range self.setupIdx { - streams = append(streams, self.streams[i].CodecData) - } + streams = self.streamsintf } else { err = fmt.Errorf("rtsp: no streams") return @@ -409,6 +408,7 @@ func (self *Client) Setup(idx []int) (err error) { return } } + self.initstructs() self.setupCalled = true return @@ -419,6 +419,14 @@ func md5hash(s string) string { return hex.EncodeToString(h[:]) } +func (self *Client) initstructs() { + self.streamsintf = make([]av.CodecData, len(self.setupIdx)) + for i := range self.setupIdx { + self.streamsintf[i] = self.streams[self.setupIdx[i]].CodecData + } + self.corrector = pktque.NewTimeCorrector(self.streamsintf) +} + func (self *Client) Describe() (streams []av.CodecData, err error) { var res Response @@ -449,9 +457,9 @@ func (self *Client) Describe() (streams []av.CodecData, err error) { fmt.Println("<", body) } - self.streams = []*Stream{} _, medias := sdp.Parse(body) + self.streams = []*Stream{} for _, media := range medias { stream := &Stream{Sdp: media, client: self} if err = stream.makeCodecData(); err != nil { @@ -459,14 +467,9 @@ func (self *Client) Describe() (streams []av.CodecData, err error) { } self.streams = append(self.streams, stream) } - for _, stream := range self.streams { streams = append(streams, stream) } - self.pktque = &pktqueue.Queue{ - Poll: self.poll, - } - self.pktque.Alloc(streams) return } @@ -506,6 +509,7 @@ func (self *Client) HandleCodecDataChange() (_newcli *Client, err error) { } newcli.streams = append(newcli.streams, newstream) } + newcli.initstructs() _newcli = newcli return @@ -923,7 +927,18 @@ func (self *Client) Close() (err error) { return self.conn.Conn.Close() } -func (self *Client) poll() (err error) { +func (self *Client) ReadPacket() (pkt av.Packet, err error) { + if !self.setupCalled { + if err = self.setupAll(); err != nil { + return + } + } + if !self.playCalled { + if err = self.Play(); err != nil { + return + } + } + for { var res Response if res, err = self.ReadResponse(); err != nil { @@ -946,32 +961,22 @@ func (self *Client) poll() (err error) { timeScale = 16000 } - tm := time.Duration(stream.timestamp)*time.Second / time.Duration(timeScale) + pkt = stream.pkt + pkt.Time = time.Duration(stream.timestamp)*time.Second / time.Duration(timeScale) if false { - fmt.Printf("rtsp: #%d %d/%d %d\n", i, stream.timestamp, timeScale, len(stream.pkt.Data)) + fmt.Printf("rtsp: #%d %d/%d %d\n", i, stream.timestamp, timeScale, len(pkt.Data)) } - self.pktque.WriteTimePacket(self.setupMap[i], tm, stream.pkt) + pkt.Idx = int8(self.setupMap[i]) + self.corrector.Correct(&pkt) + stream.pkt = av.Packet{} stream.gotpkt = false return } } } - return -} -func (self *Client) ReadPacket() (i int, pkt av.Packet, err error) { - if !self.setupCalled { - if err = self.setupAll(); err != nil { - return - } - } - if !self.playCalled { - if err = self.Play(); err != nil { - return - } - } - return self.pktque.ReadPacket() + return } func (self *Client) ReadHeader() (err error) {