From 8cf764967d04e444422ca6b73a5555d21498a2a5 Mon Sep 17 00:00:00 2001 From: nareix Date: Wed, 9 Dec 2015 18:02:33 +0800 Subject: [PATCH] add vecWriter --- example/test.go | 1 + vecio.go | 93 +++++++++++++++++++++++++++++++++++++++++++++++-- writer.go | 30 ++++++++++++++-- 3 files changed, 118 insertions(+), 6 deletions(-) diff --git a/example/test.go b/example/test.go index a508053..36c0016 100644 --- a/example/test.go +++ b/example/test.go @@ -283,6 +283,7 @@ func main() { W: file, PID: 0x100, } + w.EnableVecWriter() } for { diff --git a/vecio.go b/vecio.go index 57bdd92..8256d03 100644 --- a/vecio.go +++ b/vecio.go @@ -3,11 +3,18 @@ package ts import ( "io" + "os" + "net" + "fmt" + "unsafe" + "syscall" ) type iovec struct { data [][]byte Len int + pos int + idx int } func (self *iovec) Append(b []byte) { @@ -17,7 +24,7 @@ func (self *iovec) Append(b []byte) { func (self *iovec) WriteTo(w io.Writer, n int) (written int, err error) { for n > 0 && self.Len > 0 { - data := self.data[0] + data := self.data[self.idx] var b []byte if n > len(data) { @@ -28,9 +35,9 @@ func (self *iovec) WriteTo(w io.Writer, n int) (written int, err error) { data = data[len(b):] if len(data) == 0 { - self.data = self.data[1:] + self.idx++ } else { - self.data[0] = data + self.data[self.idx] = data } self.Len -= len(b) n -= len(b) @@ -43,3 +50,83 @@ func (self *iovec) WriteTo(w io.Writer, n int) (written int, err error) { return } +type sysiovec struct { + Base uintptr + Len uint64 +} + +type vecWriter struct { + fd uintptr + smallBytesBuf []byte + iov []sysiovec +} + +func (self *vecWriter) Write(p []byte) (written int, err error) { + iov := sysiovec{ + Len: uint64(len(p)), + } + + if len(p) < 16 { + iov.Base = uintptr(len(self.smallBytesBuf)) + self.smallBytesBuf = append(self.smallBytesBuf, p...) + } else { + iov.Base = uintptr(unsafe.Pointer(&p[0])) + } + + self.iov = append(self.iov, iov) + return +} + +func (self *vecWriter) Flush() (err error) { + for i := range self.iov { + iov := &self.iov[i] + if iov.Base < uintptr(len(self.smallBytesBuf)) { + iov.Base = uintptr(unsafe.Pointer(&self.smallBytesBuf[iov.Base])) + } + } + + N := 1024 + for i := 0; i < len(self.iov); i += N { + n := len(self.iov) - i + if n > N { + n = N + } + _, _, errno := syscall.Syscall(syscall.SYS_WRITEV, self.fd, uintptr(unsafe.Pointer(&self.iov[i])), uintptr(n)) + if errno != 0 { + err = fmt.Errorf("writev failed with error: %d", errno) + return + } + } + + if DebugWriter { + fmt.Printf("vecw: smallBytesBuf=%d iovNr=%d\n", len(self.smallBytesBuf), len(self.iov)) + } + + self.iov = self.iov[:0] + self.smallBytesBuf = self.smallBytesBuf[:0] + + return +} + +func newVecWriter(w io.Writer) (vecw *vecWriter) { + var err error + var f *os.File + + switch obj := w.(type) { + case *net.TCPConn: + f, err = obj.File() + if err != nil { + return + } + case *os.File: + f = obj + default: + return + } + + vecw = &vecWriter{ + fd: f.Fd(), + } + return +} + diff --git a/writer.go b/writer.go index b5e9f04..dfe7d40 100644 --- a/writer.go +++ b/writer.go @@ -151,9 +151,26 @@ type TSWriter struct { PID uint TSHeader DisableHeaderPadding bool + + vecw *vecWriter +} + +func (self *TSWriter) EnableVecWriter() { + if self.vecw == nil { + self.vecw = newVecWriter(self.W) + + if DebugWriter && self.vecw != nil { + fmt.Println("tsw: enabled vec writer") + } + } } func (self *TSWriter) WriteIovec(data *iovec) (err error) { + w := self.W + if self.vecw != nil { + w = self.vecw + } + for i := 0; data.Len > 0; i++ { header := TSHeader{ PID: self.PID, @@ -172,7 +189,7 @@ func (self *TSWriter) WriteIovec(data *iovec) (err error) { requestLength = 188 } var headerLength int - if headerLength, err = WriteTSHeader(self.W, header, requestLength); err != nil { + if headerLength, err = WriteTSHeader(w, header, requestLength); err != nil { return } payloadLength := 188 - headerLength @@ -184,13 +201,19 @@ func (self *TSWriter) WriteIovec(data *iovec) (err error) { fmt.Printf("tsw: payloadLength=%d dataLength=%d\n", payloadLength, data.Len) } - if _, err = data.WriteTo(self.W, payloadLength); err != nil { + if _, err = data.WriteTo(w, payloadLength); err != nil { return } self.ContinuityCounter++ } + if self.vecw != nil { + if err = self.vecw.Flush(); err != nil { + return + } + } + return } @@ -232,7 +255,7 @@ func WritePSI(w io.Writer, self PSI, data []byte) (err error) { } if DebugWriter { - fmt.Printf("wpsi: length=%d\n", length) + fmt.Printf("psiw: length=%d\n", length) } // Table ID extension(16) @@ -546,6 +569,7 @@ func (self *SimpleH264Writer) prepare() (err error) { W: self.W, PID: 0x100, } + self.tsw.EnableVecWriter() self.pts = PTS_HZ self.pcr = PCR_HZ