pubsub: add pktque.Buf

This commit is contained in:
nareix 2016-08-26 11:17:54 +08:00
parent 1832092ce2
commit 3937f7e984
2 changed files with 108 additions and 54 deletions

73
av/pktque/buf.go Normal file
View File

@ -0,0 +1,73 @@
package pktque
import (
"github.com/nareix/joy4/av"
)
type Buf struct {
Head, Tail BufPos
pkts []av.Packet
size, maxsize int
}
func NewBuf() *Buf {
return &Buf{
pkts: make([]av.Packet, 64),
maxsize: 1024*512,
}
}
func (self *Buf) SetMaxSize(size int) {
self.maxsize = size
self.shrink()
}
func (self *Buf) shrink() {
for self.size > self.maxsize && self.Head-self.Tail > 1 {
pkt := self.pkts[int(self.Head)&(len(self.pkts)-1)]
self.size -= len(pkt.Data)
self.Head++
}
}
func (self *Buf) grow() {
newpkts := make([]av.Packet, len(self.pkts)*2)
for i := self.Head; i.LT(self.Tail); i++ {
newpkts[int(i)&(len(newpkts)-1)] = self.pkts[int(i)&(len(self.pkts)-1)]
}
self.pkts = newpkts
}
func (self *Buf) Push(pkt av.Packet) {
if int(self.Tail-self.Head) == len(self.pkts) {
self.grow()
}
self.pkts[int(self.Tail)&(len(self.pkts)-1)] = pkt
self.Tail++
self.size += len(pkt.Data)
self.shrink()
}
func (self *Buf) Get(pos BufPos) av.Packet {
return self.pkts[int(pos)&(len(self.pkts)-1)]
}
func (self *Buf) IsValidPos(pos BufPos) bool {
return pos.GE(self.Head) && pos.LT(self.Tail)
}
type BufPos int
func (self BufPos) LT(pos BufPos) bool {
return self - pos < 0
}
func (self BufPos) GE(pos BufPos) bool {
return self - pos >= 0
}
func (self BufPos) GT(pos BufPos) bool {
return self - pos > 0
}

View File

