From 396376da2d4229f9663a620b7601cf490ac04172 Mon Sep 17 00:00:00 2001 From: nareix Date: Tue, 12 Jul 2016 08:38:52 +0800 Subject: [PATCH] pubsub: Queue support Demuxer and Close --- av/pubsub/pubsub.go | 35 ++++++++++++++++++++++++++++++----- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/av/pubsub/pubsub.go b/av/pubsub/pubsub.go index 5cf0074..2b1ddeb 100644 --- a/av/pubsub/pubsub.go +++ b/av/pubsub/pubsub.go @@ -1,7 +1,8 @@ package pubsub import ( - "github.com/nareix/av" + "github.com/nareix/joy4/av" + "io" "time" "sync" ) @@ -20,10 +21,13 @@ type Queue struct { lock *sync.RWMutex cond *sync.Cond maxdur time.Duration + streams []av.CodecData + closed bool } -func NewQueue() *Queue { +func NewQueue(streams []av.CodecData) *Queue { q := &Queue{} + q.streams = streams q.maxdur = time.Second*60 q.lock = &sync.RWMutex{} q.cond = sync.NewCond(q.lock.RLocker()) @@ -52,6 +56,15 @@ func (self *Queue) Duration() (dur time.Duration) { return } +func (self *Queue) Close() (err error) { + self.lock.Lock() + defer self.lock.Unlock() + + self.closed = true + self.cond.Broadcast() + return +} + func (self *Queue) WritePacket(pkt av.Packet) (err error) { self.lock.Lock() defer self.lock.Unlock() @@ -62,7 +75,7 @@ func (self *Queue) WritePacket(pkt av.Packet) (err error) { } self.pkts = append(self.pkts, pkt) self.tail++ - self.cond.Signal() + self.cond.Broadcast() return } @@ -102,6 +115,14 @@ func (self *Queue) Delayed(dur time.Duration) *QueueCursor { return &QueueCursor{que: self, pos: self.head+i} } +func (self *QueueCursor) Streams() (streams []av.CodecData, err error) { + self.que.lock.RLock() + defer self.que.lock.RUnlock() + + streams = self.que.streams + return +} + func (self *QueueCursor) ReadPacket() (pkt av.Packet, err error) { self.que.cond.L.Lock() for { @@ -111,13 +132,17 @@ func (self *QueueCursor) ReadPacket() (pkt av.Packet, err error) { self.pos = self.que.tail } if self.pos - self.que.head >= 0 && self.pos - self.que.tail < 0 { + pkt = self.que.pkts[self.pos - self.que.head] + self.pos++ + break + } + if self.que.closed { + err = io.EOF break } self.que.cond.Wait() } - pkt = self.que.pkts[self.pos - self.que.head] self.que.cond.L.Unlock() - self.pos++ return }