pubsub: Queue support Demuxer and Close
This commit is contained in:
parent
18205e5206
commit
396376da2d
@ -1,7 +1,8 @@
|
|||||||
package pubsub
|
package pubsub
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/nareix/av"
|
"github.com/nareix/joy4/av"
|
||||||
|
"io"
|
||||||
"time"
|
"time"
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
@ -20,10 +21,13 @@ type Queue struct {
|
|||||||
lock *sync.RWMutex
|
lock *sync.RWMutex
|
||||||
cond *sync.Cond
|
cond *sync.Cond
|
||||||
maxdur time.Duration
|
maxdur time.Duration
|
||||||
|
streams []av.CodecData
|
||||||
|
closed bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewQueue() *Queue {
|
func NewQueue(streams []av.CodecData) *Queue {
|
||||||
q := &Queue{}
|
q := &Queue{}
|
||||||
|
q.streams = streams
|
||||||
q.maxdur = time.Second*60
|
q.maxdur = time.Second*60
|
||||||
q.lock = &sync.RWMutex{}
|
q.lock = &sync.RWMutex{}
|
||||||
q.cond = sync.NewCond(q.lock.RLocker())
|
q.cond = sync.NewCond(q.lock.RLocker())
|
||||||
@ -52,6 +56,15 @@ func (self *Queue) Duration() (dur time.Duration) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (self *Queue) Close() (err error) {
|
||||||
|
self.lock.Lock()
|
||||||
|
defer self.lock.Unlock()
|
||||||
|
|
||||||
|
self.closed = true
|
||||||
|
self.cond.Broadcast()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func (self *Queue) WritePacket(pkt av.Packet) (err error) {
|
func (self *Queue) WritePacket(pkt av.Packet) (err error) {
|
||||||
self.lock.Lock()
|
self.lock.Lock()
|
||||||
defer self.lock.Unlock()
|
defer self.lock.Unlock()
|
||||||
@ -62,7 +75,7 @@ func (self *Queue) WritePacket(pkt av.Packet) (err error) {
|
|||||||
}
|
}
|
||||||
self.pkts = append(self.pkts, pkt)
|
self.pkts = append(self.pkts, pkt)
|
||||||
self.tail++
|
self.tail++
|
||||||
self.cond.Signal()
|
self.cond.Broadcast()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -102,6 +115,14 @@ func (self *Queue) Delayed(dur time.Duration) *QueueCursor {
|
|||||||
return &QueueCursor{que: self, pos: self.head+i}
|
return &QueueCursor{que: self, pos: self.head+i}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (self *QueueCursor) Streams() (streams []av.CodecData, err error) {
|
||||||
|
self.que.lock.RLock()
|
||||||
|
defer self.que.lock.RUnlock()
|
||||||
|
|
||||||
|
streams = self.que.streams
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func (self *QueueCursor) ReadPacket() (pkt av.Packet, err error) {
|
func (self *QueueCursor) ReadPacket() (pkt av.Packet, err error) {
|
||||||
self.que.cond.L.Lock()
|
self.que.cond.L.Lock()
|
||||||
for {
|
for {
|
||||||
@ -111,13 +132,17 @@ func (self *QueueCursor) ReadPacket() (pkt av.Packet, err error) {
|
|||||||
self.pos = self.que.tail
|
self.pos = self.que.tail
|
||||||
}
|
}
|
||||||
if self.pos - self.que.head >= 0 && self.pos - self.que.tail < 0 {
|
if self.pos - self.que.head >= 0 && self.pos - self.que.tail < 0 {
|
||||||
|
pkt = self.que.pkts[self.pos - self.que.head]
|
||||||
|
self.pos++
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if self.que.closed {
|
||||||
|
err = io.EOF
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
self.que.cond.Wait()
|
self.que.cond.Wait()
|
||||||
}
|
}
|
||||||
pkt = self.que.pkts[self.pos - self.que.head]
|
|
||||||
self.que.cond.L.Unlock()
|
self.que.cond.L.Unlock()
|
||||||
self.pos++
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user