From 12f2f28f55c4722a658482aaa79da9966d568cce Mon Sep 17 00:00:00 2001 From: nareix Date: Sat, 30 Apr 2016 09:44:07 +0800 Subject: [PATCH] change readPacket mechanism and add h264 probing --- demuxer.go | 185 +++++++++++++++++++++++++++++++++++++++-------------- stream.go | 2 +- 2 files changed, 138 insertions(+), 49 deletions(-) diff --git a/demuxer.go b/demuxer.go index a330809..1887e2d 100644 --- a/demuxer.go +++ b/demuxer.go @@ -3,8 +3,10 @@ package ts import ( "bytes" "fmt" + "encoding/hex" "github.com/nareix/av" "github.com/nareix/codec/aacparser" + "github.com/nareix/codec/h264parser" "io" ) @@ -14,6 +16,9 @@ type Demuxer struct { pat PAT pmt *PMT streams []*Stream + time float64 + + readErr error } // ParsePacket() (pid uint, counter int, isStart bool, pts, dst int64, isKeyFrame bool) @@ -27,10 +32,8 @@ func (self *Demuxer) Streams() (streams []av.Stream) { } func (self *Demuxer) Time() float64 { - for _, stream := range self.streams { - if len(stream.pkts) > 0 { - return stream.pkts[len(stream.pkts)-1].time - } + if len(self.streams) > 0 { + return self.streams[0].time } return 0.0 } @@ -50,7 +53,6 @@ func (self *Demuxer) ReadHeader() (err error) { break } } - if err = self.readPacket(); err != nil { return } @@ -66,17 +68,39 @@ func (self *Demuxer) ReadPacket() (streamIndex int, pkt av.Packet, err error) { } for { - for i, stream := range self.streams { - if len(stream.pkts) > 1 { - streamIndex = i - pkt = stream.pkts[0].Packet - stream.pkts = stream.pkts[1:] - return + if self.readErr != nil { + if false { + for _, stream := range self.streams { + fmt.Println("read(flush): stream", stream.Type(), "pkts", len(stream.pkts)) + } + } + for i, stream := range self.streams { + var ok bool + if pkt, ok = stream.readLastPacket(); ok { + streamIndex = i + return + } + } + err = self.readErr + return + + } else { + if false { + for _, stream := range self.streams { + fmt.Println("read(normal): stream", stream.Type(), "pkts", len(stream.pkts)) + } + } + for i, stream := range self.streams { + var ok bool + if pkt, ok = stream.readPacket(); ok { + streamIndex = i + return + } } } - if err = self.readPacket(); err != nil { - return + if self.readErr == nil { + self.readErr = self.readPacket() } } } @@ -123,7 +147,7 @@ func (self *Demuxer) readPacket() (err error) { } else { for _, stream := range self.streams { if header.PID == stream.pid { - if err = stream.appendPacket(header, payload); err != nil { + if err = stream.handleTSPacket(header, payload); err != nil { return } } @@ -135,13 +159,78 @@ func (self *Demuxer) readPacket() (err error) { return } -func (self *Stream) appendPayload() (err error) { - self.payload = self.buf.Bytes() +func (self *Stream) readLastPacket() (ret av.Packet, ok bool) { + if len(self.pkts) > 1 { + return self.readPacket() + } + if len(self.pkts) == 1 { + pkt := self.pkts[0] + self.pkts = self.pkts[1:] + if self.peshdr.DataLength == 0 { + pkt.Data = self.buf.Bytes() + } + self.time += pkt.Duration + return pkt.Packet, true + } + return +} - if self.Type() == av.AAC { - if len(self.CodecData()) == 0 { +func (self *Stream) readPacket() (ret av.Packet, ok bool) { + if len(self.pkts) > 1 { + pkt := self.pkts[0] + self.pkts = self.pkts[1:] + self.time += pkt.Duration + return pkt.Packet, true + } + return +} + +func (self *Stream) payloadStart() { + if false { + fmt.Println("payloadStart:", self) + } + + dts := self.peshdr.DTS + pts := self.peshdr.PTS + if dts == 0 { + dts = pts + } + + pkt := tsPacket{ + Packet: av.Packet{ + IsKeyFrame: self.tshdr.RandomAccessIndicator, + }, + time: float64(dts)/float64(PTS_HZ), + } + if pts != dts { + pkt.CompositionTime = float64(pts-dts)/float64(PTS_HZ) + } + + if len(self.pkts) > 0 { + lastpkt := &self.pkts[len(self.pkts)-1] + lastpkt.Duration = pkt.time - lastpkt.time + self.lastDuration = lastpkt.Duration + } else { + pkt.Duration = self.lastDuration + } + + self.pkts = append(self.pkts, pkt) +} + +func (self *Stream) payloadEnd() (err error) { + if false { + fmt.Println("payloadEnd:", self) + } + + payload := self.buf.Bytes() + + curpkt := &self.pkts[len(self.pkts)-1] + curpkt.Data = payload + + if len(self.CodecData()) == 0 { + if self.Type() == av.AAC { var config aacparser.MPEG4AudioConfig - if config, _, _, _, err = aacparser.ReadADTSFrame(self.payload); err != nil { + if config, _, _, _, err = aacparser.ReadADTSFrame(payload); err != nil { err = fmt.Errorf("ReadADTSFrame failed: %s", err) return } @@ -154,42 +243,41 @@ func (self *Stream) appendPayload() (err error) { err = fmt.Errorf("SetCodecData failed: %s", err) return } + } else if self.Type() == av.H264 { + if false { + fmt.Println(hex.Dump(payload)) + } + nalus, _ := h264parser.SplitNALUs(payload) + var sps, pps []byte + for _, nalu := range nalus { + if len(nalu) > 0 { + naltype := nalu[0]&0x1f + if naltype == 7 { + sps = nalu + } else if naltype == 8 { + pps = nalu + } + } + } + if len(sps) > 0 && len(pps) > 0 { + codecData, _ := h264parser.CreateCodecDataBySPSAndPPS(sps, pps) + if err = self.SetCodecData(codecData); err != nil { + err = fmt.Errorf("SetCodecData failed: %s", err) + return + } + } } } - dts := self.peshdr.DTS - pts := self.peshdr.PTS - if dts == 0 { - dts = pts - } - - pkt := tsPacket{ - Packet: av.Packet{ - IsKeyFrame: self.tshdr.RandomAccessIndicator, - Data: self.payload, - }, - time: float64(dts)/float64(PTS_HZ), - } - - if pts != dts { - pkt.CompositionTime = float64(pts-dts)/float64(PTS_HZ) - } - - if len(self.pkts) > 0 { - lastPkt := &self.pkts[len(self.pkts)-1] - lastPkt.Duration = pkt.time - lastPkt.time - } - self.pkts = append(self.pkts, pkt) - return } -func (self *Stream) appendPacket(header TSHeader, payload []byte) (err error) { - r := bytes.NewReader(payload) - lr := &io.LimitedReader{R: r, N: int64(len(payload))} +func (self *Stream) handleTSPacket(header TSHeader, tspacket []byte) (err error) { + r := bytes.NewReader(tspacket) + lr := &io.LimitedReader{R: r, N: int64(len(tspacket))} if header.PayloadUnitStart && self.peshdr != nil && self.peshdr.DataLength == 0 { - if err = self.appendPayload(); err != nil { + if err = self.payloadEnd(); err != nil { return } } @@ -200,6 +288,7 @@ func (self *Stream) appendPacket(header TSHeader, payload []byte) (err error) { return } self.tshdr = header + self.payloadStart() } if _, err = io.CopyN(&self.buf, lr, lr.N); err != nil { @@ -207,7 +296,7 @@ func (self *Stream) appendPacket(header TSHeader, payload []byte) (err error) { } if self.buf.Len() == int(self.peshdr.DataLength) { - if err = self.appendPayload(); err != nil { + if err = self.payloadEnd(); err != nil { return } } diff --git a/stream.go b/stream.go index e7c144c..49df939 100644 --- a/stream.go +++ b/stream.go @@ -14,10 +14,10 @@ type Stream struct { av.StreamCommon time float64 + lastDuration float64 pid uint buf bytes.Buffer - payload []byte peshdr *PESHeader tshdr TSHeader