pubsub.Queue: support WriteHeader()
This commit is contained in:
parent
46d5df0985
commit
3d9cae6b8b
@ -32,19 +32,13 @@ type Queue struct {
|
|||||||
closed bool
|
closed bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewQueue(streams []av.CodecData) *Queue {
|
func NewQueue() *Queue {
|
||||||
q := &Queue{}
|
q := &Queue{}
|
||||||
q.buf = pktque.NewBuf()
|
q.buf = pktque.NewBuf()
|
||||||
q.streams = streams
|
|
||||||
q.maxdur = time.Second*10
|
q.maxdur = time.Second*10
|
||||||
q.lock = &sync.RWMutex{}
|
q.lock = &sync.RWMutex{}
|
||||||
q.cond = sync.NewCond(q.lock.RLocker())
|
q.cond = sync.NewCond(q.lock.RLocker())
|
||||||
q.videoidx = -1
|
q.videoidx = -1
|
||||||
for i, stream := range streams {
|
|
||||||
if stream.Type().IsVideo() {
|
|
||||||
q.videoidx = i
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return q
|
return q
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -55,6 +49,20 @@ func (self *Queue) SetMaxSize(size int) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (self *Queue) WriteHeader(streams []av.CodecData) {
|
||||||
|
self.lock.Lock()
|
||||||
|
|
||||||
|
self.streams = streams
|
||||||
|
for i, stream := range streams {
|
||||||
|
if stream.Type().IsVideo() {
|
||||||
|
self.videoidx = i
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.cond.Broadcast()
|
||||||
|
|
||||||
|
self.lock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
// After Close() called, all QueueCursor's ReadPacket will return io.EOF.
|
// After Close() called, all QueueCursor's ReadPacket will return io.EOF.
|
||||||
func (self *Queue) Close() (err error) {
|
func (self *Queue) Close() (err error) {
|
||||||
self.lock.Lock()
|
self.lock.Lock()
|
||||||
@ -146,9 +154,16 @@ func (self *Queue) DelayedGopCount(n int) *QueueCursor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (self *QueueCursor) Streams() (streams []av.CodecData, err error) {
|
func (self *QueueCursor) Streams() (streams []av.CodecData, err error) {
|
||||||
self.que.lock.RLock()
|
self.que.cond.L.Lock()
|
||||||
|
for self.que.streams == nil && !self.que.closed {
|
||||||
|
self.que.cond.Wait()
|
||||||
|
}
|
||||||
|
if self.que.streams != nil {
|
||||||
streams = self.que.streams
|
streams = self.que.streams
|
||||||
self.que.lock.RUnlock()
|
} else {
|
||||||
|
err = io.EOF
|
||||||
|
}
|
||||||
|
self.que.cond.L.Unlock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user