diff --git a/demuxer.go b/demuxer.go new file mode 100644 index 0000000..b6b18b9 --- /dev/null +++ b/demuxer.go @@ -0,0 +1,221 @@ + +package ts + +import ( + "io" + "bytes" + "fmt" + "github.com/nareix/mp4/isom" +) + +type Demuxer struct { + R io.Reader + + pat PAT + pmt *PMT + Tracks []*Track + TrackH264 *Track + TrackAAC *Track +} + +// ParsePacket() (pid uint, counter int, isStart bool, pts, dst int64, isKeyFrame bool) +// WritePayload(pid, pts, dts, isKeyFrame, payloads, isVideoFrame) + +func (self *Demuxer) TimeScale() int64 { + return PTS_HZ +} + +func (self *Demuxer) ReadHeader() (err error) { + self.Tracks = []*Track{} + self.TrackH264 = nil + self.TrackAAC = nil + + for { + if self.pmt != nil { + n := 0 + for _, track := range(self.Tracks) { + if track.payloadReady { + n++ + } + } + if n == len(self.Tracks) { + break + } + } + + if err = self.readPacket(); err != nil { + return + } + } + + return +} + +func (self *Demuxer) ReadSample() (track *Track, err error) { + if len(self.Tracks) == 0 { + err = fmt.Errorf("no track") + return + } + + for { + for _, _track := range(self.Tracks) { + if _track.payloadReady { + track = _track + return + } + } + + if err = self.readPacket(); err != nil { + return + } + } +} + +func (self *Demuxer) readPacket() (err error) { + var header TSHeader + var n int + var data [188]byte + + if header, n, err = ReadTSPacket(self.R, data[:]); err != nil { + return + } + payload := data[:n] + + if header.PID == 0 { + if self.pat, err = ReadPAT(bytes.NewReader(payload)); err != nil { + return + } + } else { + if self.pmt == nil { + for _, entry := range(self.pat.Entries) { + if entry.ProgramMapPID == header.PID { + self.pmt = new(PMT) + if *self.pmt, err = ReadPMT(bytes.NewReader(payload)); err != nil { + return + } + for _, info := range(self.pmt.ElementaryStreamInfos) { + track := &Track{} + + track.demuxer = self + track.pid = info.ElementaryPID + switch info.StreamType { + case ElementaryStreamTypeH264: + track.Type = H264 + self.TrackH264 = track + self.Tracks = append(self.Tracks, track) + case ElementaryStreamTypeAdtsAAC: + track.Type = AAC + self.TrackAAC = track + self.Tracks = append(self.Tracks, track) + } + } + } + } + } else { + + for _, track := range(self.Tracks) { + if header.PID == track.pid { + if err = track.appendPacket(header, payload); err != nil { + return + } + } + } + } + } + + return +} + +func (self *Track) GetMPEG4AudioConfig() isom.MPEG4AudioConfig { + return self.mpeg4AudioConfig +} + +func (self *Track) ReadSample() (pts int64, dts int64, isKeyFrame bool, data []byte, err error) { + for !self.payloadReady { + if err = self.demuxer.readPacket(); err != nil { + return + } + } + + if self.Type == AAC { + var n int + if _, data, n, self.payload, err = isom.ReadADTSPayload(self.payload); err != nil { + return + } + pts = self.PTS + dts = pts + self.PTS += int64(PTS_HZ*n)/int64(self.mpeg4AudioConfig.SampleRate) + if len(self.payload) == 0 { + self.payloadReady = false + } + } else { + dts = int64(self.peshdr.DTS) + pts = int64(self.peshdr.PTS) + isKeyFrame = self.tshdr.RandomAccessIndicator + data = self.payload + self.payloadReady = false + } + + if dts == 0 { + dts = pts + } + return +} + +func (self *Track) appendPayload() (err error) { + self.payload = self.buf.Bytes() + if len(self.payload) == 0 { + err = fmt.Errorf("empty payload") + return + } + + if self.Type == AAC { + if !self.mpeg4AudioConfig.IsValid() { + if self.mpeg4AudioConfig, _, _, _, err = isom.ReadADTSPayload(self.payload); err != nil { + return + } + self.mpeg4AudioConfig = self.mpeg4AudioConfig.Complete() + if !self.mpeg4AudioConfig.IsValid() { + err = fmt.Errorf("invalid MPEG4AudioConfig") + return + } + } + self.PTS = int64(self.peshdr.PTS) + } + + self.payloadReady = true + return +} + +func (self *Track) appendPacket(header TSHeader, payload []byte) (err error) { + r := bytes.NewReader(payload) + lr := &io.LimitedReader{R: r, N: int64(len(payload))} + + if header.PayloadUnitStart && self.peshdr != nil && self.peshdr.DataLength == 0 { + if err = self.appendPayload(); err != nil { + return + } + } + + if header.PayloadUnitStart { + self.payloadReady = false + self.buf = bytes.Buffer{} + if self.peshdr, err = ReadPESHeader(lr); err != nil { + return + } + self.tshdr = header + } + + if _, err = io.CopyN(&self.buf, lr, lr.N); err != nil { + return + } + + if self.buf.Len() == int(self.peshdr.DataLength) { + if err = self.appendPayload(); err != nil { + return + } + } + + return +} + diff --git a/muxer.go b/muxer.go index 8da020f..f8a634b 100644 --- a/muxer.go +++ b/muxer.go @@ -6,23 +6,6 @@ import ( "io" ) -type Track struct { - SPS []byte - PPS []byte - - PTS int64 - TimeScale int64 - - writeSPS bool - spsHasWritten bool - - mux *Muxer - streamId uint - tsw *TSWriter - dataBuf *iovec - cacheSize int -} - func (self *Track) setPCR() { self.tsw.PCR = uint64(self.PTS)*PCR_HZ/uint64(self.TimeScale) } diff --git a/reader.go b/reader.go index 72e5e85..dfd01a6 100644 --- a/reader.go +++ b/reader.go @@ -96,89 +96,90 @@ func ReadTSHeader(r io.Reader) (self TSHeader, err error) { if length, err = ReadUInt(r, 1); err != nil { return } - lr := &io.LimitedReader{R: r, N: int64(length)} - if flags, err = ReadUInt(lr, 1); err != nil { - return - } - - if DebugReader { - fmt.Printf("ts: ext_flags %s\n", FieldsDumper{ - Fields: []struct{ - Length int - Desc string - }{ - {1, "discontinuity_indicator"}, - {1, "random_access_indicator"}, - {1, "elementary_stream_priority_indicator"}, - {1, "pcr_flag"}, - {1, "opcr_flag"}, - {1, "splicing_point_flag"}, - {1, "transport_private_data_flag"}, - {1, "adaptation_field_extension_flag"}, - }, - Val: flags, - Length: 8, - }) - } - - // random_access_indicator - if flags & 0x40 != 0 { - self.RandomAccessIndicator = true - } - - // PCR - if flags & 0x10 != 0 { - var v uint64 - if v, err = ReadUInt64(lr, 6); err != nil { + if length > 0 { + lr := &io.LimitedReader{R: r, N: int64(length)} + if flags, err = ReadUInt(lr, 1); err != nil { return } - // clock is 27MHz - self.PCR = UIntToPCR(v) + if DebugReader { - fmt.Printf("ts: PCR %d %f\n", self.PCR, float64(self.PCR)/PCR_HZ) - } - } - - // OPCR - if flags & 0x08 != 0 { - var v uint64 - if v, err = ReadUInt64(lr, 6); err != nil { - return - } - self.OPCR = UIntToPCR(v) - } - - // Splice countdown - if flags & 0x04 != 0 { - if _, err = ReadUInt(lr, 1); err != nil { - return - } - } - - // Transport private data - if flags & 0x02 != 0 { - var length uint - if length, err = ReadUInt(lr, 1); err != nil { - return + fmt.Printf("ts: ext_flags %s\n", FieldsDumper{ + Fields: []struct{ + Length int + Desc string + }{ + {1, "discontinuity_indicator"}, + {1, "random_access_indicator"}, + {1, "elementary_stream_priority_indicator"}, + {1, "pcr_flag"}, + {1, "opcr_flag"}, + {1, "splicing_point_flag"}, + {1, "transport_private_data_flag"}, + {1, "adaptation_field_extension_flag"}, + }, + Val: flags, + Length: 8, + }) } - b := make([]byte, length) - if _, err = lr.Read(b); err != nil { - return - } - } - - // Adaptation extension - if lr.N > 0 { - if DebugReader { - // rubish - fmt.Println("ts: skip", lr.N) + // random_access_indicator + if flags & 0x40 != 0 { + self.RandomAccessIndicator = true } - if err = ReadDummy(lr, int(lr.N)); err != nil { - return + // PCR + if flags & 0x10 != 0 { + var v uint64 + if v, err = ReadUInt64(lr, 6); err != nil { + return + } + // clock is 27MHz + self.PCR = UIntToPCR(v) + if DebugReader { + fmt.Printf("ts: PCR %d %f\n", self.PCR, float64(self.PCR)/PCR_HZ) + } } + // OPCR + if flags & 0x08 != 0 { + var v uint64 + if v, err = ReadUInt64(lr, 6); err != nil { + return + } + self.OPCR = UIntToPCR(v) + } + + // Splice countdown + if flags & 0x04 != 0 { + if _, err = ReadUInt(lr, 1); err != nil { + return + } + } + + // Transport private data + if flags & 0x02 != 0 { + var length uint + if length, err = ReadUInt(lr, 1); err != nil { + return + } + + b := make([]byte, length) + if _, err = lr.Read(b); err != nil { + return + } + } + + // Adaptation extension + if lr.N > 0 { + if DebugReader { + // rubish + fmt.Println("ts: skip", lr.N) + } + + if err = ReadDummy(lr, int(lr.N)); err != nil { + return + } + } } } diff --git a/track.go b/track.go new file mode 100644 index 0000000..a83aa4c --- /dev/null +++ b/track.go @@ -0,0 +1,39 @@ + +package ts + +import ( + "bytes" + "github.com/nareix/mp4/isom" +) + +type Track struct { + SPS []byte + PPS []byte + + Type int + + pid uint + PTS int64 + TimeScale int64 + + mpeg4AudioConfig isom.MPEG4AudioConfig + buf bytes.Buffer + payload []byte + peshdr *PESHeader + tshdr TSHeader + spsHasWritten bool + payloadReady bool + + demuxer *Demuxer + mux *Muxer + streamId uint + tsw *TSWriter + dataBuf *iovec + cacheSize int +} + +const ( + H264 = 1 + AAC = 2 +) +