rewrite demuxer and muxer

This commit is contained in:
nareix 2016-04-21 20:50:19 +08:00
parent e01abfd2a6
commit dd4452fb65
3 changed files with 128 additions and 245 deletions

View File

@ -3,6 +3,7 @@ package ts
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"github.com/nareix/av"
"github.com/nareix/codec/aacparser" "github.com/nareix/codec/aacparser"
"io" "io"
) )
@ -12,32 +13,24 @@ type Demuxer struct {
pat PAT pat PAT
pmt *PMT pmt *PMT
Tracks []*Stream streams []*Stream
TrackH264 *Stream
TrackAAC *Stream
} }
// ParsePacket() (pid uint, counter int, isStart bool, pts, dst int64, isKeyFrame bool) // ParsePacket() (pid uint, counter int, isStart bool, pts, dst int64, isKeyFrame bool)
// WritePayload(pid, pts, dts, isKeyFrame, payloads, isVideoFrame) // WritePayload(pid, pts, dts, isKeyFrame, payloads, isVideoFrame)
func (self *Demuxer) TimeScale() int64 {
return PTS_HZ
}
func (self *Demuxer) ReadHeader() (err error) { func (self *Demuxer) ReadHeader() (err error) {
self.Tracks = []*Stream{} self.streams = []*Stream{}
self.TrackH264 = nil
self.TrackAAC = nil
for { for {
if self.pmt != nil { if self.pmt != nil {
n := 0 n := 0
for _, stream := range self.Tracks { for _, stream := range self.streams {
if stream.payloadReady { if len(stream.pkts) > 0 {
n++ n++
} }
} }
if n == len(self.Tracks) { if n == len(self.streams) {
break break
} }
} }
@ -50,16 +43,18 @@ func (self *Demuxer) ReadHeader() (err error) {
return return
} }
func (self *Demuxer) ReadSample() (stream *Stream, err error) { func (self *Demuxer) ReadPacket() (streamIndex int, pkt av.Packet, err error) {
if len(self.Tracks) == 0 { if len(self.streams) == 0 {
err = fmt.Errorf("no track") err = fmt.Errorf("no stream")
return return
} }
for { for {
for _, _track := range self.Tracks { for i, stream := range self.streams {
if _track.payloadReady { if len(stream.pkts) > 1 {
stream = _track streamIndex = i
pkt = stream.pkts[0].Packet
stream.pkts = stream.pkts[1:]
return return
} }
} }
@ -99,72 +94,77 @@ func (self *Demuxer) readPacket() (err error) {
stream.pid = info.ElementaryPID stream.pid = info.ElementaryPID
switch info.StreamType { switch info.StreamType {
case ElementaryStreamTypeH264: case ElementaryStreamTypeH264:
stream.Type = H264 stream.SetType(av.H264)
self.TrackH264 = stream self.streams = append(self.streams, stream)
self.Tracks = append(self.Tracks, stream)
case ElementaryStreamTypeAdtsAAC: case ElementaryStreamTypeAdtsAAC:
stream.Type = AAC stream.SetType(av.AAC)
self.TrackAAC = stream self.streams = append(self.streams, stream)
self.Tracks = append(self.Tracks, stream)
} }
} }
} }
} }
} else {
for _, stream := range self.Tracks { } else {
for _, stream := range self.streams {
if header.PID == stream.pid { if header.PID == stream.pid {
if err = stream.appendPacket(header, payload); err != nil { if err = stream.appendPacket(header, payload); err != nil {
return return
} }
} }
} }
}
}
return
}
func (self *Stream) GetMPEG4AudioConfig() aacparser.MPEG4AudioConfig {
return self.mpeg4AudioConfig
}
func (self *Stream) ReadSample() (pts int64, dts int64, isKeyFrame bool, data []byte, err error) {
for !self.payloadReady {
if err = self.demuxer.readPacket(); err != nil {
return
} }
} }
dts = int64(self.peshdr.DTS)
pts = int64(self.peshdr.PTS)
if dts == 0 {
dts = pts
}
isKeyFrame = self.tshdr.RandomAccessIndicator
data = self.payload
self.payloadReady = false
return return
} }
func (self *Stream) appendPayload() (err error) { func (self *Stream) appendPayload() (err error) {
self.payload = self.buf.Bytes() self.payload = self.buf.Bytes()
if self.Type == AAC { if self.Type() == av.AAC {
if !self.mpeg4AudioConfig.IsValid() { if len(self.CodecData()) == 0 {
if self.mpeg4AudioConfig, _, _, _, err = aacparser.ReadADTSFrame(self.payload); err != nil { var config aacparser.MPEG4AudioConfig
if config, _, _, _, err = aacparser.ReadADTSFrame(self.payload); err != nil {
err = fmt.Errorf("ReadADTSFrame failed: %s", err)
return return
} }
self.mpeg4AudioConfig = self.mpeg4AudioConfig.Complete() bw := &bytes.Buffer{}
if !self.mpeg4AudioConfig.IsValid() { if err = aacparser.WriteMPEG4AudioConfig(bw, config); err != nil {
err = fmt.Errorf("invalid MPEG4AudioConfig") err = fmt.Errorf("WriteMPEG4AudioConfig failed: %s", err)
return
}
if err = self.SetCodecData(bw.Bytes()); err != nil {
err = fmt.Errorf("SetCodecData failed: %s", err)
return return
} }
} }
} }
self.payloadReady = true dts := self.peshdr.DTS
pts := self.peshdr.PTS
if dts == 0 {
dts = pts
}
pkt := tsPacket{
Packet: av.Packet{
IsKeyFrame: self.tshdr.RandomAccessIndicator,
Data: self.payload,
},
time: float64(dts)/float64(PTS_HZ),
}
if pts != dts {
pkt.Duration = float64(pts-dts)/float64(PTS_HZ)
}
if len(self.pkts) > 0 {
lastPkt := &self.pkts[len(self.pkts)-1]
lastPkt.Duration = pkt.time - lastPkt.time
}
self.pkts = append(self.pkts, pkt)
return return
} }
@ -179,7 +179,6 @@ func (self *Stream) appendPacket(header TSHeader, payload []byte) (err error) {
} }
if header.PayloadUnitStart { if header.PayloadUnitStart {
self.payloadReady = false
self.buf = bytes.Buffer{} self.buf = bytes.Buffer{}
if self.peshdr, err = ReadPESHeader(lr); err != nil { if self.peshdr, err = ReadPESHeader(lr); err != nil {
return return

211
muxer.go
View File

@ -3,6 +3,7 @@ package ts
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"github.com/nareix/av"
"github.com/nareix/codec/aacparser" "github.com/nareix/codec/aacparser"
"github.com/nareix/codec/h264parser" "github.com/nareix/codec/h264parser"
"io" "io"
@ -10,48 +11,19 @@ import (
type Muxer struct { type Muxer struct {
W io.Writer W io.Writer
tswPAT *TSWriter streams []*Stream
tswPMT *TSWriter
elemStreams []ElementaryStreamInfo
TrackH264 *Stream
Tracks []*Stream
} }
func (self *Muxer) newTrack(pid uint, streamId uint) (stream *Stream) { func (self *Muxer) NewStream() av.Stream {
stream = &Stream{ stream := &Stream{
mux: self, mux: self,
tsw: &TSWriter{ tsw: &TSWriter{
PID: pid,
DiscontinuityIndicator: true, DiscontinuityIndicator: true,
PID: uint(len(self.streams) + 0x100),
}, },
streamId: streamId,
} }
stream.tsw.EnableVecWriter() self.streams = append(self.streams, stream)
return return stream
}
func (self *Muxer) AddAACTrack() (stream *Stream) {
self.elemStreams = append(
self.elemStreams,
ElementaryStreamInfo{StreamType: ElementaryStreamTypeAdtsAAC, ElementaryPID: 0x101},
)
stream = self.newTrack(0x101, StreamIdAAC)
stream.Type = AAC
stream.cacheSize = 3000
self.Tracks = append(self.Tracks, stream)
return
}
func (self *Muxer) AddH264Track() (stream *Stream) {
self.elemStreams = append(
self.elemStreams,
ElementaryStreamInfo{StreamType: ElementaryStreamTypeH264, ElementaryPID: 0x100},
)
stream = self.newTrack(0x100, StreamIdH264)
stream.Type = H264
self.TrackH264 = stream
self.Tracks = append(self.Tracks, stream)
return
} }
func (self *Muxer) WriteHeader() (err error) { func (self *Muxer) WriteHeader() (err error) {
@ -64,9 +36,20 @@ func (self *Muxer) WriteHeader() (err error) {
}, },
} }
WritePAT(bufPAT, pat) WritePAT(bufPAT, pat)
var elemStreams []ElementaryStreamInfo
for _, stream := range self.streams {
switch stream.Type() {
case av.AAC:
elemStreams = append(elemStreams, ElementaryStreamInfo{StreamType: ElementaryStreamTypeAdtsAAC, ElementaryPID: stream.tsw.PID})
case av.H264:
elemStreams = append(elemStreams, ElementaryStreamInfo{StreamType: ElementaryStreamTypeH264, ElementaryPID: stream.tsw.PID})
}
}
pmt := PMT{ pmt := PMT{
PCRPID: 0x100, PCRPID: 0x100,
ElementaryStreamInfos: self.elemStreams, ElementaryStreamInfos: elemStreams,
} }
WritePMT(bufPMT, pmt) WritePMT(bufPMT, pmt)
@ -85,169 +68,67 @@ func (self *Muxer) WriteHeader() (err error) {
return return
} }
// about to remove
for _, stream := range self.Tracks {
stream.spsHasWritten = false
}
return return
} }
func (self *Stream) SetH264PPSAndSPS(pps []byte, sps []byte) { func (self *Muxer) WritePacket(streamIndex int, pkt av.Packet) (err error) {
self.PPS, self.SPS = pps, sps stream := self.streams[streamIndex]
}
func (self *Stream) SetTimeScale(timeScale int64) {
self.timeScale = timeScale
}
func (self *Stream) TimeScale() int64 {
return self.timeScale
}
func (self *Stream) SetMPEG4AudioConfig(config aacparser.MPEG4AudioConfig) {
self.mpeg4AudioConfig = config
}
func (self *Stream) tsToPesTs(ts int64) uint64 {
return uint64(ts)*PTS_HZ/uint64(self.timeScale) + PTS_HZ
}
func (self *Stream) tsToPCR(ts int64) uint64 {
return uint64(ts)*PCR_HZ/uint64(self.timeScale) + PCR_HZ
}
func (self *Stream) tsToTime(ts int64) float64 {
return float64(ts) / float64(self.timeScale)
}
func (self *Stream) WriteSample(pts int64, dts int64, isKeyFrame bool, data []byte) (err error) {
if false {
fmt.Println("WriteSample", self.Type, self.tsToTime(dts))
}
if self.Type == AAC {
if stream.Type() == av.AAC {
data := pkt.Data
if !aacparser.IsADTSFrame(data) { if !aacparser.IsADTSFrame(data) {
data = append(aacparser.MakeADTSHeader(self.mpeg4AudioConfig, 1024, len(data)), data...) data = append(aacparser.MakeADTSHeader(stream.AACCodecInfo.MPEG4AudioConfig, 1024, len(data)), data...)
}
if false {
fmt.Printf("WriteSample=%x\n", data[:5])
} }
buf := &bytes.Buffer{} buf := &bytes.Buffer{}
pes := PESHeader{ pes := PESHeader{
StreamId: self.streamId, StreamId: StreamIdAAC,
PTS: self.tsToPesTs(pts), PTS: timeToPesTs(stream.time),
} }
WritePESHeader(buf, pes, len(data)) WritePESHeader(buf, pes, len(data))
buf.Write(data) buf.Write(data)
self.tsw.RandomAccessIndicator = true stream.tsw.RandomAccessIndicator = true
self.tsw.PCR = self.tsToPCR(dts) stream.tsw.PCR = timeToPCR(stream.time)
if err = self.tsw.WriteTo(self.mux.W, buf.Bytes()); err != nil { if err = stream.tsw.WriteTo(self.W, buf.Bytes()); err != nil {
return return
} }
} else if self.Type == H264 {
stream.time += pkt.Duration
} else if stream.Type() == av.H264 {
buf := &bytes.Buffer{} buf := &bytes.Buffer{}
pes := PESHeader{ pes := PESHeader{
StreamId: self.streamId, StreamId: StreamIdH264,
PTS: self.tsToPesTs(pts), PTS: timeToPesTs(stream.time),
} }
if dts != pts { if pkt.CompositionTime > 0.0 {
pes.DTS = self.tsToPesTs(dts) pes.DTS = timeToPesTs(stream.time + pkt.CompositionTime)
} }
WritePESHeader(buf, pes, 0) WritePESHeader(buf, pes, 0)
nalus, _ := h264parser.SplitNALUs(data) nalus, _ := h264parser.SplitNALUs(pkt.Data)
if isKeyFrame { if pkt.IsKeyFrame {
nalus = append([][]byte{self.SPS, self.PPS}, nalus...) sps := stream.H264CodecInfo.Record.SPS[0]
pps := stream.H264CodecInfo.Record.PPS[0]
nalus = append([][]byte{sps, pps}, nalus...)
} }
h264parser.WalkNALUsAnnexb(nalus, func(b []byte) { h264parser.WalkNALUsAnnexb(nalus, func(b []byte) {
buf.Write(b) buf.Write(b)
}) })
self.tsw.RandomAccessIndicator = isKeyFrame stream.tsw.RandomAccessIndicator = pkt.IsKeyFrame
self.tsw.PCR = self.tsToPCR(dts) stream.tsw.PCR = timeToPCR(stream.time)
if err = self.tsw.WriteTo(self.mux.W, buf.Bytes()); err != nil { if err = stream.tsw.WriteTo(self.W, buf.Bytes()); err != nil {
return
}
}
return return
} }
/* about to remove */ stream.time += pkt.Duration
func (self *Stream) setPCR() {
self.tsw.PCR = uint64(self.PTS) * PCR_HZ / uint64(self.timeScale)
}
func (self *Stream) getPesHeader(dataLength int) (data []byte) {
if self.PTS == 0 {
self.PTS = self.timeScale
}
buf := &bytes.Buffer{}
pes := PESHeader{
StreamId: self.streamId,
PTS: uint64(self.PTS) * PTS_HZ / uint64(self.timeScale),
}
WritePESHeader(buf, pes, dataLength)
return buf.Bytes()
}
func (self *Stream) incPTS(delta int) {
self.PTS += int64(delta)
}
func (self *Stream) WriteH264NALU(sync bool, duration int, nalu []byte) (err error) {
nalus := [][]byte{}
if !self.spsHasWritten {
nalus = append(nalus, self.SPS)
nalus = append(nalus, self.PPS)
self.spsHasWritten = true
}
nalus = append(nalus, nalu)
data := &iovec{}
for i, nalu := range nalus {
var startCode []byte
if i == 0 {
startCode = []byte{0, 0, 0, 1, 0x9, 0xf0, 0, 0, 0, 1} // AUD
} else { } else {
startCode = []byte{0, 0, 1} err = fmt.Errorf("unknown stream type=%d", stream.Type())
}
data.Append(startCode)
data.Append(nalu)
}
data.Prepend(self.getPesHeader(0))
self.tsw.RandomAccessIndicator = sync
self.setPCR()
if err = self.tsw.WriteIovecTo(self.mux.W, data); err != nil {
return return
} }
self.incPTS(duration)
return
}
func (self *Stream) WriteADTSAACFrame(duration int, frame []byte) (err error) {
if self.dataBuf != nil && self.dataBuf.Len > self.cacheSize {
self.dataBuf.Prepend(self.getPesHeader(self.dataBuf.Len))
self.tsw.RandomAccessIndicator = true
self.setPCR()
if err = self.tsw.WriteIovecTo(self.mux.W, self.dataBuf); err != nil {
return
}
self.dataBuf = nil
}
if self.dataBuf == nil {
self.dataBuf = &iovec{}
}
self.dataBuf.Append(frame)
self.incPTS(duration)
return return
} }

View File

@ -2,26 +2,26 @@ package ts
import ( import (
"bytes" "bytes"
"github.com/nareix/codec/aacparser" "github.com/nareix/av"
) )
type Stream struct { type tsPacket struct {
SPS []byte av.Packet
PPS []byte time float64
}
Type int type Stream struct {
av.StreamCommon
time float64
pid uint pid uint
PTS int64
timeScale int64
mpeg4AudioConfig aacparser.MPEG4AudioConfig
buf bytes.Buffer buf bytes.Buffer
payload []byte payload []byte
peshdr *PESHeader peshdr *PESHeader
tshdr TSHeader tshdr TSHeader
spsHasWritten bool
payloadReady bool pkts []tsPacket
demuxer *Demuxer demuxer *Demuxer
mux *Muxer mux *Muxer
@ -31,7 +31,10 @@ type Stream struct {
cacheSize int cacheSize int
} }
const ( func timeToPesTs(time float64) uint64 {
H264 = 1 return uint64(time*PTS_HZ) + PTS_HZ
AAC = 2 }
)
func timeToPCR(time float64) uint64 {
return uint64(time*PCR_HZ) + PCR_HZ
}