use zero copy IO when writing PES packets

This commit is contained in:
nareix 2015-12-09 11:48:55 +08:00
parent 910337cf39
commit 36825b6232
3 changed files with 175 additions and 137 deletions

View File

@ -8,9 +8,23 @@ import (
ts "../" ts "../"
"fmt" "fmt"
"encoding/hex" "encoding/hex"
"encoding/gob"
"flag" "flag"
) )
type GobAllSamples struct {
TimeScale int
SPS []byte
PPS []byte
Samples []GobSample
}
type GobSample struct {
Duration int
Data []byte
Sync bool
}
type Stream struct { type Stream struct {
PID uint PID uint
PESHeader *ts.PESHeader PESHeader *ts.PESHeader
@ -145,11 +159,39 @@ func readSamples(filename string, ch chan Sample) {
} }
} }
func testInputGob(pathGob string, pathOut string) {
gobfile, _ := os.Open(pathGob)
outfile, _ := os.Create(pathOut)
dec := gob.NewDecoder(gobfile)
var allSamples GobAllSamples
dec.Decode(&allSamples)
w := ts.SimpleH264Writer{
W: outfile,
SPS: allSamples.SPS,
PPS: allSamples.PPS,
TimeScale: allSamples.TimeScale,
}
for _, sample := range allSamples.Samples {
w.WriteNALU(sample.Sync, sample.Duration, sample.Data)
}
outfile.Close()
fmt.Println("written to", pathOut)
}
func main() { func main() {
input := flag.String("i", "", "input file") input := flag.String("i", "", "input file")
output := flag.String("o", "", "output file") output := flag.String("o", "", "output file")
inputGob := flag.String("g", "", "input gob file")
flag.Parse() flag.Parse()
if *inputGob != "" && *output != "" {
testInputGob(*inputGob, *output)
return
}
var file *os.File var file *os.File
var err error var err error
@ -163,43 +205,25 @@ func main() {
} }
writePAT := func() (err error) { writePAT := func() (err error) {
w := &ts.TSWriter{
W: file,
PID: 0,
DisableHeaderPadding: true,
}
pat := ts.PAT{ pat := ts.PAT{
Entries: []ts.PATEntry{ Entries: []ts.PATEntry{
{ProgramNumber: 1, ProgramMapPID: 0x1000}, {ProgramNumber: 1, ProgramMapPID: 0x1000},
}, },
} }
bw := &bytes.Buffer{} if err = ts.WritePATPacket(file, pat); err != nil {
if err = ts.WritePAT(bw, pat); err != nil {
return
}
if err = w.Write(bw.Bytes(), false); err != nil {
return return
} }
return return
} }
writePMT := func() (err error) { writePMT := func() (err error) {
w := &ts.TSWriter{
W: file,
PID: 0x1000,
DisableHeaderPadding: true,
}
pmt := ts.PMT{ pmt := ts.PMT{
PCRPID: 0x100, PCRPID: 0x100,
ElementaryStreamInfos: []ts.ElementaryStreamInfo{ ElementaryStreamInfos: []ts.ElementaryStreamInfo{
{StreamType: ts.ElementaryStreamTypeH264, ElementaryPID: 0x100}, {StreamType: ts.ElementaryStreamTypeH264, ElementaryPID: 0x100},
}, },
} }
bw := &bytes.Buffer{} if err = ts.WritePMTPacket(file, pmt, 0x1000); err != nil {
if err = ts.WritePMT(bw, pmt); err != nil {
return
}
if err = w.Write(bw.Bytes(), false); err != nil {
return return
} }
return return
@ -214,11 +238,8 @@ func main() {
DTS: sample.DTS, DTS: sample.DTS,
} }
w.PCR = sample.PCR w.PCR = sample.PCR
bw := &bytes.Buffer{} w.RandomAccessIndicator = sample.RandomAccessIndicator
if err = ts.WritePES(bw, pes, bytes.NewReader(sample.Data)); err != nil { if err = ts.WritePESPacket(w, pes, sample.Data); err != nil {
return
}
if err = w.Write(bw.Bytes(), sample.RandomAccessIndicator); err != nil {
return return
} }
return return

View File

@ -5,36 +5,41 @@ import (
"io" "io"
) )
func getSeekerLength(data io.Seeker) (length int64) { type iovec struct {
length, _ = data.Seek(0, 2) data [][]byte
data.Seek(0, 0) Len int
return
} }
type multiReadSeeker struct { func (self *iovec) Append(b []byte) {
readers []io.ReadSeeker self.data = append(self.data, b)
self.Len += len(b)
} }
func (mr *multiReadSeeker) Seek(offset int64, whence int) (n int64, err error) { func (self *iovec) WriteTo(w io.Writer, n int) (written int, err error) {
if whence == 2 { for n > 0 && self.Len > 0 {
for _, reader := range mr.readers { data := self.data[0]
n += getSeekerLength(reader)
var b []byte
if n > len(data) {
b = data
} else {
b = data[:n]
} }
}
return
}
func (mr *multiReadSeeker) Read(p []byte) (n int, err error) { data = data[len(b):]
for len(mr.readers) > 0 { if len(data) == 0 {
n, err = mr.readers[0].Read(p) self.data = self.data[1:]
if n > 0 || err != io.EOF { } else {
if err == io.EOF { self.data[0] = data
err = nil }
} self.Len -= len(b)
n -= len(b)
written += len(b)
if _, err = w.Write(b); err != nil {
return return
} }
mr.readers = mr.readers[1:]
} }
return 0, io.EOF return
} }

192
writer.go
View File

@ -7,7 +7,7 @@ import (
"bytes" "bytes"
) )
const DebugWriter = false const DebugWriter = true
func WriteUInt64(w io.Writer, val uint64, n int) (err error) { func WriteUInt64(w io.Writer, val uint64, n int) (err error) {
var b [8]byte var b [8]byte
@ -38,7 +38,7 @@ func WriteRepeatVal(w io.Writer, val byte, n int) (err error) {
return return
} }
func WriteTSHeader(w io.Writer, self TSHeader, dataLength int) (err error) { func WriteTSHeader(w io.Writer, self TSHeader, dataLength int) (written int, err error) {
var flags, extFlags uint var flags, extFlags uint
// sync(8) // sync(8)
@ -85,6 +85,7 @@ func WriteTSHeader(w io.Writer, self TSHeader, dataLength int) (err error) {
if err = WriteUInt(w, flags, 4); err != nil { if err = WriteUInt(w, flags, 4); err != nil {
return return
} }
written += 4
if flags & EXT != 0 { if flags & EXT != 0 {
var length uint var length uint
@ -111,7 +112,7 @@ func WriteTSHeader(w io.Writer, self TSHeader, dataLength int) (err error) {
} }
if DebugWriter { if DebugWriter {
fmt.Printf("tsw: dataLength=%d paddingLength=%d\n", dataLength, paddingLength) fmt.Printf("tsw: header padding=%d\n", paddingLength)
} }
if err = WriteUInt(w, length, 1); err != nil { if err = WriteUInt(w, length, 1); err != nil {
@ -138,6 +139,8 @@ func WriteTSHeader(w io.Writer, self TSHeader, dataLength int) (err error) {
return return
} }
} }
written += int(length)+1
} }
return return
@ -146,14 +149,12 @@ func WriteTSHeader(w io.Writer, self TSHeader, dataLength int) (err error) {
type TSWriter struct { type TSWriter struct {
W io.Writer W io.Writer
PID uint PID uint
PCR uint64 TSHeader
OPCR uint64
ContinuityCounter uint
DisableHeaderPadding bool DisableHeaderPadding bool
} }
func (self *TSWriter) Write(b []byte, RandomAccessIndicator bool) (err error) { func (self *TSWriter) WriteIovec(data *iovec) (err error) {
for i := 0; len(b) > 0; i++ { for i := 0; data.Len > 0; i++ {
header := TSHeader{ header := TSHeader{
PID: self.PID, PID: self.PID,
ContinuityCounter: self.ContinuityCounter, ContinuityCounter: self.ContinuityCounter,
@ -161,37 +162,29 @@ func (self *TSWriter) Write(b []byte, RandomAccessIndicator bool) (err error) {
if i == 0 { if i == 0 {
header.PayloadUnitStart = true header.PayloadUnitStart = true
header.RandomAccessIndicator = self.RandomAccessIndicator
header.PCR = self.PCR header.PCR = self.PCR
header.OPCR = self.OPCR header.OPCR = self.OPCR
header.RandomAccessIndicator = RandomAccessIndicator
} }
requestLength := len(b) requestLength := data.Len
if self.DisableHeaderPadding { if self.DisableHeaderPadding {
requestLength = 188 requestLength = 188
} }
var headerLength int
bw := &bytes.Buffer{} if headerLength, err = WriteTSHeader(self.W, header, requestLength); err != nil {
if err = WriteTSHeader(bw, header, requestLength); err != nil {
return return
} }
payloadLength := 188 - headerLength
dataLen := 188-bw.Len() if self.DisableHeaderPadding && data.Len < payloadLength {
if self.DisableHeaderPadding && len(b) < dataLen { data.Append(makeRepeatValBytes(0xff, payloadLength - data.Len))
b = append(b, makeRepeatValBytes(0xff, dataLen - len(b))...)
} }
data := b[:dataLen]
b = b[dataLen:]
if DebugWriter { if DebugWriter {
fmt.Printf("tsw: datalen=%d blen=%d\n", dataLen, len(b)) fmt.Printf("tsw: payloadLength=%d dataLength=%d\n", payloadLength, data.Len)
} }
if _, err = self.W.Write(bw.Bytes()); err != nil { if _, err = data.WriteTo(self.W, payloadLength); err != nil {
return
}
if _, err = self.W.Write(data); err != nil {
return return
} }
@ -201,6 +194,12 @@ func (self *TSWriter) Write(b []byte, RandomAccessIndicator bool) (err error) {
return return
} }
func (self *TSWriter) Write(data []byte) (err error) {
iov := &iovec{}
iov.Append(data)
return self.WriteIovec(iov)
}
func WritePSI(w io.Writer, self PSI, data []byte) (err error) { func WritePSI(w io.Writer, self PSI, data []byte) (err error) {
// pointer(8) // pointer(8)
// table_id(8) // table_id(8)
@ -274,7 +273,7 @@ func bswap32(v uint) uint {
return (v>>24)|((v>>16)&0xff)<<8|((v>>8)&0xff)<<16|(v&0xff)<<24 return (v>>24)|((v>>16)&0xff)<<8|((v>>8)&0xff)<<16|(v&0xff)<<24
} }
func WritePES(w io.Writer, self PESHeader, data io.ReadSeeker) (err error) { func WritePESHeader(w io.Writer, self PESHeader) (err error) {
// http://dvd.sourceforge.net/dvdinfo/pes-hdr.html // http://dvd.sourceforge.net/dvdinfo/pes-hdr.html
var pts_dts_flags, header_length uint var pts_dts_flags, header_length uint
@ -353,11 +352,20 @@ func WritePES(w io.Writer, self PESHeader, data io.ReadSeeker) (err error) {
} }
} }
// data return
if _, err = io.Copy(w, data); err != nil { }
func WritePESPacket(w *TSWriter, header PESHeader, data []byte) (err error) {
bw := &bytes.Buffer{}
if err = WritePESHeader(bw, header); err != nil {
return
}
iov := &iovec{}
iov.Append(bw.Bytes())
iov.Append(data)
if err = w.WriteIovec(iov); err != nil {
return return
} }
return return
} }
@ -389,6 +397,22 @@ func WritePAT(w io.Writer, self PAT) (err error) {
return return
} }
func WritePATPacket(w io.Writer, pat PAT) (err error) {
tsw := &TSWriter{
W: w,
PID: 0,
DisableHeaderPadding: true,
}
bw := &bytes.Buffer{}
if err = WritePAT(bw, pat); err != nil {
return
}
if err = tsw.Write(bw.Bytes()); err != nil {
return
}
return
}
func WritePMT(w io.Writer, self PMT) (err error) { func WritePMT(w io.Writer, self PMT) (err error) {
writeDescs := func(w io.Writer, descs []Descriptor) (err error) { writeDescs := func(w io.Writer, descs []Descriptor) (err error) {
for _, desc := range descs { for _, desc := range descs {
@ -468,6 +492,22 @@ func WritePMT(w io.Writer, self PMT) (err error) {
return return
} }
func WritePMTPacket(w io.Writer, pmt PMT, pid uint) (err error) {
tsw := &TSWriter{
W: w,
PID: pid,
DisableHeaderPadding: true,
}
bw := &bytes.Buffer{}
if err = WritePMT(bw, pmt); err != nil {
return
}
if err = tsw.Write(bw.Bytes()); err != nil {
return
}
return
}
type SimpleH264Writer struct { type SimpleH264Writer struct {
W io.Writer W io.Writer
TimeScale int TimeScale int
@ -479,57 +519,26 @@ type SimpleH264Writer struct {
pts uint64 pts uint64
pcr uint64 pcr uint64
prepared bool prepared bool
pesBuf *bytes.Buffer
} }
func (self *SimpleH264Writer) prepare() (err error) { func (self *SimpleH264Writer) prepare() (err error) {
writePAT := func() (err error) { pat := PAT{
w := &TSWriter{ Entries: []PATEntry{
W: self.W, {ProgramNumber: 1, ProgramMapPID: 0x1000},
PID: 0, },
DisableHeaderPadding: true, }
} if err = WritePATPacket(self.W, pat); err != nil {
pat := PAT{
Entries: []PATEntry{
{ProgramNumber: 1, ProgramMapPID: 0x1000},
},
}
bw := &bytes.Buffer{}
if err = WritePAT(bw, pat); err != nil {
return
}
if err = w.Write(bw.Bytes(), false); err != nil {
return
}
return return
} }
writePMT := func() (err error) { pmt := PMT{
w := &TSWriter{ PCRPID: 0x100,
W: self.W, ElementaryStreamInfos: []ElementaryStreamInfo{
PID: 0x1000, {StreamType: ElementaryStreamTypeH264, ElementaryPID: 0x100},
DisableHeaderPadding: true, },
}
pmt := PMT{
PCRPID: 0x100,
ElementaryStreamInfos: []ElementaryStreamInfo{
{StreamType: ElementaryStreamTypeH264, ElementaryPID: 0x100},
},
}
bw := &bytes.Buffer{}
if err = WritePMT(bw, pmt); err != nil {
return
}
if err = w.Write(bw.Bytes(), false); err != nil {
return
}
return
} }
if err = WritePMTPacket(self.W, pmt, 0x1000); err != nil {
if err = writePAT(); err != nil {
return
}
if err = writePMT(); err != nil {
return return
} }
@ -540,6 +549,8 @@ func (self *SimpleH264Writer) prepare() (err error) {
self.pts = PTS_HZ self.pts = PTS_HZ
self.pcr = PCR_HZ self.pcr = PCR_HZ
self.pesBuf = &bytes.Buffer{}
return return
} }
@ -554,10 +565,18 @@ func (self *SimpleH264Writer) WriteNALU(sync bool, duration int, nalu []byte) (e
nalus = append(nalus, self.SPS) nalus = append(nalus, self.SPS)
nalus = append(nalus, self.PPS) nalus = append(nalus, self.PPS)
} }
nalus = append(nalus, nalu) nalus = append(nalus, nalu)
readers := []io.ReadSeeker{} pes := PESHeader{
StreamId: StreamIdH264,
PTS: self.pts,
}
if err = WritePESHeader(self.pesBuf, pes); err != nil {
return
}
data := &iovec{}
data.Append(self.pesBuf.Bytes())
for i, nalu := range nalus { for i, nalu := range nalus {
var startCode []byte var startCode []byte
if i == 0 { if i == 0 {
@ -565,27 +584,20 @@ func (self *SimpleH264Writer) WriteNALU(sync bool, duration int, nalu []byte) (e
} else { } else {
startCode = []byte{0,0,1} startCode = []byte{0,0,1}
} }
readers = append(readers, bytes.NewReader(startCode)) data.Append(startCode)
readers = append(readers, bytes.NewReader(nalu)) data.Append(nalu)
} }
data := &multiReadSeeker{readers: readers}
pes := PESHeader{ self.tsw.RandomAccessIndicator = sync
StreamId: StreamIdH264,
PTS: self.pts,
}
self.tsw.PCR = self.pcr self.tsw.PCR = self.pcr
if err = self.tsw.WriteIovec(data); err != nil {
return
}
self.pts += uint64(duration)*PTS_HZ/uint64(self.TimeScale) self.pts += uint64(duration)*PTS_HZ/uint64(self.TimeScale)
self.pcr += uint64(duration)*PCR_HZ/uint64(self.TimeScale) self.pcr += uint64(duration)*PCR_HZ/uint64(self.TimeScale)
self.pesBuf.Reset()
bw := &bytes.Buffer{}
if err = WritePES(bw, pes, data); err != nil {
return
}
if err = self.tsw.Write(bw.Bytes(), sync); err != nil {
return
}
return return
} }