From 36825b62324faea6f61d5b9ab7b9ada88b00270a Mon Sep 17 00:00:00 2001 From: nareix Date: Wed, 9 Dec 2015 11:48:55 +0800 Subject: [PATCH] use zero copy IO when writing PES packets --- example/test.go | 71 +++++++++++------- vecio.go | 49 ++++++------ writer.go | 192 +++++++++++++++++++++++++----------------------- 3 files changed, 175 insertions(+), 137 deletions(-) diff --git a/example/test.go b/example/test.go index 5f93a95..dbcc347 100644 --- a/example/test.go +++ b/example/test.go @@ -8,9 +8,23 @@ import ( ts "../" "fmt" "encoding/hex" + "encoding/gob" "flag" ) +type GobAllSamples struct { + TimeScale int + SPS []byte + PPS []byte + Samples []GobSample +} + +type GobSample struct { + Duration int + Data []byte + Sync bool +} + type Stream struct { PID uint PESHeader *ts.PESHeader @@ -145,11 +159,39 @@ func readSamples(filename string, ch chan Sample) { } } +func testInputGob(pathGob string, pathOut string) { + gobfile, _ := os.Open(pathGob) + outfile, _ := os.Create(pathOut) + dec := gob.NewDecoder(gobfile) + var allSamples GobAllSamples + dec.Decode(&allSamples) + + w := ts.SimpleH264Writer{ + W: outfile, + SPS: allSamples.SPS, + PPS: allSamples.PPS, + TimeScale: allSamples.TimeScale, + } + + for _, sample := range allSamples.Samples { + w.WriteNALU(sample.Sync, sample.Duration, sample.Data) + } + + outfile.Close() + fmt.Println("written to", pathOut) +} + func main() { input := flag.String("i", "", "input file") output := flag.String("o", "", "output file") + inputGob := flag.String("g", "", "input gob file") flag.Parse() + if *inputGob != "" && *output != "" { + testInputGob(*inputGob, *output) + return + } + var file *os.File var err error @@ -163,43 +205,25 @@ func main() { } writePAT := func() (err error) { - w := &ts.TSWriter{ - W: file, - PID: 0, - DisableHeaderPadding: true, - } pat := ts.PAT{ Entries: []ts.PATEntry{ {ProgramNumber: 1, ProgramMapPID: 0x1000}, }, } - bw := &bytes.Buffer{} - if err = ts.WritePAT(bw, pat); err != nil { - return - } - if err = w.Write(bw.Bytes(), false); err != nil { + if err = ts.WritePATPacket(file, pat); err != nil { return } return } writePMT := func() (err error) { - w := &ts.TSWriter{ - W: file, - PID: 0x1000, - DisableHeaderPadding: true, - } pmt := ts.PMT{ PCRPID: 0x100, ElementaryStreamInfos: []ts.ElementaryStreamInfo{ {StreamType: ts.ElementaryStreamTypeH264, ElementaryPID: 0x100}, }, } - bw := &bytes.Buffer{} - if err = ts.WritePMT(bw, pmt); err != nil { - return - } - if err = w.Write(bw.Bytes(), false); err != nil { + if err = ts.WritePMTPacket(file, pmt, 0x1000); err != nil { return } return @@ -214,11 +238,8 @@ func main() { DTS: sample.DTS, } w.PCR = sample.PCR - bw := &bytes.Buffer{} - if err = ts.WritePES(bw, pes, bytes.NewReader(sample.Data)); err != nil { - return - } - if err = w.Write(bw.Bytes(), sample.RandomAccessIndicator); err != nil { + w.RandomAccessIndicator = sample.RandomAccessIndicator + if err = ts.WritePESPacket(w, pes, sample.Data); err != nil { return } return diff --git a/vecio.go b/vecio.go index fe5857b..57bdd92 100644 --- a/vecio.go +++ b/vecio.go @@ -5,36 +5,41 @@ import ( "io" ) -func getSeekerLength(data io.Seeker) (length int64) { - length, _ = data.Seek(0, 2) - data.Seek(0, 0) - return +type iovec struct { + data [][]byte + Len int } -type multiReadSeeker struct { - readers []io.ReadSeeker +func (self *iovec) Append(b []byte) { + self.data = append(self.data, b) + self.Len += len(b) } -func (mr *multiReadSeeker) Seek(offset int64, whence int) (n int64, err error) { - if whence == 2 { - for _, reader := range mr.readers { - n += getSeekerLength(reader) +func (self *iovec) WriteTo(w io.Writer, n int) (written int, err error) { + for n > 0 && self.Len > 0 { + data := self.data[0] + + var b []byte + if n > len(data) { + b = data + } else { + b = data[:n] } - } - return -} -func (mr *multiReadSeeker) Read(p []byte) (n int, err error) { - for len(mr.readers) > 0 { - n, err = mr.readers[0].Read(p) - if n > 0 || err != io.EOF { - if err == io.EOF { - err = nil - } + data = data[len(b):] + if len(data) == 0 { + self.data = self.data[1:] + } else { + self.data[0] = data + } + self.Len -= len(b) + n -= len(b) + written += len(b) + + if _, err = w.Write(b); err != nil { return } - mr.readers = mr.readers[1:] } - return 0, io.EOF + return } diff --git a/writer.go b/writer.go index 445f5b3..14b1be2 100644 --- a/writer.go +++ b/writer.go @@ -7,7 +7,7 @@ import ( "bytes" ) -const DebugWriter = false +const DebugWriter = true func WriteUInt64(w io.Writer, val uint64, n int) (err error) { var b [8]byte @@ -38,7 +38,7 @@ func WriteRepeatVal(w io.Writer, val byte, n int) (err error) { return } -func WriteTSHeader(w io.Writer, self TSHeader, dataLength int) (err error) { +func WriteTSHeader(w io.Writer, self TSHeader, dataLength int) (written int, err error) { var flags, extFlags uint // sync(8) @@ -85,6 +85,7 @@ func WriteTSHeader(w io.Writer, self TSHeader, dataLength int) (err error) { if err = WriteUInt(w, flags, 4); err != nil { return } + written += 4 if flags & EXT != 0 { var length uint @@ -111,7 +112,7 @@ func WriteTSHeader(w io.Writer, self TSHeader, dataLength int) (err error) { } if DebugWriter { - fmt.Printf("tsw: dataLength=%d paddingLength=%d\n", dataLength, paddingLength) + fmt.Printf("tsw: header padding=%d\n", paddingLength) } if err = WriteUInt(w, length, 1); err != nil { @@ -138,6 +139,8 @@ func WriteTSHeader(w io.Writer, self TSHeader, dataLength int) (err error) { return } } + + written += int(length)+1 } return @@ -146,14 +149,12 @@ func WriteTSHeader(w io.Writer, self TSHeader, dataLength int) (err error) { type TSWriter struct { W io.Writer PID uint - PCR uint64 - OPCR uint64 - ContinuityCounter uint + TSHeader DisableHeaderPadding bool } -func (self *TSWriter) Write(b []byte, RandomAccessIndicator bool) (err error) { - for i := 0; len(b) > 0; i++ { +func (self *TSWriter) WriteIovec(data *iovec) (err error) { + for i := 0; data.Len > 0; i++ { header := TSHeader{ PID: self.PID, ContinuityCounter: self.ContinuityCounter, @@ -161,37 +162,29 @@ func (self *TSWriter) Write(b []byte, RandomAccessIndicator bool) (err error) { if i == 0 { header.PayloadUnitStart = true + header.RandomAccessIndicator = self.RandomAccessIndicator header.PCR = self.PCR header.OPCR = self.OPCR - header.RandomAccessIndicator = RandomAccessIndicator } - requestLength := len(b) + requestLength := data.Len if self.DisableHeaderPadding { requestLength = 188 } - - bw := &bytes.Buffer{} - if err = WriteTSHeader(bw, header, requestLength); err != nil { + var headerLength int + if headerLength, err = WriteTSHeader(self.W, header, requestLength); err != nil { return } - - dataLen := 188-bw.Len() - if self.DisableHeaderPadding && len(b) < dataLen { - b = append(b, makeRepeatValBytes(0xff, dataLen - len(b))...) + payloadLength := 188 - headerLength + if self.DisableHeaderPadding && data.Len < payloadLength { + data.Append(makeRepeatValBytes(0xff, payloadLength - data.Len)) } - data := b[:dataLen] - b = b[dataLen:] - if DebugWriter { - fmt.Printf("tsw: datalen=%d blen=%d\n", dataLen, len(b)) + fmt.Printf("tsw: payloadLength=%d dataLength=%d\n", payloadLength, data.Len) } - if _, err = self.W.Write(bw.Bytes()); err != nil { - return - } - if _, err = self.W.Write(data); err != nil { + if _, err = data.WriteTo(self.W, payloadLength); err != nil { return } @@ -201,6 +194,12 @@ func (self *TSWriter) Write(b []byte, RandomAccessIndicator bool) (err error) { return } +func (self *TSWriter) Write(data []byte) (err error) { + iov := &iovec{} + iov.Append(data) + return self.WriteIovec(iov) +} + func WritePSI(w io.Writer, self PSI, data []byte) (err error) { // pointer(8) // table_id(8) @@ -274,7 +273,7 @@ func bswap32(v uint) uint { return (v>>24)|((v>>16)&0xff)<<8|((v>>8)&0xff)<<16|(v&0xff)<<24 } -func WritePES(w io.Writer, self PESHeader, data io.ReadSeeker) (err error) { +func WritePESHeader(w io.Writer, self PESHeader) (err error) { // http://dvd.sourceforge.net/dvdinfo/pes-hdr.html var pts_dts_flags, header_length uint @@ -353,11 +352,20 @@ func WritePES(w io.Writer, self PESHeader, data io.ReadSeeker) (err error) { } } - // data - if _, err = io.Copy(w, data); err != nil { + return +} + +func WritePESPacket(w *TSWriter, header PESHeader, data []byte) (err error) { + bw := &bytes.Buffer{} + if err = WritePESHeader(bw, header); err != nil { + return + } + iov := &iovec{} + iov.Append(bw.Bytes()) + iov.Append(data) + if err = w.WriteIovec(iov); err != nil { return } - return } @@ -389,6 +397,22 @@ func WritePAT(w io.Writer, self PAT) (err error) { return } +func WritePATPacket(w io.Writer, pat PAT) (err error) { + tsw := &TSWriter{ + W: w, + PID: 0, + DisableHeaderPadding: true, + } + bw := &bytes.Buffer{} + if err = WritePAT(bw, pat); err != nil { + return + } + if err = tsw.Write(bw.Bytes()); err != nil { + return + } + return +} + func WritePMT(w io.Writer, self PMT) (err error) { writeDescs := func(w io.Writer, descs []Descriptor) (err error) { for _, desc := range descs { @@ -468,6 +492,22 @@ func WritePMT(w io.Writer, self PMT) (err error) { return } +func WritePMTPacket(w io.Writer, pmt PMT, pid uint) (err error) { + tsw := &TSWriter{ + W: w, + PID: pid, + DisableHeaderPadding: true, + } + bw := &bytes.Buffer{} + if err = WritePMT(bw, pmt); err != nil { + return + } + if err = tsw.Write(bw.Bytes()); err != nil { + return + } + return +} + type SimpleH264Writer struct { W io.Writer TimeScale int @@ -479,57 +519,26 @@ type SimpleH264Writer struct { pts uint64 pcr uint64 prepared bool + pesBuf *bytes.Buffer } func (self *SimpleH264Writer) prepare() (err error) { - writePAT := func() (err error) { - w := &TSWriter{ - W: self.W, - PID: 0, - DisableHeaderPadding: true, - } - pat := PAT{ - Entries: []PATEntry{ - {ProgramNumber: 1, ProgramMapPID: 0x1000}, - }, - } - bw := &bytes.Buffer{} - if err = WritePAT(bw, pat); err != nil { - return - } - if err = w.Write(bw.Bytes(), false); err != nil { - return - } + pat := PAT{ + Entries: []PATEntry{ + {ProgramNumber: 1, ProgramMapPID: 0x1000}, + }, + } + if err = WritePATPacket(self.W, pat); err != nil { return } - writePMT := func() (err error) { - w := &TSWriter{ - W: self.W, - PID: 0x1000, - DisableHeaderPadding: true, - } - pmt := PMT{ - PCRPID: 0x100, - ElementaryStreamInfos: []ElementaryStreamInfo{ - {StreamType: ElementaryStreamTypeH264, ElementaryPID: 0x100}, - }, - } - bw := &bytes.Buffer{} - if err = WritePMT(bw, pmt); err != nil { - return - } - if err = w.Write(bw.Bytes(), false); err != nil { - return - } - return + pmt := PMT{ + PCRPID: 0x100, + ElementaryStreamInfos: []ElementaryStreamInfo{ + {StreamType: ElementaryStreamTypeH264, ElementaryPID: 0x100}, + }, } - - if err = writePAT(); err != nil { - return - } - - if err = writePMT(); err != nil { + if err = WritePMTPacket(self.W, pmt, 0x1000); err != nil { return } @@ -540,6 +549,8 @@ func (self *SimpleH264Writer) prepare() (err error) { self.pts = PTS_HZ self.pcr = PCR_HZ + self.pesBuf = &bytes.Buffer{} + return } @@ -554,10 +565,18 @@ func (self *SimpleH264Writer) WriteNALU(sync bool, duration int, nalu []byte) (e nalus = append(nalus, self.SPS) nalus = append(nalus, self.PPS) } - nalus = append(nalus, nalu) - readers := []io.ReadSeeker{} + pes := PESHeader{ + StreamId: StreamIdH264, + PTS: self.pts, + } + if err = WritePESHeader(self.pesBuf, pes); err != nil { + return + } + + data := &iovec{} + data.Append(self.pesBuf.Bytes()) for i, nalu := range nalus { var startCode []byte if i == 0 { @@ -565,27 +584,20 @@ func (self *SimpleH264Writer) WriteNALU(sync bool, duration int, nalu []byte) (e } else { startCode = []byte{0,0,1} } - readers = append(readers, bytes.NewReader(startCode)) - readers = append(readers, bytes.NewReader(nalu)) + data.Append(startCode) + data.Append(nalu) } - data := &multiReadSeeker{readers: readers} - pes := PESHeader{ - StreamId: StreamIdH264, - PTS: self.pts, - } + self.tsw.RandomAccessIndicator = sync self.tsw.PCR = self.pcr + if err = self.tsw.WriteIovec(data); err != nil { + return + } + self.pts += uint64(duration)*PTS_HZ/uint64(self.TimeScale) self.pcr += uint64(duration)*PCR_HZ/uint64(self.TimeScale) - - bw := &bytes.Buffer{} - if err = WritePES(bw, pes, data); err != nil { - return - } - if err = self.tsw.Write(bw.Bytes(), sync); err != nil { - return - } + self.pesBuf.Reset() return }