pktque: change SetMaxSize to SetMaxGopCount

This commit is contained in:
nareix 2016-09-19 07:26:58 +08:00
parent 8802fb5a95
commit a76cfa1413
2 changed files with 55 additions and 45 deletions

View File

@ -5,33 +5,31 @@ import (
) )
type Buf struct { type Buf struct {
Head, Tail BufPos Head, Tail BufPos
pkts []av.Packet pkts []av.Packet
size, maxsize int Size int
count int Count int
} }
func NewBuf() *Buf { func NewBuf() *Buf {
return &Buf{ return &Buf{
pkts: make([]av.Packet, 64), pkts: make([]av.Packet, 64),
maxsize: 1024 * 512,
} }
} }
func (self *Buf) SetMaxSize(size int) { func (self *Buf) Pop() av.Packet {
self.maxsize = size if self.Count == 0 {
self.shrink() panic("pktque.Buf: Pop() when count == 0")
}
func (self *Buf) shrink() {
for self.size > self.maxsize && self.count > 1 {
i := int(self.Head) & (len(self.pkts) - 1)
pkt := self.pkts[i]
self.pkts[i] = av.Packet{}
self.size -= len(pkt.Data)
self.Head++
self.count--
} }
i := int(self.Head) & (len(self.pkts) - 1)
pkt := self.pkts[i]
self.pkts[i] = av.Packet{}
self.Size -= len(pkt.Data)
self.Head++
self.Count--
return pkt
} }
func (self *Buf) grow() { func (self *Buf) grow() {
@ -43,14 +41,13 @@ func (self *Buf) grow() {
} }
func (self *Buf) Push(pkt av.Packet) { func (self *Buf) Push(pkt av.Packet) {
if self.count == len(self.pkts) { if self.Count == len(self.pkts) {
self.grow() self.grow()
} }
self.pkts[int(self.Tail)&(len(self.pkts)-1)] = pkt self.pkts[int(self.Tail)&(len(self.pkts)-1)] = pkt
self.Tail++ self.Tail++
self.count++ self.Count++
self.size += len(pkt.Data) self.Size += len(pkt.Data)
self.shrink()
} }
func (self *Buf) Get(pos BufPos) av.Packet { func (self *Buf) Get(pos BufPos) av.Packet {

View File

@ -1,4 +1,3 @@
// Packege pubsub implements publisher-subscribers model used in multi-channel streaming. // Packege pubsub implements publisher-subscribers model used in multi-channel streaming.
package pubsub package pubsub
@ -6,8 +5,8 @@ import (
"github.com/nareix/joy4/av" "github.com/nareix/joy4/av"
"github.com/nareix/joy4/av/pktque" "github.com/nareix/joy4/av/pktque"
"io" "io"
"time"
"sync" "sync"
"time"
) )
// time // time
@ -22,29 +21,29 @@ import (
// One publisher and multiple subscribers thread-safe packet buffer queue. // One publisher and multiple subscribers thread-safe packet buffer queue.
type Queue struct { type Queue struct {
buf *pktque.Buf buf *pktque.Buf
head, tail int head, tail int
lock *sync.RWMutex lock *sync.RWMutex
cond *sync.Cond cond *sync.Cond
maxdur time.Duration curgopcount, maxgopcount int
streams []av.CodecData streams []av.CodecData
videoidx int videoidx int
closed bool closed bool
} }
func NewQueue() *Queue { func NewQueue() *Queue {
q := &Queue{} q := &Queue{}
q.buf = pktque.NewBuf() q.buf = pktque.NewBuf()
q.maxdur = time.Second*10 q.maxgopcount = 2
q.lock = &sync.RWMutex{} q.lock = &sync.RWMutex{}
q.cond = sync.NewCond(q.lock.RLocker()) q.cond = sync.NewCond(q.lock.RLocker())
q.videoidx = -1 q.videoidx = -1
return q return q
} }
func (self *Queue) SetMaxSize(size int) { func (self *Queue) SetMaxGopCount(n int) {
self.lock.Lock() self.lock.Lock()
self.buf.SetMaxSize(size) self.maxgopcount = n
self.lock.Unlock() self.lock.Unlock()
return return
} }
@ -79,6 +78,21 @@ func (self *Queue) WritePacket(pkt av.Packet) (err error) {
self.lock.Lock() self.lock.Lock()
self.buf.Push(pkt) self.buf.Push(pkt)
if pkt.Idx == int8(self.videoidx) && pkt.IsKeyFrame {
self.curgopcount++
}
for self.curgopcount >= self.maxgopcount && self.buf.Count > 1 {
pkt := self.buf.Pop()
if pkt.Idx == int8(self.videoidx) && pkt.IsKeyFrame {
self.curgopcount--
}
if self.curgopcount < self.maxgopcount {
break
}
}
//println("shrink", self.curgopcount, self.maxgopcount, self.buf.Head, self.buf.Tail, "count", self.buf.Count, "size", self.buf.Size)
self.cond.Broadcast() self.cond.Broadcast()
self.lock.Unlock() self.lock.Unlock()
@ -86,10 +100,10 @@ func (self *Queue) WritePacket(pkt av.Packet) (err error) {
} }
type QueueCursor struct { type QueueCursor struct {
que *Queue que *Queue
pos pktque.BufPos pos pktque.BufPos
gotpos bool gotpos bool
init func(buf *pktque.Buf, videoidx int) pktque.BufPos init func(buf *pktque.Buf, videoidx int) pktque.BufPos
} }
func (self *Queue) newCursor() *QueueCursor { func (self *Queue) newCursor() *QueueCursor {
@ -120,11 +134,11 @@ func (self *Queue) Oldest() *QueueCursor {
func (self *Queue) DelayedTime(dur time.Duration) *QueueCursor { func (self *Queue) DelayedTime(dur time.Duration) *QueueCursor {
cursor := self.newCursor() cursor := self.newCursor()
cursor.init = func(buf *pktque.Buf, videoidx int) pktque.BufPos { cursor.init = func(buf *pktque.Buf, videoidx int) pktque.BufPos {
i := buf.Tail-1 i := buf.Tail - 1
if buf.IsValidPos(i) { if buf.IsValidPos(i) {
end := buf.Get(i) end := buf.Get(i)
for buf.IsValidPos(i) { for buf.IsValidPos(i) {
if end.Time - buf.Get(i).Time > dur { if end.Time-buf.Get(i).Time > dur {
break break
} }
i-- i--
@ -139,7 +153,7 @@ func (self *Queue) DelayedTime(dur time.Duration) *QueueCursor {
func (self *Queue) DelayedGopCount(n int) *QueueCursor { func (self *Queue) DelayedGopCount(n int) *QueueCursor {
cursor := self.newCursor() cursor := self.newCursor()
cursor.init = func(buf *pktque.Buf, videoidx int) pktque.BufPos { cursor.init = func(buf *pktque.Buf, videoidx int) pktque.BufPos {
i := buf.Tail-1 i := buf.Tail - 1
if videoidx != -1 { if videoidx != -1 {
for gop := 0; buf.IsValidPos(i) && gop < n; i-- { for gop := 0; buf.IsValidPos(i) && gop < n; i-- {
pkt := buf.Get(i) pkt := buf.Get(i)
@ -195,4 +209,3 @@ func (self *QueueCursor) ReadPacket() (pkt av.Packet, err error) {
self.que.cond.L.Unlock() self.que.cond.L.Unlock()
return return
} }