diff --git a/av/pubsub/queue.go b/av/pubsub/queue.go index 6727ce7..2d312c4 100644 --- a/av/pubsub/queue.go +++ b/av/pubsub/queue.go @@ -49,41 +49,43 @@ func NewQueue(streams []av.CodecData) *Queue { // Set max buffered total packets duration. func (self *Queue) SetMaxDuration(dur time.Duration) { self.lock.Lock() - defer self.lock.Unlock() self.maxdur = dur for self.maxdur > 0 && len(self.pkts) >= 2 && self.pkts[len(self.pkts)-1].Time - self.pkts[0].Time > self.maxdur { self.pkts = self.pkts[1:] self.head++ } + + self.lock.Unlock() return } // Currently buffered packets total duration. func (self *Queue) Duration() (dur time.Duration) { self.lock.RLock() - defer self.lock.RUnlock() if len(self.pkts) >= 2 { dur = self.pkts[len(self.pkts)-1].Time - self.pkts[0].Time } + + self.lock.RUnlock() return } // After Close() called, all QueueCursor's ReadPacket will return io.EOF. func (self *Queue) Close() (err error) { self.lock.Lock() - defer self.lock.Unlock() self.closed = true self.cond.Broadcast() + + self.lock.Unlock() return } // Put packet into buffer, old packets will be discared. func (self *Queue) WritePacket(pkt av.Packet) (err error) { self.lock.Lock() - defer self.lock.Unlock() if self.maxdur > 0 && len(self.pkts) >= 2 && self.pkts[len(self.pkts)-1].Time - self.pkts[0].Time > self.maxdur { self.pkts = self.pkts[1:] @@ -92,6 +94,8 @@ func (self *Queue) WritePacket(pkt av.Packet) (err error) { self.pkts = append(self.pkts, pkt) self.tail++ self.cond.Broadcast() + + self.lock.Unlock() return } @@ -164,9 +168,8 @@ func (self *Queue) DelayedGopCount(n int) *QueueCursor { func (self *QueueCursor) Streams() (streams []av.CodecData, err error) { self.que.lock.RLock() - defer self.que.lock.RUnlock() - streams = self.que.streams + self.que.lock.RUnlock() return }