add vecWriter

This commit is contained in:
nareix 2015-12-09 18:02:33 +08:00
parent bc93d48788
commit 8cf764967d
3 changed files with 118 additions and 6 deletions

View File

@ -283,6 +283,7 @@ func main() {
W: file,
PID: 0x100,
}
w.EnableVecWriter()
}
for {

View File

@ -3,11 +3,18 @@ package ts
import (
"io"
"os"
"net"
"fmt"
"unsafe"
"syscall"
)
type iovec struct {
data [][]byte
Len int
pos int
idx int
}
func (self *iovec) Append(b []byte) {
@ -17,7 +24,7 @@ func (self *iovec) Append(b []byte) {
func (self *iovec) WriteTo(w io.Writer, n int) (written int, err error) {
for n > 0 && self.Len > 0 {
data := self.data[0]
data := self.data[self.idx]
var b []byte
if n > len(data) {
@ -28,9 +35,9 @@ func (self *iovec) WriteTo(w io.Writer, n int) (written int, err error) {
data = data[len(b):]
if len(data) == 0 {
self.data = self.data[1:]
self.idx++
} else {
self.data[0] = data
self.data[self.idx] = data
}
self.Len -= len(b)
n -= len(b)
@ -43,3 +50,83 @@ func (self *iovec) WriteTo(w io.Writer, n int) (written int, err error) {
return
}
type sysiovec struct {
Base uintptr
Len uint64
}
type vecWriter struct {
fd uintptr
smallBytesBuf []byte
iov []sysiovec
}
func (self *vecWriter) Write(p []byte) (written int, err error) {
iov := sysiovec{
Len: uint64(len(p)),
}
if len(p) < 16 {
iov.Base = uintptr(len(self.smallBytesBuf))
self.smallBytesBuf = append(self.smallBytesBuf, p...)
} else {
iov.Base = uintptr(unsafe.Pointer(&p[0]))
}
self.iov = append(self.iov, iov)
return
}
func (self *vecWriter) Flush() (err error) {
for i := range self.iov {
iov := &self.iov[i]
if iov.Base < uintptr(len(self.smallBytesBuf)) {
iov.Base = uintptr(unsafe.Pointer(&self.smallBytesBuf[iov.Base]))
}
}
N := 1024
for i := 0; i < len(self.iov); i += N {
n := len(self.iov) - i
if n > N {
n = N
}
_, _, errno := syscall.Syscall(syscall.SYS_WRITEV, self.fd, uintptr(unsafe.Pointer(&self.iov[i])), uintptr(n))
if errno != 0 {
err = fmt.Errorf("writev failed with error: %d", errno)
return
}
}
if DebugWriter {
fmt.Printf("vecw: smallBytesBuf=%d iovNr=%d\n", len(self.smallBytesBuf), len(self.iov))
}
self.iov = self.iov[:0]
self.smallBytesBuf = self.smallBytesBuf[:0]
return
}
func newVecWriter(w io.Writer) (vecw *vecWriter) {
var err error
var f *os.File
switch obj := w.(type) {
case *net.TCPConn:
f, err = obj.File()
if err != nil {
return
}
case *os.File:
f = obj
default:
return
}
vecw = &vecWriter{
fd: f.Fd(),
}
return
}

View File

@ -151,9 +151,26 @@ type TSWriter struct {
PID uint
TSHeader
DisableHeaderPadding bool
vecw *vecWriter
}
func (self *TSWriter) EnableVecWriter() {
if self.vecw == nil {
self.vecw = newVecWriter(self.W)
if DebugWriter && self.vecw != nil {
fmt.Println("tsw: enabled vec writer")
}
}
}
func (self *TSWriter) WriteIovec(data *iovec) (err error) {
w := self.W
if self.vecw != nil {
w = self.vecw
}
for i := 0; data.Len > 0; i++ {
header := TSHeader{
PID: self.PID,
@ -172,7 +189,7 @@ func (self *TSWriter) WriteIovec(data *iovec) (err error) {
requestLength = 188
}
var headerLength int
if headerLength, err = WriteTSHeader(self.W, header, requestLength); err != nil {
if headerLength, err = WriteTSHeader(w, header, requestLength); err != nil {
return
}
payloadLength := 188 - headerLength
@ -184,13 +201,19 @@ func (self *TSWriter) WriteIovec(data *iovec) (err error) {
fmt.Printf("tsw: payloadLength=%d dataLength=%d\n", payloadLength, data.Len)
}
if _, err = data.WriteTo(self.W, payloadLength); err != nil {
if _, err = data.WriteTo(w, payloadLength); err != nil {
return
}
self.ContinuityCounter++
}
if self.vecw != nil {
if err = self.vecw.Flush(); err != nil {
return
}
}
return
}
@ -232,7 +255,7 @@ func WritePSI(w io.Writer, self PSI, data []byte) (err error) {
}
if DebugWriter {
fmt.Printf("wpsi: length=%d\n", length)
fmt.Printf("psiw: length=%d\n", length)
}
// Table ID extension(16)
@ -546,6 +569,7 @@ func (self *SimpleH264Writer) prepare() (err error) {
W: self.W,
PID: 0x100,
}
self.tsw.EnableVecWriter()
self.pts = PTS_HZ
self.pcr = PCR_HZ