From 308971f1d1a5fce0b022b8bfda7cd83b5ec94a1d Mon Sep 17 00:00:00 2001 From: nareix Date: Wed, 22 Jun 2016 19:25:45 +0800 Subject: [PATCH] remove TimeCorrector and fix Queue --- pktque/normalizer.go | 93 +++++++++++++++++++++---------------------- pktque/queue.go | 62 +++++++++++++++++++++++++---- pktque/timecorrect.go | 75 ---------------------------------- 3 files changed, 100 insertions(+), 130 deletions(-) delete mode 100644 pktque/timecorrect.go diff --git a/pktque/normalizer.go b/pktque/normalizer.go index 4b7113b..5a621fd 100644 --- a/pktque/normalizer.go +++ b/pktque/normalizer.go @@ -3,61 +3,64 @@ package pktque import ( "github.com/nareix/av" "time" + "fmt" ) +const debugNormalizer = false + type Normalizer struct { - que []*Queue - timecr *TimeCorrector + ques Queues streams []av.CodecData - timediff time.Duration } func (self *Normalizer) Push(pkt av.Packet) { - self.timecr.Correct(&pkt) - i := int(pkt.Idx) - self.que[i].Push(pkt) -} - -func (self *Normalizer) removeBeforeTime(tm time.Duration) { - for _, que := range self.que { - for que.Count() > 0 { - if que.Head().Time < tm { - que.Pop() - } - } - } } func (self *Normalizer) Pop() (pkt av.Packet, dur time.Duration, ok bool) { - mintm := time.Duration(0) - minidx := -1 - for i, que := range self.que { - if que.Count() > 0 { - if minidx == -1 || que.Head().Time < mintm { - minidx = i - } - } - } - if minidx == -1 { - return + return +} + +func (self *Normalizer) Do(pkt av.Packet) (out []av.Packet) { + const MaxDiff = time.Millisecond*800 + const MaxCacheTime = time.Second*5 + + i := int(pkt.Idx) + que := &self.ques[i] + que.Push(pkt) + + if que.Tail().Time - que.Head().Time > MaxCacheTime { + que.Pop() } - que := self.que[minidx] - if que.Count() >= 2 { - maxgap, defdur := CorrectTimeParams(self.streams[pkt.Idx]) - ok = true - starttime := que.HeadIdx(0).Time - endtime := que.HeadIdx(1).Time - dur = endtime - starttime - pkt = que.Pop() - pkt.Time -= self.timediff - if dur > maxgap { - dur = defdur - endtime -= defdur - self.timediff += endtime - starttime - self.removeBeforeTime(endtime) + for { + ok := true + diff := time.Duration(0) + for i := range self.ques { + if que.Count() == 0 { + ok = false + break + } + tm := self.ques[i].Tail().Time + for j := 0; j < i; j++ { + v := tm - self.ques[j].Tail().Time + if v < 0 { + v = -v + } + if v > diff { + diff = v + } + } } - return + if !ok { + return + } + if diff > MaxDiff { + ok = false + } + } + + if debugNormalizer { + fmt.Println("normalizer: push", pkt.Idx, pkt.Time) } return @@ -65,11 +68,7 @@ func (self *Normalizer) Pop() (pkt av.Packet, dur time.Duration, ok bool) { func NewNormalizer(streams []av.CodecData) *Normalizer { self := &Normalizer{} - self.que = make([]*Queue, len(streams)) - for i := range self.que { - self.que[i] = &Queue{} - } - self.timecr = NewTimeCorrector(streams) + self.ques = make(Queues, len(streams)) self.streams = streams return self } diff --git a/pktque/queue.go b/pktque/queue.go index 076cac5..00f54f8 100644 --- a/pktque/queue.go +++ b/pktque/queue.go @@ -2,10 +2,11 @@ package pktque import ( "github.com/nareix/av" + "time" ) // time -// 0 ----- 5 ----- 10 +// --------------------> // // V-A-V-V-A-V-V-A-V-V // | | @@ -17,15 +18,31 @@ type Queue struct { head int tail int n int + size int } func (self *Queue) Push(pkt av.Packet) { - if self.n == len(self.buf) { - self.buf = append(self.buf, pkt) + if self.size == self.n { + newsize := 0 + if self.size == 0 { + newsize = 8 + } else { + newsize = self.size*2 + } + newbuf := make([]av.Packet, newsize) + for i := 0; i < self.n; i++ { + j := (self.head+i)%self.size + newbuf[i] = self.buf[j] + } + newbuf[self.n] = pkt self.n++ + self.buf = newbuf + self.size = newsize + self.head = 0 + self.tail = self.n } else { self.buf[self.tail] = pkt - self.tail = (self.tail+1)%len(self.buf) + self.tail = (self.tail+1)%self.size self.n++ } } @@ -35,7 +52,7 @@ func (self *Queue) Pop() (pkt av.Packet) { return } pkt = self.buf[self.head] - self.head = (self.head+1)%len(self.buf) + self.head = (self.head+1)%self.size self.n-- return } @@ -45,16 +62,16 @@ func (self *Queue) Head() (pkt av.Packet) { } func (self *Queue) Tail() (pkt av.Packet) { - return self.buf[self.tail] + return self.buf[(self.tail-1+self.size)%self.size] } func (self *Queue) HeadIdx(diff int) (pkt av.Packet) { - i := (self.head+diff)%len(self.buf) + i := (self.head+diff)%self.size return self.buf[i] } func (self *Queue) TailIdx(diff int) (pkt av.Packet) { - i := (self.tail-diff+len(self.buf))%len(self.buf) + i := (self.tail-1-diff+self.size)%self.size return self.buf[i] } @@ -62,4 +79,33 @@ func (self *Queue) Count() int { return self.n } +type Queues []Queue + +func (self Queues) MinTimeIdx() (minidx int) { + mintm := time.Duration(0) + minidx = -1 + for i, que := range self { + if que.Count() > 0 { + headtm := que.Head().Time + if minidx == -1 || headtm < mintm { + minidx = i + mintm = headtm + } + } + } + return +} + +func (self Queues) RemoveBeforeTime(tm time.Duration) { + for i := range self { + que := &self[i] + for que.Count() > 0 { + if que.Head().Time < tm { + que.Pop() + } else { + break + } + } + } +} diff --git a/pktque/timecorrect.go b/pktque/timecorrect.go deleted file mode 100644 index 4ad6224..0000000 --- a/pktque/timecorrect.go +++ /dev/null @@ -1,75 +0,0 @@ -package pktque - -import ( - "github.com/nareix/av" - "time" -) - -type TimeCorrector struct { - streams []av.CodecData - intimes []time.Duration - indurs []time.Duration - intime time.Duration - outtime time.Duration -} - -func NewTimeCorrector(streams []av.CodecData) *TimeCorrector { - self := &TimeCorrector{} - self.intimes = make([]time.Duration, len(streams)) - self.indurs = make([]time.Duration, len(streams)) - self.streams = streams - return self -} - -func (self *TimeCorrector) updateIntimes() { - for i := range self.intimes { - self.intimes[i] = self.intime - } -} - -func (self *TimeCorrector) Correct(pkt *av.Packet) { - i := int(pkt.Idx) - - if pkt.Time < self.intime { - self.intime = pkt.Time - self.updateIntimes() - } else { - diff := pkt.Time - self.intimes[i] - maxgap, defdur := CorrectTimeParams(self.streams[i]) - - if diff > maxgap { - var outdiff time.Duration - dur := self.indurs[i] - if dur == time.Duration(0) { - dur = defdur - } - adjust := self.intimes[i]+dur - if adjust > self.intime { - outdiff = adjust - self.intime - } - self.outtime += outdiff - self.intime = pkt.Time - self.updateIntimes() - } else { - self.indurs[i] = pkt.Time-self.intimes[i] - self.intimes[i] = pkt.Time - self.outtime += pkt.Time-self.intime - self.intime = pkt.Time - } - } - - pkt.Time = self.outtime - return -} - -func CorrectTimeParams(stream av.CodecData) (maxgap time.Duration, dur time.Duration) { - if stream.Type().IsAudio() { - maxgap = time.Millisecond*500 - dur = time.Millisecond*20 - } else { - maxgap = time.Millisecond*500 - dur = time.Millisecond*20 - } - return -} -