ts: optimize muxer

This commit is contained in:
nareix 2016-07-10 15:09:18 +08:00
parent b0699b337b
commit a102eab6c1
4 changed files with 189 additions and 413 deletions

View File

@ -12,7 +12,7 @@ func Handler(h *avutil.RegisterHandler) {
return &Demuxer{R: r}
}
h.WriterMuxer = func(w io.Writer) av.Muxer {
return &Muxer{W: w}
return NewMuxer(w)
}
}

View File

@ -2,8 +2,10 @@ package ts
import (
"bytes"
"bufio"
"fmt"
"github.com/nareix/joy4/av"
"github.com/nareix/pio"
"github.com/nareix/joy4/codec/aacparser"
"github.com/nareix/joy4/codec/h264parser"
"io"
@ -14,37 +16,43 @@ type Muxer struct {
streams []*Stream
PaddingToMakeCounterCont bool
peshdr []byte
tshdr []byte
adtshdr []byte
datav [][]byte
nalus [][]byte
tswPAT *TSWriter
tswPMT *TSWriter
}
func (self *Muxer) isCodecSupported(codec av.CodecData) bool {
switch codec.Type() {
case av.H264, av.AAC:
return true
default:
return false
}
}
var supportedCodecTypes = []av.CodecType{av.H264, av.AAC}
func (self *Muxer) newStream(codec av.CodecData) (err error) {
if !self.isCodecSupported(codec) {
ok := false
for _, c := range supportedCodecTypes {
if codec.Type() == c {
ok = true
break
}
}
if !ok {
err = fmt.Errorf("codec type=%x is not supported", codec.Type())
return
}
pid := uint(len(self.streams) + 0x100)
stream := &Stream{
muxer: self,
CodecData: codec,
tsw: &TSWriter{
DiscontinuityIndicator: true,
PID: uint(len(self.streams) + 0x100),
},
pid: pid,
tsw: NewTSWriter(uint16(pid)),
}
self.streams = append(self.streams, stream)
return
}
/*
func (self *Muxer) writePaddingTSPackets(tsw *TSWriter) (err error) {
for tsw.ContinuityCounter&0xf != 0x0 {
header := TSHeader{
@ -58,8 +66,10 @@ func (self *Muxer) writePaddingTSPackets(tsw *TSWriter) (err error) {
}
return
}
*/
func (self *Muxer) WriteTrailer() (err error) {
/*
if self.PaddingToMakeCounterCont {
for _, stream := range self.streams {
if err = self.writePaddingTSPackets(stream.tsw); err != nil {
@ -67,9 +77,21 @@ func (self *Muxer) WriteTrailer() (err error) {
}
}
}
*/
return
}
func NewMuxer(w io.Writer) *Muxer {
return &Muxer{
W: bufio.NewWriterSize(w, pio.RecommendBufioSize),
peshdr: make([]byte, MaxPESHeaderLength),
tshdr: make([]byte, MaxTSHeaderLength),
adtshdr: make([]byte, aacparser.ADTSHeaderLength),
nalus: make([][]byte, 16),
datav: make([][]byte, 16),
}
}
func (self *Muxer) WritePATPMT() (err error) {
bufPAT := &bytes.Buffer{}
bufPMT := &bytes.Buffer{}
@ -85,9 +107,9 @@ func (self *Muxer) WritePATPMT() (err error) {
for _, stream := range self.streams {
switch stream.Type() {
case av.AAC:
elemStreams = append(elemStreams, ElementaryStreamInfo{StreamType: ElementaryStreamTypeAdtsAAC, ElementaryPID: stream.tsw.PID})
elemStreams = append(elemStreams, ElementaryStreamInfo{StreamType: ElementaryStreamTypeAdtsAAC, ElementaryPID: stream.pid})
case av.H264:
elemStreams = append(elemStreams, ElementaryStreamInfo{StreamType: ElementaryStreamTypeH264, ElementaryPID: stream.tsw.PID})
elemStreams = append(elemStreams, ElementaryStreamInfo{StreamType: ElementaryStreamTypeH264, ElementaryPID: stream.pid})
}
}
@ -97,18 +119,12 @@ func (self *Muxer) WritePATPMT() (err error) {
}
WritePMT(bufPMT, pmt)
self.tswPMT = &TSWriter{
PID: 0x1000,
DiscontinuityIndicator: true,
}
self.tswPAT = &TSWriter{
PID: 0,
DiscontinuityIndicator: true,
}
if err = self.tswPAT.WriteTo(self.W, bufPAT.Bytes()); err != nil {
self.tswPMT = NewTSWriter(0x1000)
self.tswPAT = NewTSWriter(0)
if err = self.tswPAT.WritePackets(self.W, [][]byte{bufPAT.Bytes()}, 0, false, true); err != nil {
return
}
if err = self.tswPMT.WriteTo(self.W, bufPMT.Bytes()); err != nil {
if err = self.tswPMT.WritePackets(self.W, [][]byte{bufPMT.Bytes()}, 0, false, true); err != nil {
return
}
@ -129,62 +145,171 @@ func (self *Muxer) WriteHeader(streams []av.CodecData) (err error) {
return
}
func (self *Muxer) WritePacket(pkt av.Packet) (err error) {
if false {
fmt.Println("ts:", "in", pkt.Idx, pkt.Time, "len", len(pkt.Data))
const MaxPESHeaderLength = 19
const MaxTSHeaderLength = 12
func FillPESHeader(h []byte, streamid uint8, datalength int, pts, dts uint64) (n int) {
h[0] = 0
h[1] = 0
h[2] = 1
h[3] = streamid
const PTS = 1 << 7
const DTS = 1 << 6
var pts_dts_flags uint8
if pts != 0 {
pts_dts_flags |= PTS
if dts != 0 {
pts_dts_flags |= DTS
}
if err = self.writePacket(pkt); err != nil {
return
}
if pts_dts_flags&PTS != 0 {
n += 5
}
if pts_dts_flags&DTS != 0 {
n += 5
}
var packet_length uint16
// packet_length(16) if zero then variable length
// Specifies the number of bytes remaining in the packet after this field. Can be zero.
// If the PES packet length is set to zero, the PES packet can be of any length.
// A value of zero for the PES packet length can be used only when the PES packet payload is a **video** elementary stream.
if datalength >= 0 {
packet_length = uint16(datalength + n + 3)
}
pio.PutU16BE(h[4:6], packet_length)
h[6] = 2<<6|1 // resverd(6,2)=2,original_or_copy(0,1)=1
h[7] = pts_dts_flags
h[8] = uint8(n)
// pts(40)?
// dts(40)?
if pts_dts_flags&PTS != 0 {
if pts_dts_flags&DTS != 0 {
pio.PutU40BE(h[9:14], PESTsToUInt(pts)|3<<36)
pio.PutU40BE(h[14:19], PESTsToUInt(dts)|1<<36)
} else {
pio.PutU40BE(h[9:14], PESTsToUInt(pts)|2<<36)
}
}
n += 9
return
}
func (self *Muxer) writePacket(pkt av.Packet) (err error) {
func NewTSWriter(pid uint16) *TSWriter {
w := &TSWriter{}
w.tshdr = make([]byte, 188)
w.tshdr[0] = 0x47
pio.PutU16BE(w.tshdr[1:3], pid&0x1fff)
for i := 6; i < 188; i++ {
w.tshdr[i] = 0xff
}
return w
}
func (self *TSWriter) WritePackets(w io.Writer, datav [][]byte, pcr uint64, sync bool, paddata bool) (err error) {
datavlen := pio.VecLen(datav)
writev := make([][]byte, len(datav))
writepos := 0
for writepos < datavlen {
self.tshdr[1] = self.tshdr[1]&0x1f
self.tshdr[3] = byte(self.ContinuityCounter)&0xf|0x30
self.tshdr[5] = 0 // flags
hdrlen := 6
self.ContinuityCounter++
if writepos == 0 {
self.tshdr[1] = 0x40|self.tshdr[1] // Payload Unit Start Indicator
if pcr != 0 {
hdrlen += 6
self.tshdr[5] = 0x10|self.tshdr[5] // PCR flag (Discontinuity indicator 0x80)
pio.PutU48BE(self.tshdr[6:12], PCRToUInt(pcr))
}
if sync {
self.tshdr[5] = 0x40|self.tshdr[5] // Random Access indicator
}
}
padtail := 0
end := writepos + 188 - hdrlen
if end > datavlen {
if paddata {
padtail = end - datavlen
} else {
hdrlen += end - datavlen
}
end = datavlen
}
n := pio.VecSliceNoNew(datav, writev, writepos, end)
self.tshdr[4] = byte(hdrlen)-5 // length
if _, err = w.Write(self.tshdr[:hdrlen]); err != nil {
return
}
for i := 0; i < n; i++ {
if _, err = w.Write(writev[i]); err != nil {
return
}
}
if padtail > 0 {
if _, err = w.Write(self.tshdr[188-padtail:188]); err != nil {
return
}
}
writepos = end
}
return
}
func (self *Muxer) WritePacket(pkt av.Packet) (err error) {
stream := self.streams[pkt.Idx]
switch stream.Type() {
case av.AAC:
codec := stream.CodecData.(aacparser.CodecData)
adtshdr := make([]byte, aacparser.ADTSHeaderLength)
aacparser.FillADTSHeader(adtshdr, codec.Config, 1024, len(pkt.Data))
buf := &bytes.Buffer{}
pes := PESHeader{
StreamId: StreamIdAAC,
PTS: timeToPesTs(pkt.Time),
}
WritePESHeader(buf, pes, len(pkt.Data)+len(adtshdr))
buf.Write(adtshdr)
buf.Write(pkt.Data)
n := FillPESHeader(self.peshdr, StreamIdAAC, len(self.adtshdr)+len(pkt.Data), timeToPesTs(pkt.Time), 0)
self.datav[0] = self.peshdr[:n]
aacparser.FillADTSHeader(self.adtshdr, codec.Config, 1024, len(pkt.Data))
self.datav[1] = self.adtshdr
self.datav[2] = pkt.Data
stream.tsw.RandomAccessIndicator = true
stream.tsw.PCR = timeToPCR(pkt.Time)
if err = stream.tsw.WriteTo(self.W, buf.Bytes()); err != nil {
if err = stream.tsw.WritePackets(self.W, self.datav[:3], timeToPCR(pkt.Time), true, false); err != nil {
return
}
case av.H264:
codec := stream.CodecData.(h264parser.CodecData)
buf := &bytes.Buffer{}
pes := PESHeader{
StreamId: StreamIdH264,
PTS: timeToPesTs(pkt.Time + pkt.CompositionTime),
DTS: timeToPesTs(pkt.Time),
}
WritePESHeader(buf, pes, 0)
nalus := [][]byte{}
nalus := self.nalus[:0]
if pkt.IsKeyFrame {
nalus = append([][]byte{codec.SPS(), codec.PPS()})
nalus = append(nalus, codec.SPS())
nalus = append(nalus, codec.PPS())
}
nalus = append(nalus, pkt.Data[4:])
pktnalus, _ := h264parser.SplitNALUs(pkt.Data)
for _, nalu := range pktnalus {
nalus = append(nalus, nalu)
}
datav := self.datav[:1]
h264parser.WalkNALUsAnnexb(nalus, func(b []byte) {
buf.Write(b)
datav = append(datav, b)
})
stream.tsw.RandomAccessIndicator = pkt.IsKeyFrame
stream.tsw.PCR = timeToPCR(pkt.Time)
if err = stream.tsw.WriteTo(self.W, buf.Bytes()); err != nil {
pts := timeToPesTs(pkt.Time+pkt.CompositionTime)
dts := timeToPesTs(pkt.Time)
n := FillPESHeader(self.peshdr, StreamIdH264, -1, pts, dts)
datav[0] = self.peshdr[:n]
if err = stream.tsw.WritePackets(self.W, datav, timeToPCR(pkt.Time), pkt.IsKeyFrame, false); err != nil {
return
}
}

View File

@ -9,7 +9,6 @@ import (
type Stream struct {
av.CodecData
pid uint
buf bytes.Buffer
peshdr *PESHeader
tshdr TSHeader
@ -17,13 +16,11 @@ type Stream struct {
demuxer *Demuxer
muxer *Muxer
pid uint
streamId uint
streamType uint
tsw *TSWriter
dataBuf *iovec
cacheSize int
idx int
}

View File

@ -22,221 +22,10 @@ func WriteUInt(w io.Writer, val uint, n int) (err error) {
return WriteUInt64(w, uint64(val), n)
}
func makeRepeatValBytes(val byte, n int) []byte {
b := make([]byte, n)
for i := range b {
b[i] = val
}
return b
}
func WriteRepeatVal(w io.Writer, val byte, n int) (err error) {
_, err = w.Write(makeRepeatValBytes(val, n))
return
}
func WriteTSHeader(w io.Writer, self TSHeader, dataLength int) (written int, err error) {
var flags, extFlags uint
// sync(8)
// transport_error_indicator(1)
// payload_unit_start_indicator(1)
// transport_priority(1)
// pid(13)
// Scrambling control(2)
// Adaptation field flag(1) 0x20
// Payload flag(1) 0x10
// Continuity counter(4)
flags = 0x47 << 24
flags |= 0x10
if self.PayloadUnitStart {
flags |= 0x400000
}
flags |= (self.PID & 0x1fff) << 8
flags |= self.ContinuityCounter & 0xf
if DebugWriter {
fmt.Fprintf(DebugOutput, "tsw: pid=%x\n", self.PID)
}
const PCR = 0x10
const OPCR = 0x08
const EXT = 0x20
if self.PCR != 0 {
extFlags |= PCR
}
if self.OPCR != 0 {
extFlags |= OPCR
}
if self.RandomAccessIndicator {
extFlags |= 0x40
}
if self.DiscontinuityIndicator {
extFlags |= 0x80
}
if extFlags != 0 {
flags |= EXT
}
// need padding
if dataLength < 184 {
flags |= EXT
}
if err = WriteUInt(w, flags, 4); err != nil {
return
}
written += 4
if flags&EXT != 0 {
var length uint
// Discontinuity indicator 1 0x80
// Random Access indicator 1 0x40
// Elementary stream priority indicator 1 0x20
// PCR flag 1 0x10
// OPCR flag 1 0x08
length = 1 // extFlags
if extFlags&PCR != 0 {
length += 6
}
if extFlags&OPCR != 0 {
length += 6
}
paddingLength := 0
// need padding
if int(length)+5+dataLength < 188 {
paddingLength = 188 - dataLength - 5 - int(length)
length = 188 - uint(dataLength) - 5
}
if DebugWriter {
fmt.Fprintf(DebugOutput, "tsw: header padding=%d\n", paddingLength)
}
if err = WriteUInt(w, length, 1); err != nil {
return
}
if err = WriteUInt(w, extFlags, 1); err != nil {
return
}
if extFlags&PCR != 0 {
if err = WriteUInt64(w, PCRToUInt(self.PCR), 6); err != nil {
return
}
}
if extFlags&OPCR != 0 {
if err = WriteUInt64(w, PCRToUInt(self.OPCR), 6); err != nil {
return
}
}
if paddingLength > 0 {
if err = WriteRepeatVal(w, 0xff, paddingLength); err != nil {
return
}
}
written += int(length) + 1
}
return
}
type TSWriter struct {
W io.Writer
PID uint
TSHeader
DisableHeaderPadding bool
DiscontinuityIndicator bool
vecw *vecWriter
}
func (self *TSWriter) EnableVecWriter() {
if self.vecw == nil {
self.vecw = newVecWriter(self.W)
if DebugWriter && self.vecw != nil {
fmt.Fprintln(DebugOutput, "tsw: enabled vec writer")
}
}
}
func (self *TSWriter) WriteIovec(data *iovec) (err error) {
if self.vecw != nil {
if err = self.WriteIovecTo(self.vecw, data); err != nil {
return
}
if err = self.vecw.Flush(); err != nil {
return
}
} else {
if err = self.WriteIovecTo(self.W, data); err != nil {
return
}
}
return
}
func (self *TSWriter) WriteIovecTo(w io.Writer, data *iovec) (err error) {
for i := 0; data.Len > 0; i++ {
header := TSHeader{
PID: self.PID,
ContinuityCounter: self.ContinuityCounter,
DiscontinuityIndicator: self.DiscontinuityIndicator,
}
if i == 0 {
header.PayloadUnitStart = true
header.RandomAccessIndicator = self.RandomAccessIndicator
header.PCR = self.PCR
header.OPCR = self.OPCR
}
requestLength := data.Len
if self.DisableHeaderPadding {
requestLength = 188
}
var headerLength int
if headerLength, err = WriteTSHeader(w, header, requestLength); err != nil {
return
}
payloadLength := 188 - headerLength
if self.DisableHeaderPadding && data.Len < payloadLength {
data.Append(makeRepeatValBytes(0xff, payloadLength-data.Len))
}
if DebugWriter {
fmt.Fprintf(DebugOutput, "tsw: payloadLength=%d dataLength=%d\n", payloadLength, data.Len)
}
if _, err = data.WriteTo(w, payloadLength); err != nil {
return
}
self.ContinuityCounter++
}
return
}
func (self *TSWriter) WriteTo(w io.Writer, data []byte) (err error) {
iov := &iovec{}
iov.Append(data)
return self.WriteIovecTo(w, iov)
}
func (self *TSWriter) Write(data []byte) (err error) {
iov := &iovec{}
iov.Append(data)
return self.WriteIovec(iov)
w io.Writer
ContinuityCounter uint
tshdr []byte
}
func WritePSI(w io.Writer, self PSI, data []byte) (err error) {
@ -312,108 +101,6 @@ func bswap32(v uint) uint {
return (v >> 24) | ((v>>16)&0xff)<<8 | ((v>>8)&0xff)<<16 | (v&0xff)<<24
}
func WritePESHeader(w io.Writer, self PESHeader, dataLength int) (err error) {
// http://dvd.sourceforge.net/dvdinfo/pes-hdr.html
var pts_dts_flags, header_length, packet_length uint
// start code(24) 000001
// StreamId(8)
// packet_length(16)
// resverd(6,2)=2,original_or_copy(0,1)=1
// pts_dts_flags(6,2)
// header_length(8)
// pts(40)?
// dts(40)?
// data
// start code(24) 000001
if err = WriteUInt(w, 0x000001, 3); err != nil {
return
}
// StreamId(8)
if err = WriteUInt(w, self.StreamId, 1); err != nil {
return
}
const PTS = 1 << 7
const DTS = 1 << 6
if self.PTS != 0 {
pts_dts_flags |= PTS
if self.DTS != 0 {
pts_dts_flags |= DTS
}
}
if pts_dts_flags&PTS != 0 {
header_length += 5
}
if pts_dts_flags&DTS != 0 {
header_length += 5
}
if dataLength > 0 {
packet_length = uint(dataLength) + header_length + 3
}
// packet_length(16) if zero then variable length
// Specifies the number of bytes remaining in the packet after this field. Can be zero.
// If the PES packet length is set to zero, the PES packet can be of any length.
// A value of zero for the PES packet length can be used only when the PES packet payload is a video elementary stream.
if err = WriteUInt(w, packet_length, 2); err != nil {
return
}
// resverd(6,2)=2,original_or_copy(0,1)=1
if err = WriteUInt(w, 2<<6|1, 1); err != nil {
return
}
// pts_dts_flags(6,2)
if err = WriteUInt(w, pts_dts_flags, 1); err != nil {
return
}
// header_length(8)
if err = WriteUInt(w, header_length, 1); err != nil {
return
}
// pts(40)?
// dts(40)?
if pts_dts_flags&PTS != 0 {
if pts_dts_flags&DTS != 0 {
if err = WriteUInt64(w, PESTsToUInt(self.PTS)|3<<36, 5); err != nil {
return
}
if err = WriteUInt64(w, PESTsToUInt(self.DTS)|1<<36, 5); err != nil {
return
}
} else {
if err = WriteUInt64(w, PESTsToUInt(self.PTS)|2<<36, 5); err != nil {
return
}
}
}
return
}
func WritePESPacket(w *TSWriter, header PESHeader, data []byte) (err error) {
bw := &bytes.Buffer{}
if err = WritePESHeader(bw, header, len(data)); err != nil {
return
}
iov := &iovec{}
iov.Append(bw.Bytes())
iov.Append(data)
if err = w.WriteIovec(iov); err != nil {
return
}
return
}
func WritePAT(w io.Writer, self PAT) (err error) {
bw := &bytes.Buffer{}
@ -442,22 +129,6 @@ func WritePAT(w io.Writer, self PAT) (err error) {
return
}
func WritePATPacket(w io.Writer, pat PAT) (err error) {
tsw := &TSWriter{
W: w,
PID: 0,
DisableHeaderPadding: true,
}
bw := &bytes.Buffer{}
if err = WritePAT(bw, pat); err != nil {
return
}
if err = tsw.Write(bw.Bytes()); err != nil {
return
}
return
}
func WritePMT(w io.Writer, self PMT) (err error) {
writeDescs := func(w io.Writer, descs []Descriptor) (err error) {
for _, desc := range descs {
@ -537,20 +208,3 @@ func WritePMT(w io.Writer, self PMT) (err error) {
return
}
func WritePMTPacket(w io.Writer, pmt PMT, pid uint) (err error) {
tsw := &TSWriter{
W: w,
PID: pid,
DisableHeaderPadding: true,
}
bw := &bytes.Buffer{}
if err = WritePMT(bw, pmt); err != nil {
return
}
if err = tsw.Write(bw.Bytes()); err != nil {
return
}
return
}