From a102eab6c1ef37d7ab150b554e8e227b27574ed7 Mon Sep 17 00:00:00 2001 From: nareix Date: Sun, 10 Jul 2016 15:09:18 +0800 Subject: [PATCH] ts: optimize muxer --- format/ts/handler.go | 2 +- format/ts/muxer.go | 241 ++++++++++++++++++++++------- format/ts/stream.go | 7 +- format/ts/writer.go | 352 +------------------------------------------ 4 files changed, 189 insertions(+), 413 deletions(-) diff --git a/format/ts/handler.go b/format/ts/handler.go index bdac48e..69e99a9 100644 --- a/format/ts/handler.go +++ b/format/ts/handler.go @@ -12,7 +12,7 @@ func Handler(h *avutil.RegisterHandler) { return &Demuxer{R: r} } h.WriterMuxer = func(w io.Writer) av.Muxer { - return &Muxer{W: w} + return NewMuxer(w) } } diff --git a/format/ts/muxer.go b/format/ts/muxer.go index 2bef01a..016ea75 100644 --- a/format/ts/muxer.go +++ b/format/ts/muxer.go @@ -2,8 +2,10 @@ package ts import ( "bytes" + "bufio" "fmt" "github.com/nareix/joy4/av" + "github.com/nareix/pio" "github.com/nareix/joy4/codec/aacparser" "github.com/nareix/joy4/codec/h264parser" "io" @@ -14,37 +16,43 @@ type Muxer struct { streams []*Stream PaddingToMakeCounterCont bool + peshdr []byte + tshdr []byte + adtshdr []byte + datav [][]byte + nalus [][]byte + tswPAT *TSWriter tswPMT *TSWriter } -func (self *Muxer) isCodecSupported(codec av.CodecData) bool { - switch codec.Type() { - case av.H264, av.AAC: - return true - default: - return false - } -} +var supportedCodecTypes = []av.CodecType{av.H264, av.AAC} func (self *Muxer) newStream(codec av.CodecData) (err error) { - if !self.isCodecSupported(codec) { + ok := false + for _, c := range supportedCodecTypes { + if codec.Type() == c { + ok = true + break + } + } + if !ok { err = fmt.Errorf("codec type=%x is not supported", codec.Type()) return } + pid := uint(len(self.streams) + 0x100) stream := &Stream{ muxer: self, CodecData: codec, - tsw: &TSWriter{ - DiscontinuityIndicator: true, - PID: uint(len(self.streams) + 0x100), - }, + pid: pid, + tsw: NewTSWriter(uint16(pid)), } self.streams = append(self.streams, stream) return } +/* func (self *Muxer) writePaddingTSPackets(tsw *TSWriter) (err error) { for tsw.ContinuityCounter&0xf != 0x0 { header := TSHeader{ @@ -58,8 +66,10 @@ func (self *Muxer) writePaddingTSPackets(tsw *TSWriter) (err error) { } return } +*/ func (self *Muxer) WriteTrailer() (err error) { + /* if self.PaddingToMakeCounterCont { for _, stream := range self.streams { if err = self.writePaddingTSPackets(stream.tsw); err != nil { @@ -67,9 +77,21 @@ func (self *Muxer) WriteTrailer() (err error) { } } } + */ return } +func NewMuxer(w io.Writer) *Muxer { + return &Muxer{ + W: bufio.NewWriterSize(w, pio.RecommendBufioSize), + peshdr: make([]byte, MaxPESHeaderLength), + tshdr: make([]byte, MaxTSHeaderLength), + adtshdr: make([]byte, aacparser.ADTSHeaderLength), + nalus: make([][]byte, 16), + datav: make([][]byte, 16), + } +} + func (self *Muxer) WritePATPMT() (err error) { bufPAT := &bytes.Buffer{} bufPMT := &bytes.Buffer{} @@ -85,9 +107,9 @@ func (self *Muxer) WritePATPMT() (err error) { for _, stream := range self.streams { switch stream.Type() { case av.AAC: - elemStreams = append(elemStreams, ElementaryStreamInfo{StreamType: ElementaryStreamTypeAdtsAAC, ElementaryPID: stream.tsw.PID}) + elemStreams = append(elemStreams, ElementaryStreamInfo{StreamType: ElementaryStreamTypeAdtsAAC, ElementaryPID: stream.pid}) case av.H264: - elemStreams = append(elemStreams, ElementaryStreamInfo{StreamType: ElementaryStreamTypeH264, ElementaryPID: stream.tsw.PID}) + elemStreams = append(elemStreams, ElementaryStreamInfo{StreamType: ElementaryStreamTypeH264, ElementaryPID: stream.pid}) } } @@ -97,18 +119,12 @@ func (self *Muxer) WritePATPMT() (err error) { } WritePMT(bufPMT, pmt) - self.tswPMT = &TSWriter{ - PID: 0x1000, - DiscontinuityIndicator: true, - } - self.tswPAT = &TSWriter{ - PID: 0, - DiscontinuityIndicator: true, - } - if err = self.tswPAT.WriteTo(self.W, bufPAT.Bytes()); err != nil { + self.tswPMT = NewTSWriter(0x1000) + self.tswPAT = NewTSWriter(0) + if err = self.tswPAT.WritePackets(self.W, [][]byte{bufPAT.Bytes()}, 0, false, true); err != nil { return } - if err = self.tswPMT.WriteTo(self.W, bufPMT.Bytes()); err != nil { + if err = self.tswPMT.WritePackets(self.W, [][]byte{bufPMT.Bytes()}, 0, false, true); err != nil { return } @@ -129,62 +145,171 @@ func (self *Muxer) WriteHeader(streams []av.CodecData) (err error) { return } -func (self *Muxer) WritePacket(pkt av.Packet) (err error) { - if false { - fmt.Println("ts:", "in", pkt.Idx, pkt.Time, "len", len(pkt.Data)) +const MaxPESHeaderLength = 19 +const MaxTSHeaderLength = 12 + +func FillPESHeader(h []byte, streamid uint8, datalength int, pts, dts uint64) (n int) { + h[0] = 0 + h[1] = 0 + h[2] = 1 + h[3] = streamid + + const PTS = 1 << 7 + const DTS = 1 << 6 + + var pts_dts_flags uint8 + if pts != 0 { + pts_dts_flags |= PTS + if dts != 0 { + pts_dts_flags |= DTS + } } - if err = self.writePacket(pkt); err != nil { - return + + if pts_dts_flags&PTS != 0 { + n += 5 } + if pts_dts_flags&DTS != 0 { + n += 5 + } + + var packet_length uint16 + // packet_length(16) if zero then variable length + // Specifies the number of bytes remaining in the packet after this field. Can be zero. + // If the PES packet length is set to zero, the PES packet can be of any length. + // A value of zero for the PES packet length can be used only when the PES packet payload is a **video** elementary stream. + if datalength >= 0 { + packet_length = uint16(datalength + n + 3) + } + pio.PutU16BE(h[4:6], packet_length) + + h[6] = 2<<6|1 // resverd(6,2)=2,original_or_copy(0,1)=1 + h[7] = pts_dts_flags + h[8] = uint8(n) + + // pts(40)? + // dts(40)? + if pts_dts_flags&PTS != 0 { + if pts_dts_flags&DTS != 0 { + pio.PutU40BE(h[9:14], PESTsToUInt(pts)|3<<36) + pio.PutU40BE(h[14:19], PESTsToUInt(dts)|1<<36) + } else { + pio.PutU40BE(h[9:14], PESTsToUInt(pts)|2<<36) + } + } + + n += 9 return } -func (self *Muxer) writePacket(pkt av.Packet) (err error) { +func NewTSWriter(pid uint16) *TSWriter { + w := &TSWriter{} + w.tshdr = make([]byte, 188) + w.tshdr[0] = 0x47 + pio.PutU16BE(w.tshdr[1:3], pid&0x1fff) + for i := 6; i < 188; i++ { + w.tshdr[i] = 0xff + } + return w +} + +func (self *TSWriter) WritePackets(w io.Writer, datav [][]byte, pcr uint64, sync bool, paddata bool) (err error) { + datavlen := pio.VecLen(datav) + writev := make([][]byte, len(datav)) + writepos := 0 + + for writepos < datavlen { + self.tshdr[1] = self.tshdr[1]&0x1f + self.tshdr[3] = byte(self.ContinuityCounter)&0xf|0x30 + self.tshdr[5] = 0 // flags + hdrlen := 6 + self.ContinuityCounter++ + + if writepos == 0 { + self.tshdr[1] = 0x40|self.tshdr[1] // Payload Unit Start Indicator + if pcr != 0 { + hdrlen += 6 + self.tshdr[5] = 0x10|self.tshdr[5] // PCR flag (Discontinuity indicator 0x80) + pio.PutU48BE(self.tshdr[6:12], PCRToUInt(pcr)) + } + if sync { + self.tshdr[5] = 0x40|self.tshdr[5] // Random Access indicator + } + } + + padtail := 0 + end := writepos + 188 - hdrlen + if end > datavlen { + if paddata { + padtail = end - datavlen + } else { + hdrlen += end - datavlen + } + end = datavlen + } + n := pio.VecSliceNoNew(datav, writev, writepos, end) + + self.tshdr[4] = byte(hdrlen)-5 // length + if _, err = w.Write(self.tshdr[:hdrlen]); err != nil { + return + } + for i := 0; i < n; i++ { + if _, err = w.Write(writev[i]); err != nil { + return + } + } + if padtail > 0 { + if _, err = w.Write(self.tshdr[188-padtail:188]); err != nil { + return + } + } + + writepos = end + } + + return +} + +func (self *Muxer) WritePacket(pkt av.Packet) (err error) { stream := self.streams[pkt.Idx] switch stream.Type() { case av.AAC: codec := stream.CodecData.(aacparser.CodecData) - adtshdr := make([]byte, aacparser.ADTSHeaderLength) - aacparser.FillADTSHeader(adtshdr, codec.Config, 1024, len(pkt.Data)) - buf := &bytes.Buffer{} - pes := PESHeader{ - StreamId: StreamIdAAC, - PTS: timeToPesTs(pkt.Time), - } - WritePESHeader(buf, pes, len(pkt.Data)+len(adtshdr)) - buf.Write(adtshdr) - buf.Write(pkt.Data) + n := FillPESHeader(self.peshdr, StreamIdAAC, len(self.adtshdr)+len(pkt.Data), timeToPesTs(pkt.Time), 0) + self.datav[0] = self.peshdr[:n] + aacparser.FillADTSHeader(self.adtshdr, codec.Config, 1024, len(pkt.Data)) + self.datav[1] = self.adtshdr + self.datav[2] = pkt.Data - stream.tsw.RandomAccessIndicator = true - stream.tsw.PCR = timeToPCR(pkt.Time) - if err = stream.tsw.WriteTo(self.W, buf.Bytes()); err != nil { + if err = stream.tsw.WritePackets(self.W, self.datav[:3], timeToPCR(pkt.Time), true, false); err != nil { return } case av.H264: codec := stream.CodecData.(h264parser.CodecData) - buf := &bytes.Buffer{} - pes := PESHeader{ - StreamId: StreamIdH264, - PTS: timeToPesTs(pkt.Time + pkt.CompositionTime), - DTS: timeToPesTs(pkt.Time), - } - WritePESHeader(buf, pes, 0) - nalus := [][]byte{} + nalus := self.nalus[:0] if pkt.IsKeyFrame { - nalus = append([][]byte{codec.SPS(), codec.PPS()}) + nalus = append(nalus, codec.SPS()) + nalus = append(nalus, codec.PPS()) } - nalus = append(nalus, pkt.Data[4:]) + pktnalus, _ := h264parser.SplitNALUs(pkt.Data) + for _, nalu := range pktnalus { + nalus = append(nalus, nalu) + } + + datav := self.datav[:1] h264parser.WalkNALUsAnnexb(nalus, func(b []byte) { - buf.Write(b) + datav = append(datav, b) }) - stream.tsw.RandomAccessIndicator = pkt.IsKeyFrame - stream.tsw.PCR = timeToPCR(pkt.Time) - if err = stream.tsw.WriteTo(self.W, buf.Bytes()); err != nil { + pts := timeToPesTs(pkt.Time+pkt.CompositionTime) + dts := timeToPesTs(pkt.Time) + n := FillPESHeader(self.peshdr, StreamIdH264, -1, pts, dts) + datav[0] = self.peshdr[:n] + + if err = stream.tsw.WritePackets(self.W, datav, timeToPCR(pkt.Time), pkt.IsKeyFrame, false); err != nil { return } } diff --git a/format/ts/stream.go b/format/ts/stream.go index 62f877f..bdc13fe 100644 --- a/format/ts/stream.go +++ b/format/ts/stream.go @@ -9,21 +9,18 @@ import ( type Stream struct { av.CodecData - pid uint - buf bytes.Buffer + buf bytes.Buffer peshdr *PESHeader tshdr TSHeader demuxer *Demuxer muxer *Muxer + pid uint streamId uint streamType uint tsw *TSWriter - dataBuf *iovec - cacheSize int - idx int } diff --git a/format/ts/writer.go b/format/ts/writer.go index 85938ff..479fd74 100644 --- a/format/ts/writer.go +++ b/format/ts/writer.go @@ -22,221 +22,10 @@ func WriteUInt(w io.Writer, val uint, n int) (err error) { return WriteUInt64(w, uint64(val), n) } -func makeRepeatValBytes(val byte, n int) []byte { - b := make([]byte, n) - for i := range b { - b[i] = val - } - return b -} - -func WriteRepeatVal(w io.Writer, val byte, n int) (err error) { - _, err = w.Write(makeRepeatValBytes(val, n)) - return -} - -func WriteTSHeader(w io.Writer, self TSHeader, dataLength int) (written int, err error) { - var flags, extFlags uint - - // sync(8) - // transport_error_indicator(1) - // payload_unit_start_indicator(1) - // transport_priority(1) - // pid(13) - // Scrambling control(2) - // Adaptation field flag(1) 0x20 - // Payload flag(1) 0x10 - // Continuity counter(4) - - flags = 0x47 << 24 - flags |= 0x10 - if self.PayloadUnitStart { - flags |= 0x400000 - } - flags |= (self.PID & 0x1fff) << 8 - flags |= self.ContinuityCounter & 0xf - - if DebugWriter { - fmt.Fprintf(DebugOutput, "tsw: pid=%x\n", self.PID) - } - - const PCR = 0x10 - const OPCR = 0x08 - const EXT = 0x20 - - if self.PCR != 0 { - extFlags |= PCR - } - if self.OPCR != 0 { - extFlags |= OPCR - } - if self.RandomAccessIndicator { - extFlags |= 0x40 - } - if self.DiscontinuityIndicator { - extFlags |= 0x80 - } - - if extFlags != 0 { - flags |= EXT - } - - // need padding - if dataLength < 184 { - flags |= EXT - } - - if err = WriteUInt(w, flags, 4); err != nil { - return - } - written += 4 - - if flags&EXT != 0 { - var length uint - - // Discontinuity indicator 1 0x80 - // Random Access indicator 1 0x40 - // Elementary stream priority indicator 1 0x20 - // PCR flag 1 0x10 - // OPCR flag 1 0x08 - - length = 1 // extFlags - if extFlags&PCR != 0 { - length += 6 - } - if extFlags&OPCR != 0 { - length += 6 - } - - paddingLength := 0 - // need padding - if int(length)+5+dataLength < 188 { - paddingLength = 188 - dataLength - 5 - int(length) - length = 188 - uint(dataLength) - 5 - } - - if DebugWriter { - fmt.Fprintf(DebugOutput, "tsw: header padding=%d\n", paddingLength) - } - - if err = WriteUInt(w, length, 1); err != nil { - return - } - if err = WriteUInt(w, extFlags, 1); err != nil { - return - } - - if extFlags&PCR != 0 { - if err = WriteUInt64(w, PCRToUInt(self.PCR), 6); err != nil { - return - } - } - - if extFlags&OPCR != 0 { - if err = WriteUInt64(w, PCRToUInt(self.OPCR), 6); err != nil { - return - } - } - - if paddingLength > 0 { - if err = WriteRepeatVal(w, 0xff, paddingLength); err != nil { - return - } - } - - written += int(length) + 1 - } - - return -} - type TSWriter struct { - W io.Writer - PID uint - TSHeader - DisableHeaderPadding bool - DiscontinuityIndicator bool - - vecw *vecWriter -} - -func (self *TSWriter) EnableVecWriter() { - if self.vecw == nil { - self.vecw = newVecWriter(self.W) - - if DebugWriter && self.vecw != nil { - fmt.Fprintln(DebugOutput, "tsw: enabled vec writer") - } - } -} - -func (self *TSWriter) WriteIovec(data *iovec) (err error) { - if self.vecw != nil { - if err = self.WriteIovecTo(self.vecw, data); err != nil { - return - } - if err = self.vecw.Flush(); err != nil { - return - } - } else { - if err = self.WriteIovecTo(self.W, data); err != nil { - return - } - } - return -} - -func (self *TSWriter) WriteIovecTo(w io.Writer, data *iovec) (err error) { - for i := 0; data.Len > 0; i++ { - header := TSHeader{ - PID: self.PID, - ContinuityCounter: self.ContinuityCounter, - DiscontinuityIndicator: self.DiscontinuityIndicator, - } - - if i == 0 { - header.PayloadUnitStart = true - header.RandomAccessIndicator = self.RandomAccessIndicator - header.PCR = self.PCR - header.OPCR = self.OPCR - } - - requestLength := data.Len - if self.DisableHeaderPadding { - requestLength = 188 - } - var headerLength int - if headerLength, err = WriteTSHeader(w, header, requestLength); err != nil { - return - } - payloadLength := 188 - headerLength - if self.DisableHeaderPadding && data.Len < payloadLength { - data.Append(makeRepeatValBytes(0xff, payloadLength-data.Len)) - } - - if DebugWriter { - fmt.Fprintf(DebugOutput, "tsw: payloadLength=%d dataLength=%d\n", payloadLength, data.Len) - } - - if _, err = data.WriteTo(w, payloadLength); err != nil { - return - } - - self.ContinuityCounter++ - } - return -} - -func (self *TSWriter) WriteTo(w io.Writer, data []byte) (err error) { - iov := &iovec{} - iov.Append(data) - return self.WriteIovecTo(w, iov) -} - -func (self *TSWriter) Write(data []byte) (err error) { - iov := &iovec{} - iov.Append(data) - return self.WriteIovec(iov) + w io.Writer + ContinuityCounter uint + tshdr []byte } func WritePSI(w io.Writer, self PSI, data []byte) (err error) { @@ -312,108 +101,6 @@ func bswap32(v uint) uint { return (v >> 24) | ((v>>16)&0xff)<<8 | ((v>>8)&0xff)<<16 | (v&0xff)<<24 } -func WritePESHeader(w io.Writer, self PESHeader, dataLength int) (err error) { - // http://dvd.sourceforge.net/dvdinfo/pes-hdr.html - - var pts_dts_flags, header_length, packet_length uint - - // start code(24) 000001 - // StreamId(8) - // packet_length(16) - // resverd(6,2)=2,original_or_copy(0,1)=1 - // pts_dts_flags(6,2) - // header_length(8) - // pts(40)? - // dts(40)? - // data - - // start code(24) 000001 - if err = WriteUInt(w, 0x000001, 3); err != nil { - return - } - - // StreamId(8) - if err = WriteUInt(w, self.StreamId, 1); err != nil { - return - } - - const PTS = 1 << 7 - const DTS = 1 << 6 - - if self.PTS != 0 { - pts_dts_flags |= PTS - if self.DTS != 0 { - pts_dts_flags |= DTS - } - } - - if pts_dts_flags&PTS != 0 { - header_length += 5 - } - if pts_dts_flags&DTS != 0 { - header_length += 5 - } - - if dataLength > 0 { - packet_length = uint(dataLength) + header_length + 3 - } - // packet_length(16) if zero then variable length - // Specifies the number of bytes remaining in the packet after this field. Can be zero. - // If the PES packet length is set to zero, the PES packet can be of any length. - // A value of zero for the PES packet length can be used only when the PES packet payload is a video elementary stream. - if err = WriteUInt(w, packet_length, 2); err != nil { - return - } - - // resverd(6,2)=2,original_or_copy(0,1)=1 - if err = WriteUInt(w, 2<<6|1, 1); err != nil { - return - } - - // pts_dts_flags(6,2) - if err = WriteUInt(w, pts_dts_flags, 1); err != nil { - return - } - - // header_length(8) - if err = WriteUInt(w, header_length, 1); err != nil { - return - } - - // pts(40)? - // dts(40)? - if pts_dts_flags&PTS != 0 { - if pts_dts_flags&DTS != 0 { - if err = WriteUInt64(w, PESTsToUInt(self.PTS)|3<<36, 5); err != nil { - return - } - if err = WriteUInt64(w, PESTsToUInt(self.DTS)|1<<36, 5); err != nil { - return - } - } else { - if err = WriteUInt64(w, PESTsToUInt(self.PTS)|2<<36, 5); err != nil { - return - } - } - } - - return -} - -func WritePESPacket(w *TSWriter, header PESHeader, data []byte) (err error) { - bw := &bytes.Buffer{} - if err = WritePESHeader(bw, header, len(data)); err != nil { - return - } - iov := &iovec{} - iov.Append(bw.Bytes()) - iov.Append(data) - if err = w.WriteIovec(iov); err != nil { - return - } - return -} - func WritePAT(w io.Writer, self PAT) (err error) { bw := &bytes.Buffer{} @@ -442,22 +129,6 @@ func WritePAT(w io.Writer, self PAT) (err error) { return } -func WritePATPacket(w io.Writer, pat PAT) (err error) { - tsw := &TSWriter{ - W: w, - PID: 0, - DisableHeaderPadding: true, - } - bw := &bytes.Buffer{} - if err = WritePAT(bw, pat); err != nil { - return - } - if err = tsw.Write(bw.Bytes()); err != nil { - return - } - return -} - func WritePMT(w io.Writer, self PMT) (err error) { writeDescs := func(w io.Writer, descs []Descriptor) (err error) { for _, desc := range descs { @@ -537,20 +208,3 @@ func WritePMT(w io.Writer, self PMT) (err error) { return } -func WritePMTPacket(w io.Writer, pmt PMT, pid uint) (err error) { - tsw := &TSWriter{ - W: w, - PID: pid, - DisableHeaderPadding: true, - } - bw := &bytes.Buffer{} - if err = WritePMT(bw, pmt); err != nil { - return - } - if err = tsw.Write(bw.Bytes()); err != nil { - return - } - return -} - -