diff --git a/av/pktque/buf.go b/av/pktque/buf.go new file mode 100644 index 0000000..8be5f9a --- /dev/null +++ b/av/pktque/buf.go @@ -0,0 +1,73 @@ + +package pktque + +import ( + "github.com/nareix/joy4/av" +) + +type Buf struct { + Head, Tail BufPos + pkts []av.Packet + size, maxsize int +} + +func NewBuf() *Buf { + return &Buf{ + pkts: make([]av.Packet, 64), + maxsize: 1024*512, + } +} + +func (self *Buf) SetMaxSize(size int) { + self.maxsize = size + self.shrink() +} + +func (self *Buf) shrink() { + for self.size > self.maxsize && self.Head-self.Tail > 1 { + pkt := self.pkts[int(self.Head)&(len(self.pkts)-1)] + self.size -= len(pkt.Data) + self.Head++ + } +} + +func (self *Buf) grow() { + newpkts := make([]av.Packet, len(self.pkts)*2) + for i := self.Head; i.LT(self.Tail); i++ { + newpkts[int(i)&(len(newpkts)-1)] = self.pkts[int(i)&(len(self.pkts)-1)] + } + self.pkts = newpkts +} + +func (self *Buf) Push(pkt av.Packet) { + if int(self.Tail-self.Head) == len(self.pkts) { + self.grow() + } + self.pkts[int(self.Tail)&(len(self.pkts)-1)] = pkt + self.Tail++ + self.size += len(pkt.Data) + self.shrink() +} + +func (self *Buf) Get(pos BufPos) av.Packet { + return self.pkts[int(pos)&(len(self.pkts)-1)] +} + +func (self *Buf) IsValidPos(pos BufPos) bool { + return pos.GE(self.Head) && pos.LT(self.Tail) +} + +type BufPos int + +func (self BufPos) LT(pos BufPos) bool { + return self - pos < 0 +} + +func (self BufPos) GE(pos BufPos) bool { + return self - pos >= 0 +} + +func (self BufPos) GT(pos BufPos) bool { + return self - pos > 0 +} + diff --git a/av/pubsub/queue.go b/av/pubsub/queue.go index 2d312c4..473e823 100644 --- a/av/pubsub/queue.go +++ b/av/pubsub/queue.go @@ -4,6 +4,7 @@ package pubsub import ( "github.com/nareix/joy4/av" + "github.com/nareix/joy4/av/pktque" "io" "time" "sync" @@ -21,7 +22,7 @@ import ( // One publisher and multiple subscribers thread-safe packet buffer queue. type Queue struct { - pkts []av.Packet + buf *pktque.Buf head, tail int lock *sync.RWMutex cond *sync.Cond @@ -33,6 +34,7 @@ type Queue struct { func NewQueue(streams []av.CodecData) *Queue { q := &Queue{} + q.buf = pktque.NewBuf() q.streams = streams q.maxdur = time.Second*10 q.lock = &sync.RWMutex{} @@ -46,32 +48,13 @@ func NewQueue(streams []av.CodecData) *Queue { return q } -// Set max buffered total packets duration. -func (self *Queue) SetMaxDuration(dur time.Duration) { +func (self *Queue) SetMaxSize(size int) { self.lock.Lock() - - self.maxdur = dur - for self.maxdur > 0 && len(self.pkts) >= 2 && self.pkts[len(self.pkts)-1].Time - self.pkts[0].Time > self.maxdur { - self.pkts = self.pkts[1:] - self.head++ - } - + self.buf.SetMaxSize(size) self.lock.Unlock() return } -// Currently buffered packets total duration. -func (self *Queue) Duration() (dur time.Duration) { - self.lock.RLock() - - if len(self.pkts) >= 2 { - dur = self.pkts[len(self.pkts)-1].Time - self.pkts[0].Time - } - - self.lock.RUnlock() - return -} - // After Close() called, all QueueCursor's ReadPacket will return io.EOF. func (self *Queue) Close() (err error) { self.lock.Lock() @@ -87,12 +70,7 @@ func (self *Queue) Close() (err error) { func (self *Queue) WritePacket(pkt av.Packet) (err error) { self.lock.Lock() - if self.maxdur > 0 && len(self.pkts) >= 2 && self.pkts[len(self.pkts)-1].Time - self.pkts[0].Time > self.maxdur { - self.pkts = self.pkts[1:] - self.head++ - } - self.pkts = append(self.pkts, pkt) - self.tail++ + self.buf.Push(pkt) self.cond.Broadcast() self.lock.Unlock() @@ -101,22 +79,22 @@ func (self *Queue) WritePacket(pkt av.Packet) (err error) { type QueueCursor struct { que *Queue - pos int - init func(pkts []av.Packet, videoidx int) int + pos pktque.BufPos + gotpos bool + init func(buf *pktque.Buf, videoidx int) pktque.BufPos } func (self *Queue) newCursor() *QueueCursor { return &QueueCursor{ que: self, - pos: -1, } } // Create cursor position at latest packet. func (self *Queue) Latest() *QueueCursor { cursor := self.newCursor() - cursor.init = func(pkts []av.Packet, videoidx int) int { - return len(pkts) + cursor.init = func(buf *pktque.Buf, videoidx int) pktque.BufPos { + return buf.Tail } return cursor } @@ -124,8 +102,8 @@ func (self *Queue) Latest() *QueueCursor { // Create cursor position at oldest buffered packet. func (self *Queue) Oldest() *QueueCursor { cursor := self.newCursor() - cursor.init = func(pkts []av.Packet, videoidx int) int { - return 0 + cursor.init = func(buf *pktque.Buf, videoidx int) pktque.BufPos { + return buf.Head } return cursor } @@ -133,14 +111,15 @@ func (self *Queue) Oldest() *QueueCursor { // Create cursor position at specific time in buffered packets. func (self *Queue) DelayedTime(dur time.Duration) *QueueCursor { cursor := self.newCursor() - cursor.init = func(pkts []av.Packet, videoidx int) int { - i := len(self.pkts)-1 - if i > 0 { - end := self.pkts[i] - for i--; i >= 0; i-- { - if end.Time - self.pkts[i].Time > dur { + cursor.init = func(buf *pktque.Buf, videoidx int) pktque.BufPos { + i := buf.Tail-1 + if buf.IsValidPos(i) { + end := buf.Get(i) + for buf.IsValidPos(i) { + if end.Time - buf.Get(i).Time > dur { break } + i-- } } return i @@ -151,12 +130,12 @@ func (self *Queue) DelayedTime(dur time.Duration) *QueueCursor { // Create cursor position at specific delayed GOP count in buffered packets. func (self *Queue) DelayedGopCount(n int) *QueueCursor { cursor := self.newCursor() - cursor.init = func(pkts []av.Packet, videoidx int) int { - i := 0 + cursor.init = func(buf *pktque.Buf, videoidx int) pktque.BufPos { + i := buf.Tail-1 if videoidx != -1 { - i = len(self.pkts)-1 - for gop := 0; i >= 0 && gop < n; i-- { - if self.pkts[i].Idx == int8(self.videoidx) && self.pkts[i].IsKeyFrame { + for gop := 0; buf.IsValidPos(i) && gop < n; i-- { + pkt := buf.Get(i) + if pkt.Idx == int8(self.videoidx) && pkt.IsKeyFrame { gop++ } } @@ -176,17 +155,19 @@ func (self *QueueCursor) Streams() (streams []av.CodecData, err error) { // ReadPacket will not consume packets in Queue, it's just a cursor. func (self *QueueCursor) ReadPacket() (pkt av.Packet, err error) { self.que.cond.L.Lock() - if self.pos == -1 { - self.pos = self.init(self.que.pkts, self.que.videoidx)+self.que.head + buf := self.que.buf + if !self.gotpos { + self.pos = self.init(buf, self.que.videoidx) + self.gotpos = true } for { - if self.pos - self.que.head < 0 { - self.pos = self.que.head - } else if self.pos - self.que.tail > 0 { - self.pos = self.que.tail + if self.pos.LT(buf.Head) { + self.pos = buf.Head + } else if self.pos.GT(buf.Tail) { + self.pos = buf.Tail } - if self.pos - self.que.head >= 0 && self.pos - self.que.tail < 0 { - pkt = self.que.pkts[self.pos - self.que.head] + if buf.IsValidPos(self.pos) { + pkt = buf.Get(self.pos) self.pos++ break }