From d3ba5273094a541cab4a3476adde079b87715cee Mon Sep 17 00:00:00 2001 From: nareix Date: Wed, 25 May 2016 07:44:59 +0800 Subject: [PATCH] adjust newapi --- demuxer.go | 168 ++++++++++++++++------------------------------------- muxer.go | 32 +++++++--- stream.go | 21 +++---- 3 files changed, 84 insertions(+), 137 deletions(-) diff --git a/demuxer.go b/demuxer.go index 0a6ab8d..af6c029 100644 --- a/demuxer.go +++ b/demuxer.go @@ -5,37 +5,43 @@ import ( "fmt" "encoding/hex" "github.com/nareix/av" + "github.com/nareix/av/pktqueue" "github.com/nareix/codec/aacparser" "github.com/nareix/codec/h264parser" "io" ) +func Open(R io.Reader) (demuxer *Demuxer, err error) { + _demuxer := &Demuxer{R: R} + if err = _demuxer.ReadHeader(); err != nil { + return + } + demuxer = _demuxer + return +} + type Demuxer struct { R io.Reader + gotpkt bool + pktque *pktqueue.Queue pat PAT pmt *PMT streams []*Stream - time float64 - - readErr error } -// ParsePacket() (pid uint, counter int, isStart bool, pts, dst int64, isKeyFrame bool) -// WritePayload(pid, pts, dts, isKeyFrame, payloads, isVideoFrame) - -func (self *Demuxer) Streams() (streams []av.Stream) { +func (self *Demuxer) Streams() (streams []av.CodecData) { for _, stream := range self.streams { streams = append(streams, stream) } return } -func (self *Demuxer) Time() float64 { - if len(self.streams) > 0 { - return self.streams[0].time +func (self *Demuxer) CurrentTime() (time float64) { + if self.pktque != nil { + time = self.pktque.CurrentTime() } - return 0.0 + return } func (self *Demuxer) ReadHeader() (err error) { @@ -45,7 +51,7 @@ func (self *Demuxer) ReadHeader() (err error) { if self.pmt != nil { n := 0 for _, stream := range self.streams { - if len(stream.CodecData()) > 0 { + if stream.CodecData != nil { n++ } } @@ -53,7 +59,7 @@ func (self *Demuxer) ReadHeader() (err error) { break } } - if err = self.readPacket(); err != nil { + if err = self.poll(); err != nil { return } } @@ -61,51 +67,21 @@ func (self *Demuxer) ReadHeader() (err error) { return } -func (self *Demuxer) ReadPacket() (streamIndex int, pkt av.Packet, err error) { - if len(self.streams) == 0 { - err = fmt.Errorf("no stream") - return - } - - for { - if self.readErr != nil { - if false { - for _, stream := range self.streams { - fmt.Println("read(flush): stream", stream.Type(), "pkts", len(stream.pkts)) - } - } - for i, stream := range self.streams { - var ok bool - if pkt, ok = stream.readLastPacket(); ok { - streamIndex = i - return - } - } - err = self.readErr - return - - } else { - if false { - for _, stream := range self.streams { - fmt.Println("read(normal): stream", stream.Type(), "pkts", len(stream.pkts)) - } - } - for i, stream := range self.streams { - var ok bool - if pkt, ok = stream.readPacket(); ok { - streamIndex = i - return - } - } - } - - if self.readErr == nil { - self.readErr = self.readPacket() - } - } +func (self *Demuxer) ReadPacket() (i int, pkt av.Packet, err error) { + return self.pktque.ReadPacket() } -func (self *Demuxer) readPacket() (err error) { +func (self *Demuxer) poll() (err error) { + for self.gotpkt { + if err = self.readTSPacket(); err != nil { + return + } + } + self.gotpkt = false + return +} + +func (self *Demuxer) readTSPacket() (err error) { var header TSHeader var n int var data [188]byte @@ -127,20 +103,21 @@ func (self *Demuxer) readPacket() (err error) { if *self.pmt, err = ReadPMT(bytes.NewReader(payload)); err != nil { return } - for _, info := range self.pmt.ElementaryStreamInfos { + for i, info := range self.pmt.ElementaryStreamInfos { stream := &Stream{} - + stream.idx = i stream.demuxer = self stream.pid = info.ElementaryPID switch info.StreamType { case ElementaryStreamTypeH264: - stream.SetType(av.H264) self.streams = append(self.streams, stream) case ElementaryStreamTypeAdtsAAC: - stream.SetType(av.AAC) self.streams = append(self.streams, stream) } } + self.pktque = &pktqueue.Queue{} + self.pktque.Alloc(len(self.streams)) + self.pktque.Poll = self.poll } } @@ -159,68 +136,28 @@ func (self *Demuxer) readPacket() (err error) { return } -func (self *Stream) readLastPacket() (ret av.Packet, ok bool) { - if len(self.pkts) > 1 { - return self.readPacket() - } - if len(self.pkts) == 1 { - pkt := self.pkts[0] - self.pkts = self.pkts[1:] - if self.peshdr.DataLength == 0 { - pkt.Data = self.buf.Bytes() - } - self.time += pkt.Duration - return pkt.Packet, true - } - return -} +func (self *Stream) payloadEnd() (err error) { + payload := self.buf.Bytes() -func (self *Stream) readPacket() (ret av.Packet, ok bool) { - if len(self.pkts) > 1 { - pkt := self.pkts[0] - self.pkts = self.pkts[1:] - self.time += pkt.Duration - return pkt.Packet, true - } - return -} - -func (self *Stream) payloadStart() { dts := self.peshdr.DTS pts := self.peshdr.PTS if dts == 0 { dts = pts } - pkt := tsPacket{ - Packet: av.Packet{ - IsKeyFrame: self.tshdr.RandomAccessIndicator, - }, - time: float64(dts)/float64(PTS_HZ), + pkt := av.Packet{ + IsKeyFrame: self.tshdr.RandomAccessIndicator, + Data: payload, } + time := float64(dts)/float64(PTS_HZ) if pts != dts { pkt.CompositionTime = float64(pts-dts)/float64(PTS_HZ) } + self.demuxer.pktque.WriteTimePacket(self.idx, time, pkt) + self.demuxer.gotpkt = true - if len(self.pkts) > 0 { - lastpkt := &self.pkts[len(self.pkts)-1] - lastpkt.Duration = pkt.time - lastpkt.time - self.lastDuration = lastpkt.Duration - } else { - pkt.Duration = self.lastDuration - } - - self.pkts = append(self.pkts, pkt) -} - -func (self *Stream) payloadEnd() (err error) { - payload := self.buf.Bytes() - - curpkt := &self.pkts[len(self.pkts)-1] - curpkt.Data = payload - - if len(self.CodecData()) == 0 { - if self.Type() == av.AAC { + if self.CodecData == nil { + if self.streamType == ElementaryStreamTypeAdtsAAC { var config aacparser.MPEG4AudioConfig if config, _, _, _, err = aacparser.ReadADTSFrame(payload); err != nil { err = fmt.Errorf("ReadADTSFrame failed: %s", err) @@ -231,11 +168,10 @@ func (self *Stream) payloadEnd() (err error) { err = fmt.Errorf("WriteMPEG4AudioConfig failed: %s", err) return } - if err = self.SetCodecData(bw.Bytes()); err != nil { - err = fmt.Errorf("SetCodecData failed: %s", err) + if self.CodecData, err = aacparser.NewCodecDataFromMPEG4AudioConfigBytes(bw.Bytes()); err != nil { return } - } else if self.Type() == av.H264 { + } else if self.streamType == ElementaryStreamTypeH264 { if false { fmt.Println(hex.Dump(payload)) } @@ -252,9 +188,7 @@ func (self *Stream) payloadEnd() (err error) { } } if len(sps) > 0 && len(pps) > 0 { - codecData, _ := h264parser.CreateCodecDataBySPSAndPPS(sps, pps) - if err = self.SetCodecData(codecData); err != nil { - err = fmt.Errorf("SetCodecData failed: %s", err) + if self.CodecData, err = h264parser.NewCodecDataFromSPSAndPPS(sps, pps); err != nil { return } } @@ -280,7 +214,6 @@ func (self *Stream) handleTSPacket(header TSHeader, tspacket []byte) (err error) return } self.tshdr = header - self.payloadStart() } if _, err = io.CopyN(&self.buf, lr, lr.N); err != nil { @@ -295,3 +228,4 @@ func (self *Stream) handleTSPacket(header TSHeader, tspacket []byte) (err error) return } + diff --git a/muxer.go b/muxer.go index 4a36ba0..975a545 100644 --- a/muxer.go +++ b/muxer.go @@ -9,6 +9,15 @@ import ( "io" ) +func Create(W io.Writer, streams []av.CodecData) (muxer *Muxer, err error) { + _muxer := &Muxer{W: W} + if err = _muxer.WriteHeader(streams); err != nil { + return + } + muxer = _muxer + return +} + type Muxer struct { W io.Writer streams []*Stream @@ -18,16 +27,17 @@ type Muxer struct { tswPMT *TSWriter } -func (self *Muxer) NewStream() av.Stream { +func (self *Muxer) NewStream(codec av.CodecData) (err error) { stream := &Stream{ - mux: self, + muxer: self, + CodecData: codec, tsw: &TSWriter{ DiscontinuityIndicator: true, PID: uint(len(self.streams) + 0x100), }, } self.streams = append(self.streams, stream) - return stream + return } func (self *Muxer) writePaddingTSPackets(tsw *TSWriter) (err error) { @@ -55,7 +65,13 @@ func (self *Muxer) WriteTrailer() (err error) { return } -func (self *Muxer) WriteHeader() (err error) { +func (self *Muxer) WriteHeader(streams []av.CodecData) (err error) { + for _, stream := range streams { + if err = self.NewStream(stream); err != nil { + return + } + } + bufPAT := &bytes.Buffer{} bufPMT := &bytes.Buffer{} @@ -104,9 +120,10 @@ func (self *Muxer) WritePacket(streamIndex int, pkt av.Packet) (err error) { stream := self.streams[streamIndex] if stream.Type() == av.AAC { + codec := stream.CodecData.(av.AACCodecData) data := pkt.Data if !aacparser.IsADTSFrame(data) { - data = append(aacparser.MakeADTSHeader(stream.AACCodecInfo.MPEG4AudioConfig, 1024, len(data)), data...) + data = append(codec.MakeADTSHeader(1024, len(data)), data...) } buf := &bytes.Buffer{} @@ -126,6 +143,7 @@ func (self *Muxer) WritePacket(streamIndex int, pkt av.Packet) (err error) { stream.time += pkt.Duration } else if stream.Type() == av.H264 { + codec := stream.CodecData.(av.H264CodecData) buf := &bytes.Buffer{} pes := PESHeader{ StreamId: StreamIdH264, @@ -136,9 +154,7 @@ func (self *Muxer) WritePacket(streamIndex int, pkt av.Packet) (err error) { nalus, _ := h264parser.SplitNALUs(pkt.Data) if pkt.IsKeyFrame { - sps := stream.H264CodecInfo.Record.SPS[0] - pps := stream.H264CodecInfo.Record.PPS[0] - nalus = append([][]byte{sps, pps}, nalus...) + nalus = append([][]byte{codec.SPS(), codec.PPS()}, nalus...) } h264parser.WalkNALUsAnnexb(nalus, func(b []byte) { buf.Write(b) diff --git a/stream.go b/stream.go index 49df939..a08f0c8 100644 --- a/stream.go +++ b/stream.go @@ -5,30 +5,27 @@ import ( "github.com/nareix/av" ) -type tsPacket struct { - av.Packet - time float64 -} - type Stream struct { - av.StreamCommon - - time float64 - lastDuration float64 + av.CodecData pid uint buf bytes.Buffer peshdr *PESHeader tshdr TSHeader - pkts []tsPacket - demuxer *Demuxer - mux *Muxer + muxer *Muxer + streamId uint + streamType uint + tsw *TSWriter dataBuf *iovec cacheSize int + + idx int + pkt av.Packet + time float64 } func timeToPesTs(time float64) uint64 {