diff --git a/av/pubsub/queue.go b/av/pubsub/queue.go index 473e823..2d5a832 100644 --- a/av/pubsub/queue.go +++ b/av/pubsub/queue.go @@ -32,19 +32,13 @@ type Queue struct { closed bool } -func NewQueue(streams []av.CodecData) *Queue { +func NewQueue() *Queue { q := &Queue{} q.buf = pktque.NewBuf() - q.streams = streams q.maxdur = time.Second*10 q.lock = &sync.RWMutex{} q.cond = sync.NewCond(q.lock.RLocker()) q.videoidx = -1 - for i, stream := range streams { - if stream.Type().IsVideo() { - q.videoidx = i - } - } return q } @@ -55,6 +49,20 @@ func (self *Queue) SetMaxSize(size int) { return } +func (self *Queue) WriteHeader(streams []av.CodecData) { + self.lock.Lock() + + self.streams = streams + for i, stream := range streams { + if stream.Type().IsVideo() { + self.videoidx = i + } + } + self.cond.Broadcast() + + self.lock.Unlock() +} + // After Close() called, all QueueCursor's ReadPacket will return io.EOF. func (self *Queue) Close() (err error) { self.lock.Lock() @@ -146,9 +154,16 @@ func (self *Queue) DelayedGopCount(n int) *QueueCursor { } func (self *QueueCursor) Streams() (streams []av.CodecData, err error) { - self.que.lock.RLock() - streams = self.que.streams - self.que.lock.RUnlock() + self.que.cond.L.Lock() + for self.que.streams == nil && !self.que.closed { + self.que.cond.Wait() + } + if self.que.streams != nil { + streams = self.que.streams + } else { + err = io.EOF + } + self.que.cond.L.Unlock() return }