diff --git a/av/pktque/buf.go b/av/pktque/buf.go index 70b890f..6624f55 100644 --- a/av/pktque/buf.go +++ b/av/pktque/buf.go @@ -5,33 +5,31 @@ import ( ) type Buf struct { - Head, Tail BufPos - pkts []av.Packet - size, maxsize int - count int + Head, Tail BufPos + pkts []av.Packet + Size int + Count int } func NewBuf() *Buf { return &Buf{ - pkts: make([]av.Packet, 64), - maxsize: 1024 * 512, + pkts: make([]av.Packet, 64), } } -func (self *Buf) SetMaxSize(size int) { - self.maxsize = size - self.shrink() -} - -func (self *Buf) shrink() { - for self.size > self.maxsize && self.count > 1 { - i := int(self.Head) & (len(self.pkts) - 1) - pkt := self.pkts[i] - self.pkts[i] = av.Packet{} - self.size -= len(pkt.Data) - self.Head++ - self.count-- +func (self *Buf) Pop() av.Packet { + if self.Count == 0 { + panic("pktque.Buf: Pop() when count == 0") } + + i := int(self.Head) & (len(self.pkts) - 1) + pkt := self.pkts[i] + self.pkts[i] = av.Packet{} + self.Size -= len(pkt.Data) + self.Head++ + self.Count-- + + return pkt } func (self *Buf) grow() { @@ -43,14 +41,13 @@ func (self *Buf) grow() { } func (self *Buf) Push(pkt av.Packet) { - if self.count == len(self.pkts) { + if self.Count == len(self.pkts) { self.grow() } self.pkts[int(self.Tail)&(len(self.pkts)-1)] = pkt self.Tail++ - self.count++ - self.size += len(pkt.Data) - self.shrink() + self.Count++ + self.Size += len(pkt.Data) } func (self *Buf) Get(pos BufPos) av.Packet { diff --git a/av/pubsub/queue.go b/av/pubsub/queue.go index 2d5a832..9c5c32c 100644 --- a/av/pubsub/queue.go +++ b/av/pubsub/queue.go @@ -1,4 +1,3 @@ - // Packege pubsub implements publisher-subscribers model used in multi-channel streaming. package pubsub @@ -6,45 +5,45 @@ import ( "github.com/nareix/joy4/av" "github.com/nareix/joy4/av/pktque" "io" - "time" "sync" + "time" ) // time // -----------------> -// +// // V-A-V-V-A-V-V-A-V-V // | | // 0 5 10 // head tail // oldest latest -// +// // One publisher and multiple subscribers thread-safe packet buffer queue. type Queue struct { - buf *pktque.Buf - head, tail int - lock *sync.RWMutex - cond *sync.Cond - maxdur time.Duration - streams []av.CodecData - videoidx int - closed bool + buf *pktque.Buf + head, tail int + lock *sync.RWMutex + cond *sync.Cond + curgopcount, maxgopcount int + streams []av.CodecData + videoidx int + closed bool } func NewQueue() *Queue { q := &Queue{} q.buf = pktque.NewBuf() - q.maxdur = time.Second*10 + q.maxgopcount = 2 q.lock = &sync.RWMutex{} q.cond = sync.NewCond(q.lock.RLocker()) q.videoidx = -1 return q } -func (self *Queue) SetMaxSize(size int) { +func (self *Queue) SetMaxGopCount(n int) { self.lock.Lock() - self.buf.SetMaxSize(size) + self.maxgopcount = n self.lock.Unlock() return } @@ -79,6 +78,21 @@ func (self *Queue) WritePacket(pkt av.Packet) (err error) { self.lock.Lock() self.buf.Push(pkt) + if pkt.Idx == int8(self.videoidx) && pkt.IsKeyFrame { + self.curgopcount++ + } + + for self.curgopcount >= self.maxgopcount && self.buf.Count > 1 { + pkt := self.buf.Pop() + if pkt.Idx == int8(self.videoidx) && pkt.IsKeyFrame { + self.curgopcount-- + } + if self.curgopcount < self.maxgopcount { + break + } + } + //println("shrink", self.curgopcount, self.maxgopcount, self.buf.Head, self.buf.Tail, "count", self.buf.Count, "size", self.buf.Size) + self.cond.Broadcast() self.lock.Unlock() @@ -86,10 +100,10 @@ func (self *Queue) WritePacket(pkt av.Packet) (err error) { } type QueueCursor struct { - que *Queue - pos pktque.BufPos + que *Queue + pos pktque.BufPos gotpos bool - init func(buf *pktque.Buf, videoidx int) pktque.BufPos + init func(buf *pktque.Buf, videoidx int) pktque.BufPos } func (self *Queue) newCursor() *QueueCursor { @@ -120,11 +134,11 @@ func (self *Queue) Oldest() *QueueCursor { func (self *Queue) DelayedTime(dur time.Duration) *QueueCursor { cursor := self.newCursor() cursor.init = func(buf *pktque.Buf, videoidx int) pktque.BufPos { - i := buf.Tail-1 + i := buf.Tail - 1 if buf.IsValidPos(i) { end := buf.Get(i) for buf.IsValidPos(i) { - if end.Time - buf.Get(i).Time > dur { + if end.Time-buf.Get(i).Time > dur { break } i-- @@ -139,7 +153,7 @@ func (self *Queue) DelayedTime(dur time.Duration) *QueueCursor { func (self *Queue) DelayedGopCount(n int) *QueueCursor { cursor := self.newCursor() cursor.init = func(buf *pktque.Buf, videoidx int) pktque.BufPos { - i := buf.Tail-1 + i := buf.Tail - 1 if videoidx != -1 { for gop := 0; buf.IsValidPos(i) && gop < n; i-- { pkt := buf.Get(i) @@ -195,4 +209,3 @@ func (self *QueueCursor) ReadPacket() (pkt av.Packet, err error) { self.que.cond.L.Unlock() return } -