@ -4,6 +4,7 @@ package pubsub
import ( import (
"github.com/nareix/joy4/av" "github.com/nareix/joy4/av"
"github.com/nareix/joy4/av/pktque"
"io" "io"
"time" "time"
"sync" "sync"
@ -21,7 +22,7 @@ import (
// One publisher and multiple subscribers thread-safe packet buffer queue. // One publisher and multiple subscribers thread-safe packet buffer queue.
type Queue struct { type Queue struct {
pkts []av.Packet buf *pktque.Buf
head, tail int head, tail int
lock *sync.RWMutex lock *sync.RWMutex
cond *sync.Cond cond *sync.Cond
@ -33,6 +34,7 @@ type Queue struct {
func NewQueue(streams []av.CodecData) *Queue { func NewQueue(streams []av.CodecData) *Queue {
q := &Queue{} q := &Queue{}
q.buf = pktque.NewBuf()
q.streams = streams q.streams = streams
q.maxdur = time.Second*10 q.maxdur = time.Second*10
q.lock = &sync.RWMutex{} q.lock = &sync.RWMutex{}
@ -46,32 +48,13 @@ func NewQueue(streams []av.CodecData) *Queue {
return q return q
} }
// Set max buffered total packets duration. func (self *Queue) SetMaxSize(size int) {
func (self *Queue) SetMaxDuration(dur time.Duration) {
self.lock.Lock() self.lock.Lock()
self.buf.SetMaxSize(size)
self.maxdur = dur
for self.maxdur > 0 && len(self.pkts) >= 2 && self.pkts[len(self.pkts)-1].Time - self.pkts[0].Time > self.maxdur {
self.pkts = self.pkts[1:]
self.head++
}
self.lock.Unlock() self.lock.Unlock()
return return
} }
// Currently buffered packets total duration.
func (self *Queue) Duration() (dur time.Duration) {
self.lock.RLock()
if len(self.pkts) >= 2 {
dur = self.pkts[len(self.pkts)-1].Time - self.pkts[0].Time
}
self.lock.RUnlock()
return
}
// After Close() called, all QueueCursor's ReadPacket will return io.EOF. // After Close() called, all QueueCursor's ReadPacket will return io.EOF.
func (self *Queue) Close() (err error) { func (self *Queue) Close() (err error) {
self.lock.Lock() self.lock.Lock()
@ -87,12 +70,7 @@ func (self *Queue) Close() (err error) {
func (self *Queue) WritePacket(pkt av.Packet) (err error) { func (self *Queue) WritePacket(pkt av.Packet) (err error) {
self.lock.Lock() self.lock.Lock()
if self.maxdur > 0 && len(self.pkts) >= 2 && self.pkts[len(self.pkts)-1].Time - self.pkts[0].Time > self.maxdur { self.buf.Push(pkt)
self.pkts = self.pkts[1:]
self.head++
}
self.pkts = append(self.pkts, pkt)
self.tail++
self.cond.Broadcast() self.cond.Broadcast()
self.lock.Unlock() self.lock.Unlock()
@ -101,22 +79,22 @@ func (self *Queue) WritePacket(pkt av.Packet) (err error) {
type QueueCursor struct { type QueueCursor struct {
que *Queue que *Queue
pos int pos pktque.BufPos
init func(pkts []av.Packet, videoidx int) int gotpos bool
init func(buf *pktque.Buf, videoidx int) pktque.BufPos
} }
func (self *Queue) newCursor() *QueueCursor { func (self *Queue) newCursor() *QueueCursor {
return &QueueCursor{ return &QueueCursor{
que: self, que: self,
pos: -1,
} }
} }
// Create cursor position at latest packet. // Create cursor position at latest packet.
func (self *Queue) Latest() *QueueCursor { func (self *Queue) Latest() *QueueCursor {
cursor := self.newCursor() cursor := self.newCursor()
cursor.init = func(pkts []av.Packet, videoidx int) int { cursor.init = func(buf *pktque.Buf, videoidx int) pktque.BufPos {
return len(pkts) return buf.Tail
} }
return cursor return cursor
} }
@ -124,8 +102,8 @@ func (self *Queue) Latest() *QueueCursor {
// Create cursor position at oldest buffered packet. // Create cursor position at oldest buffered packet.
func (self *Queue) Oldest() *QueueCursor { func (self *Queue) Oldest() *QueueCursor {
cursor := self.newCursor() cursor := self.newCursor()
cursor.init = func(pkts []av.Packet, videoidx int) int { cursor.init = func(buf *pktque.Buf, videoidx int) pktque.BufPos {
return 0 return buf.Head
} }
return cursor return cursor
} }
@ -133,14 +111,15 @@ func (self *Queue) Oldest() *QueueCursor {
// Create cursor position at specific time in buffered packets. // Create cursor position at specific time in buffered packets.
func (self *Queue) DelayedTime(dur time.Duration) *QueueCursor { func (self *Queue) DelayedTime(dur time.Duration) *QueueCursor {
cursor := self.newCursor() cursor := self.newCursor()
cursor.init = func(pkts []av.Packet, videoidx int) int { cursor.init = func(buf *pktque.Buf, videoidx int) pktque.BufPos {
i := len(self.pkts)-1 i := buf.Tail-1
if i > 0 { if buf.IsValidPos(i) {
end := self.pkts[i] end := buf.Get(i)
for i--; i >= 0; i-- { for buf.IsValidPos(i) {
if end.Time - self.pkts[i].Time > dur { if end.Time - buf.Get(i).Time > dur {
break break
} }
i--
} }
} }
return i return i
@ -151,12 +130,12 @@ func (self *Queue) DelayedTime(dur time.Duration) *QueueCursor {
// Create cursor position at specific delayed GOP count in buffered packets. // Create cursor position at specific delayed GOP count in buffered packets.
func (self *Queue) DelayedGopCount(n int) *QueueCursor { func (self *Queue) DelayedGopCount(n int) *QueueCursor {
cursor := self.newCursor() cursor := self.newCursor()
cursor.init = func(pkts []av.Packet, videoidx int) int { cursor.init = func(buf *pktque.Buf, videoidx int) pktque.BufPos {
i := 0 i := buf.Tail-1
if videoidx != -1 { if videoidx != -1 {
i = len(self.pkts)-1 for gop := 0; buf.IsValidPos(i) && gop < n; i-- {
for gop := 0; i >= 0 && gop < n; i-- { pkt := buf.Get(i)
if self.pkts[i].Idx == int8(self.videoidx) && self.pkts[i].IsKeyFrame { if pkt.Idx == int8(self.videoidx) && pkt.IsKeyFrame {
gop++ gop++
} }
} }
@ -176,17 +155,19 @@ func (self *QueueCursor) Streams() (streams []av.CodecData, err error) {
// ReadPacket will not consume packets in Queue, it's just a cursor. // ReadPacket will not consume packets in Queue, it's just a cursor.
func (self *QueueCursor) ReadPacket() (pkt av.Packet, err error) { func (self *QueueCursor) ReadPacket() (pkt av.Packet, err error) {
self.que.cond.L.Lock() self.que.cond.L.Lock()
if self.pos == -1 { buf := self.que.buf
self.pos = self.init(self.que.pkts, self.que.videoidx)+self.que.head if !self.gotpos {
self.pos = self.init(buf, self.que.videoidx)
self.gotpos = true
} }
for { for {
if self.pos - self.que.head < 0 { if self.pos.LT(buf.Head) {
self.pos = self.que.head self.pos = buf.Head
} else if self.pos - self.que.tail > 0 { } else if self.pos.GT(buf.Tail) {
self.pos = self.que.tail self.pos = buf.Tail
} }
if self.pos - self.que.head >= 0 && self.pos - self.que.tail < 0 { if buf.IsValidPos(self.pos) {
pkt = self.que.pkts[self.pos - self.que.head] pkt = buf.Get(self.pos)
self.pos++ self.pos++
break break
} }