diff --git a/demuxer.go b/demuxer.go index 69f47c2..ecf0e4e 100644 --- a/demuxer.go +++ b/demuxer.go @@ -3,6 +3,7 @@ package ts import ( "bytes" "fmt" + "github.com/nareix/av" "github.com/nareix/codec/aacparser" "io" ) @@ -10,34 +11,26 @@ import ( type Demuxer struct { R io.Reader - pat PAT - pmt *PMT - Tracks []*Stream - TrackH264 *Stream - TrackAAC *Stream + pat PAT + pmt *PMT + streams []*Stream } // ParsePacket() (pid uint, counter int, isStart bool, pts, dst int64, isKeyFrame bool) // WritePayload(pid, pts, dts, isKeyFrame, payloads, isVideoFrame) -func (self *Demuxer) TimeScale() int64 { - return PTS_HZ -} - func (self *Demuxer) ReadHeader() (err error) { - self.Tracks = []*Stream{} - self.TrackH264 = nil - self.TrackAAC = nil + self.streams = []*Stream{} for { if self.pmt != nil { n := 0 - for _, stream := range self.Tracks { - if stream.payloadReady { + for _, stream := range self.streams { + if len(stream.pkts) > 0 { n++ } } - if n == len(self.Tracks) { + if n == len(self.streams) { break } } @@ -50,16 +43,18 @@ func (self *Demuxer) ReadHeader() (err error) { return } -func (self *Demuxer) ReadSample() (stream *Stream, err error) { - if len(self.Tracks) == 0 { - err = fmt.Errorf("no track") +func (self *Demuxer) ReadPacket() (streamIndex int, pkt av.Packet, err error) { + if len(self.streams) == 0 { + err = fmt.Errorf("no stream") return } for { - for _, _track := range self.Tracks { - if _track.payloadReady { - stream = _track + for i, stream := range self.streams { + if len(stream.pkts) > 1 { + streamIndex = i + pkt = stream.pkts[0].Packet + stream.pkts = stream.pkts[1:] return } } @@ -99,72 +94,77 @@ func (self *Demuxer) readPacket() (err error) { stream.pid = info.ElementaryPID switch info.StreamType { case ElementaryStreamTypeH264: - stream.Type = H264 - self.TrackH264 = stream - self.Tracks = append(self.Tracks, stream) + stream.SetType(av.H264) + self.streams = append(self.streams, stream) case ElementaryStreamTypeAdtsAAC: - stream.Type = AAC - self.TrackAAC = stream - self.Tracks = append(self.Tracks, stream) + stream.SetType(av.AAC) + self.streams = append(self.streams, stream) } } } } - } else { - for _, stream := range self.Tracks { + } else { + for _, stream := range self.streams { if header.PID == stream.pid { if err = stream.appendPacket(header, payload); err != nil { return } } } + } } return } -func (self *Stream) GetMPEG4AudioConfig() aacparser.MPEG4AudioConfig { - return self.mpeg4AudioConfig -} - -func (self *Stream) ReadSample() (pts int64, dts int64, isKeyFrame bool, data []byte, err error) { - for !self.payloadReady { - if err = self.demuxer.readPacket(); err != nil { - return - } - } - - dts = int64(self.peshdr.DTS) - pts = int64(self.peshdr.PTS) - if dts == 0 { - dts = pts - } - isKeyFrame = self.tshdr.RandomAccessIndicator - data = self.payload - self.payloadReady = false - - return -} - func (self *Stream) appendPayload() (err error) { self.payload = self.buf.Bytes() - if self.Type == AAC { - if !self.mpeg4AudioConfig.IsValid() { - if self.mpeg4AudioConfig, _, _, _, err = aacparser.ReadADTSFrame(self.payload); err != nil { + if self.Type() == av.AAC { + if len(self.CodecData()) == 0 { + var config aacparser.MPEG4AudioConfig + if config, _, _, _, err = aacparser.ReadADTSFrame(self.payload); err != nil { + err = fmt.Errorf("ReadADTSFrame failed: %s", err) return } - self.mpeg4AudioConfig = self.mpeg4AudioConfig.Complete() - if !self.mpeg4AudioConfig.IsValid() { - err = fmt.Errorf("invalid MPEG4AudioConfig") + bw := &bytes.Buffer{} + if err = aacparser.WriteMPEG4AudioConfig(bw, config); err != nil { + err = fmt.Errorf("WriteMPEG4AudioConfig failed: %s", err) + return + } + if err = self.SetCodecData(bw.Bytes()); err != nil { + err = fmt.Errorf("SetCodecData failed: %s", err) return } } } - self.payloadReady = true + dts := self.peshdr.DTS + pts := self.peshdr.PTS + if dts == 0 { + dts = pts + } + + pkt := tsPacket{ + Packet: av.Packet{ + IsKeyFrame: self.tshdr.RandomAccessIndicator, + Data: self.payload, + }, + time: float64(dts)/float64(PTS_HZ), + } + + if pts != dts { + pkt.Duration = float64(pts-dts)/float64(PTS_HZ) + } + + if len(self.pkts) > 0 { + lastPkt := &self.pkts[len(self.pkts)-1] + lastPkt.Duration = pkt.time - lastPkt.time + } + self.pkts = append(self.pkts, pkt) + return } @@ -179,7 +179,6 @@ func (self *Stream) appendPacket(header TSHeader, payload []byte) (err error) { } if header.PayloadUnitStart { - self.payloadReady = false self.buf = bytes.Buffer{} if self.peshdr, err = ReadPESHeader(lr); err != nil { return diff --git a/muxer.go b/muxer.go index b26e210..9bc10d8 100644 --- a/muxer.go +++ b/muxer.go @@ -3,55 +3,27 @@ package ts import ( "bytes" "fmt" + "github.com/nareix/av" "github.com/nareix/codec/aacparser" "github.com/nareix/codec/h264parser" "io" ) type Muxer struct { - W io.Writer - tswPAT *TSWriter - tswPMT *TSWriter - elemStreams []ElementaryStreamInfo - TrackH264 *Stream - Tracks []*Stream + W io.Writer + streams []*Stream } -func (self *Muxer) newTrack(pid uint, streamId uint) (stream *Stream) { - stream = &Stream{ +func (self *Muxer) NewStream() av.Stream { + stream := &Stream{ mux: self, tsw: &TSWriter{ - PID: pid, DiscontinuityIndicator: true, + PID: uint(len(self.streams) + 0x100), }, - streamId: streamId, } - stream.tsw.EnableVecWriter() - return -} - -func (self *Muxer) AddAACTrack() (stream *Stream) { - self.elemStreams = append( - self.elemStreams, - ElementaryStreamInfo{StreamType: ElementaryStreamTypeAdtsAAC, ElementaryPID: 0x101}, - ) - stream = self.newTrack(0x101, StreamIdAAC) - stream.Type = AAC - stream.cacheSize = 3000 - self.Tracks = append(self.Tracks, stream) - return -} - -func (self *Muxer) AddH264Track() (stream *Stream) { - self.elemStreams = append( - self.elemStreams, - ElementaryStreamInfo{StreamType: ElementaryStreamTypeH264, ElementaryPID: 0x100}, - ) - stream = self.newTrack(0x100, StreamIdH264) - stream.Type = H264 - self.TrackH264 = stream - self.Tracks = append(self.Tracks, stream) - return + self.streams = append(self.streams, stream) + return stream } func (self *Muxer) WriteHeader() (err error) { @@ -64,9 +36,20 @@ func (self *Muxer) WriteHeader() (err error) { }, } WritePAT(bufPAT, pat) + + var elemStreams []ElementaryStreamInfo + for _, stream := range self.streams { + switch stream.Type() { + case av.AAC: + elemStreams = append(elemStreams, ElementaryStreamInfo{StreamType: ElementaryStreamTypeAdtsAAC, ElementaryPID: stream.tsw.PID}) + case av.H264: + elemStreams = append(elemStreams, ElementaryStreamInfo{StreamType: ElementaryStreamTypeH264, ElementaryPID: stream.tsw.PID}) + } + } + pmt := PMT{ PCRPID: 0x100, - ElementaryStreamInfos: self.elemStreams, + ElementaryStreamInfos: elemStreams, } WritePMT(bufPMT, pmt) @@ -85,169 +68,67 @@ func (self *Muxer) WriteHeader() (err error) { return } - // about to remove - for _, stream := range self.Tracks { - stream.spsHasWritten = false - } - return } -func (self *Stream) SetH264PPSAndSPS(pps []byte, sps []byte) { - self.PPS, self.SPS = pps, sps -} - -func (self *Stream) SetTimeScale(timeScale int64) { - self.timeScale = timeScale -} - -func (self *Stream) TimeScale() int64 { - return self.timeScale -} - -func (self *Stream) SetMPEG4AudioConfig(config aacparser.MPEG4AudioConfig) { - self.mpeg4AudioConfig = config -} - -func (self *Stream) tsToPesTs(ts int64) uint64 { - return uint64(ts)*PTS_HZ/uint64(self.timeScale) + PTS_HZ -} - -func (self *Stream) tsToPCR(ts int64) uint64 { - return uint64(ts)*PCR_HZ/uint64(self.timeScale) + PCR_HZ -} - -func (self *Stream) tsToTime(ts int64) float64 { - return float64(ts) / float64(self.timeScale) -} - -func (self *Stream) WriteSample(pts int64, dts int64, isKeyFrame bool, data []byte) (err error) { - if false { - fmt.Println("WriteSample", self.Type, self.tsToTime(dts)) - } - - if self.Type == AAC { +func (self *Muxer) WritePacket(streamIndex int, pkt av.Packet) (err error) { + stream := self.streams[streamIndex] + if stream.Type() == av.AAC { + data := pkt.Data if !aacparser.IsADTSFrame(data) { - data = append(aacparser.MakeADTSHeader(self.mpeg4AudioConfig, 1024, len(data)), data...) - } - if false { - fmt.Printf("WriteSample=%x\n", data[:5]) + data = append(aacparser.MakeADTSHeader(stream.AACCodecInfo.MPEG4AudioConfig, 1024, len(data)), data...) } buf := &bytes.Buffer{} pes := PESHeader{ - StreamId: self.streamId, - PTS: self.tsToPesTs(pts), + StreamId: StreamIdAAC, + PTS: timeToPesTs(stream.time), } WritePESHeader(buf, pes, len(data)) buf.Write(data) - self.tsw.RandomAccessIndicator = true - self.tsw.PCR = self.tsToPCR(dts) - if err = self.tsw.WriteTo(self.mux.W, buf.Bytes()); err != nil { + stream.tsw.RandomAccessIndicator = true + stream.tsw.PCR = timeToPCR(stream.time) + if err = stream.tsw.WriteTo(self.W, buf.Bytes()); err != nil { return } - } else if self.Type == H264 { + stream.time += pkt.Duration + + } else if stream.Type() == av.H264 { buf := &bytes.Buffer{} pes := PESHeader{ - StreamId: self.streamId, - PTS: self.tsToPesTs(pts), + StreamId: StreamIdH264, + PTS: timeToPesTs(stream.time), } - if dts != pts { - pes.DTS = self.tsToPesTs(dts) + if pkt.CompositionTime > 0.0 { + pes.DTS = timeToPesTs(stream.time + pkt.CompositionTime) } WritePESHeader(buf, pes, 0) - nalus, _ := h264parser.SplitNALUs(data) - if isKeyFrame { - nalus = append([][]byte{self.SPS, self.PPS}, nalus...) + nalus, _ := h264parser.SplitNALUs(pkt.Data) + if pkt.IsKeyFrame { + sps := stream.H264CodecInfo.Record.SPS[0] + pps := stream.H264CodecInfo.Record.PPS[0] + nalus = append([][]byte{sps, pps}, nalus...) } h264parser.WalkNALUsAnnexb(nalus, func(b []byte) { buf.Write(b) }) - self.tsw.RandomAccessIndicator = isKeyFrame - self.tsw.PCR = self.tsToPCR(dts) - if err = self.tsw.WriteTo(self.mux.W, buf.Bytes()); err != nil { + stream.tsw.RandomAccessIndicator = pkt.IsKeyFrame + stream.tsw.PCR = timeToPCR(stream.time) + if err = stream.tsw.WriteTo(self.W, buf.Bytes()); err != nil { return } - } - return -} + stream.time += pkt.Duration -/* about to remove */ - -func (self *Stream) setPCR() { - self.tsw.PCR = uint64(self.PTS) * PCR_HZ / uint64(self.timeScale) -} - -func (self *Stream) getPesHeader(dataLength int) (data []byte) { - if self.PTS == 0 { - self.PTS = self.timeScale - } - buf := &bytes.Buffer{} - pes := PESHeader{ - StreamId: self.streamId, - PTS: uint64(self.PTS) * PTS_HZ / uint64(self.timeScale), - } - WritePESHeader(buf, pes, dataLength) - return buf.Bytes() -} - -func (self *Stream) incPTS(delta int) { - self.PTS += int64(delta) -} - -func (self *Stream) WriteH264NALU(sync bool, duration int, nalu []byte) (err error) { - nalus := [][]byte{} - - if !self.spsHasWritten { - nalus = append(nalus, self.SPS) - nalus = append(nalus, self.PPS) - self.spsHasWritten = true - } - nalus = append(nalus, nalu) - - data := &iovec{} - for i, nalu := range nalus { - var startCode []byte - if i == 0 { - startCode = []byte{0, 0, 0, 1, 0x9, 0xf0, 0, 0, 0, 1} // AUD - } else { - startCode = []byte{0, 0, 1} - } - data.Append(startCode) - data.Append(nalu) - } - - data.Prepend(self.getPesHeader(0)) - self.tsw.RandomAccessIndicator = sync - self.setPCR() - if err = self.tsw.WriteIovecTo(self.mux.W, data); err != nil { + } else { + err = fmt.Errorf("unknown stream type=%d", stream.Type()) return } - self.incPTS(duration) - return -} - -func (self *Stream) WriteADTSAACFrame(duration int, frame []byte) (err error) { - if self.dataBuf != nil && self.dataBuf.Len > self.cacheSize { - self.dataBuf.Prepend(self.getPesHeader(self.dataBuf.Len)) - self.tsw.RandomAccessIndicator = true - self.setPCR() - if err = self.tsw.WriteIovecTo(self.mux.W, self.dataBuf); err != nil { - return - } - self.dataBuf = nil - } - if self.dataBuf == nil { - self.dataBuf = &iovec{} - } - self.dataBuf.Append(frame) - self.incPTS(duration) return } diff --git a/stream.go b/stream.go index bfc6225..e7c144c 100644 --- a/stream.go +++ b/stream.go @@ -2,26 +2,26 @@ package ts import ( "bytes" - "github.com/nareix/codec/aacparser" + "github.com/nareix/av" ) +type tsPacket struct { + av.Packet + time float64 +} + type Stream struct { - SPS []byte - PPS []byte + av.StreamCommon - Type int + time float64 - pid uint - PTS int64 - timeScale int64 + pid uint + buf bytes.Buffer + payload []byte + peshdr *PESHeader + tshdr TSHeader - mpeg4AudioConfig aacparser.MPEG4AudioConfig - buf bytes.Buffer - payload []byte - peshdr *PESHeader - tshdr TSHeader - spsHasWritten bool - payloadReady bool + pkts []tsPacket demuxer *Demuxer mux *Muxer @@ -31,7 +31,10 @@ type Stream struct { cacheSize int } -const ( - H264 = 1 - AAC = 2 -) +func timeToPesTs(time float64) uint64 { + return uint64(time*PTS_HZ) + PTS_HZ +} + +func timeToPCR(time float64) uint64 { + return uint64(time*PCR_HZ) + PCR_HZ +}