From 50b272ba764bb55b266ed88c0fb69b36cc89f065 Mon Sep 17 00:00:00 2001 From: nareix Date: Wed, 22 Jun 2016 17:59:12 +0800 Subject: [PATCH] change to new av.Packet --- demuxer.go | 48 +++++++++++++++++++++--------------------------- muxer.go | 38 ++++++++++++++++++++------------------ stream.go | 1 - 3 files changed, 41 insertions(+), 46 deletions(-) diff --git a/demuxer.go b/demuxer.go index dbcdefa..4eda814 100644 --- a/demuxer.go +++ b/demuxer.go @@ -6,7 +6,6 @@ import ( "fmt" "time" "github.com/nareix/av" - "github.com/nareix/av/pktqueue" "github.com/nareix/codec/aacparser" "github.com/nareix/codec/h264parser" "io" @@ -25,10 +24,12 @@ type Demuxer struct { R io.Reader gotpkt bool - pktque *pktqueue.Queue + pkt av.Packet + pat PAT pmt *PMT streams []*Stream + streamsintf []av.CodecData } func (self *Demuxer) Streams() (streams []av.CodecData, err error) { @@ -37,21 +38,12 @@ func (self *Demuxer) Streams() (streams []av.CodecData, err error) { return } for _, stream := range self.streams { - streams = append(streams, stream) - } - return -} - -func (self *Demuxer) CurrentTime() (tm time.Duration) { - if self.pktque != nil { - tm = self.pktque.CurrentTime() + streams = append(streams, stream.CodecData) } return } func (self *Demuxer) ReadHeader() (err error) { - self.streams = []*Stream{} - for { if self.pmt != nil { n := 0 @@ -68,12 +60,15 @@ func (self *Demuxer) ReadHeader() (err error) { return } } - return } -func (self *Demuxer) ReadPacket() (i int, pkt av.Packet, err error) { - return self.pktque.ReadPacket() +func (self *Demuxer) ReadPacket() (pkt av.Packet, err error) { + if err = self.poll(); err != nil { + return + } + pkt = self.pkt + return } func (self *Demuxer) poll() (err error) { @@ -102,6 +97,8 @@ func (self *Demuxer) readTSPacket() (err error) { } } else { if self.pmt == nil { + self.streams = []*Stream{} + for _, entry := range self.pat.Entries { if entry.ProgramMapPID == header.PID { self.pmt = new(PMT) @@ -120,17 +117,14 @@ func (self *Demuxer) readTSPacket() (err error) { self.streams = append(self.streams, stream) } } - self.pktque = &pktqueue.Queue{} - - var streams []av.CodecData - if streams, err = self.Streams(); err != nil { - return - } - self.pktque.Alloc(streams) - self.pktque.Poll = self.poll } } + self.streamsintf = make([]av.CodecData, len(self.streams)) + for i, stream := range self.streams { + self.streamsintf[i] = stream + } + } else { for _, stream := range self.streams { if header.PID == stream.pid { @@ -155,15 +149,15 @@ func (self *Stream) payloadEnd() (err error) { dts = pts } - pkt := av.Packet{ + self.pkt = av.Packet{ + Idx: int8(self.idx), IsKeyFrame: self.tshdr.RandomAccessIndicator, + Time: time.Duration(dts)*time.Second / time.Duration(PTS_HZ), Data: payload, } - tm := time.Duration(dts)*time.Second / time.Duration(PTS_HZ) if pts != dts { - pkt.CompositionTime = time.Duration(pts-dts)*time.Second / time.Duration(PTS_HZ) + self.pkt.CompositionTime = time.Duration(pts-dts)*time.Second / time.Duration(PTS_HZ) } - self.demuxer.pktque.WriteTimePacket(self.idx, tm, pkt) self.demuxer.gotpkt = true if self.CodecData == nil { diff --git a/muxer.go b/muxer.go index 4b0e472..7ab44e4 100644 --- a/muxer.go +++ b/muxer.go @@ -135,14 +135,24 @@ func (self *Muxer) WriteHeader(streams []av.CodecData) (err error) { if err = self.WritePATPMT(); err != nil { return } - return } -func (self *Muxer) WritePacket(streamIndex int, pkt av.Packet) (err error) { - stream := self.streams[streamIndex] +func (self *Muxer) WritePacket(pkt av.Packet) (err error) { + if true { + fmt.Println("ts:", "in", pkt.Idx, pkt.Time, "len", len(pkt.Data)) + } + if err = self.writePacket(pkt); err != nil { + return + } + return +} - if stream.Type() == av.AAC { +func (self *Muxer) writePacket(pkt av.Packet) (err error) { + stream := self.streams[pkt.Idx] + + switch stream.Type() { + case av.AAC: codec := stream.CodecData.(aacparser.CodecData) data := pkt.Data if !aacparser.IsADTSFrame(data) { @@ -152,26 +162,24 @@ func (self *Muxer) WritePacket(streamIndex int, pkt av.Packet) (err error) { buf := &bytes.Buffer{} pes := PESHeader{ StreamId: StreamIdAAC, - PTS: timeToPesTs(stream.time), + PTS: timeToPesTs(pkt.Time), } WritePESHeader(buf, pes, len(data)) buf.Write(data) stream.tsw.RandomAccessIndicator = true - stream.tsw.PCR = timeToPCR(stream.time) + stream.tsw.PCR = timeToPCR(pkt.Time) if err = stream.tsw.WriteTo(self.W, buf.Bytes()); err != nil { return } - stream.time += pkt.Duration - - } else if stream.Type() == av.H264 { + case av.H264: codec := stream.CodecData.(h264parser.CodecData) buf := &bytes.Buffer{} pes := PESHeader{ StreamId: StreamIdH264, - PTS: timeToPesTs(stream.time + pkt.CompositionTime), - DTS: timeToPesTs(stream.time), + PTS: timeToPesTs(pkt.Time + pkt.CompositionTime), + DTS: timeToPesTs(pkt.Time), } WritePESHeader(buf, pes, 0) @@ -184,16 +192,10 @@ func (self *Muxer) WritePacket(streamIndex int, pkt av.Packet) (err error) { }) stream.tsw.RandomAccessIndicator = pkt.IsKeyFrame - stream.tsw.PCR = timeToPCR(stream.time) + stream.tsw.PCR = timeToPCR(pkt.Time) if err = stream.tsw.WriteTo(self.W, buf.Bytes()); err != nil { return } - - stream.time += pkt.Duration - - } else { - err = fmt.Errorf("unknown stream type=%d", stream.Type()) - return } return diff --git a/stream.go b/stream.go index 6410aea..2c665fb 100644 --- a/stream.go +++ b/stream.go @@ -26,7 +26,6 @@ type Stream struct { idx int pkt av.Packet - time time.Duration } func timeToPesTs(tm time.Duration) uint64 {