diff --git a/pktque/normalizer.go b/pktque/normalizer.go new file mode 100644 index 0000000..4b7113b --- /dev/null +++ b/pktque/normalizer.go @@ -0,0 +1,76 @@ +package pktque + +import ( + "github.com/nareix/av" + "time" +) + +type Normalizer struct { + que []*Queue + timecr *TimeCorrector + streams []av.CodecData + timediff time.Duration +} + +func (self *Normalizer) Push(pkt av.Packet) { + self.timecr.Correct(&pkt) + i := int(pkt.Idx) + self.que[i].Push(pkt) +} + +func (self *Normalizer) removeBeforeTime(tm time.Duration) { + for _, que := range self.que { + for que.Count() > 0 { + if que.Head().Time < tm { + que.Pop() + } + } + } +} + +func (self *Normalizer) Pop() (pkt av.Packet, dur time.Duration, ok bool) { + mintm := time.Duration(0) + minidx := -1 + for i, que := range self.que { + if que.Count() > 0 { + if minidx == -1 || que.Head().Time < mintm { + minidx = i + } + } + } + if minidx == -1 { + return + } + + que := self.que[minidx] + if que.Count() >= 2 { + maxgap, defdur := CorrectTimeParams(self.streams[pkt.Idx]) + ok = true + starttime := que.HeadIdx(0).Time + endtime := que.HeadIdx(1).Time + dur = endtime - starttime + pkt = que.Pop() + pkt.Time -= self.timediff + if dur > maxgap { + dur = defdur + endtime -= defdur + self.timediff += endtime - starttime + self.removeBeforeTime(endtime) + } + return + } + + return +} + +func NewNormalizer(streams []av.CodecData) *Normalizer { + self := &Normalizer{} + self.que = make([]*Queue, len(streams)) + for i := range self.que { + self.que[i] = &Queue{} + } + self.timecr = NewTimeCorrector(streams) + self.streams = streams + return self +} + diff --git a/pktque/queue.go b/pktque/queue.go new file mode 100644 index 0000000..076cac5 --- /dev/null +++ b/pktque/queue.go @@ -0,0 +1,65 @@ +package pktque + +import ( + "github.com/nareix/av" +) + +// time +// 0 ----- 5 ----- 10 +// +// V-A-V-V-A-V-V-A-V-V +// | | +// head tail +// pop push + +type Queue struct { + buf []av.Packet + head int + tail int + n int +} + +func (self *Queue) Push(pkt av.Packet) { + if self.n == len(self.buf) { + self.buf = append(self.buf, pkt) + self.n++ + } else { + self.buf[self.tail] = pkt + self.tail = (self.tail+1)%len(self.buf) + self.n++ + } +} + +func (self *Queue) Pop() (pkt av.Packet) { + if self.n == 0 { + return + } + pkt = self.buf[self.head] + self.head = (self.head+1)%len(self.buf) + self.n-- + return +} + +func (self *Queue) Head() (pkt av.Packet) { + return self.buf[self.head] +} + +func (self *Queue) Tail() (pkt av.Packet) { + return self.buf[self.tail] +} + +func (self *Queue) HeadIdx(diff int) (pkt av.Packet) { + i := (self.head+diff)%len(self.buf) + return self.buf[i] +} + +func (self *Queue) TailIdx(diff int) (pkt av.Packet) { + i := (self.tail-diff+len(self.buf))%len(self.buf) + return self.buf[i] +} + +func (self *Queue) Count() int { + return self.n +} + + diff --git a/pktque/timecorrect.go b/pktque/timecorrect.go new file mode 100644 index 0000000..4ad6224 --- /dev/null +++ b/pktque/timecorrect.go @@ -0,0 +1,75 @@ +package pktque + +import ( + "github.com/nareix/av" + "time" +) + +type TimeCorrector struct { + streams []av.CodecData + intimes []time.Duration + indurs []time.Duration + intime time.Duration + outtime time.Duration +} + +func NewTimeCorrector(streams []av.CodecData) *TimeCorrector { + self := &TimeCorrector{} + self.intimes = make([]time.Duration, len(streams)) + self.indurs = make([]time.Duration, len(streams)) + self.streams = streams + return self +} + +func (self *TimeCorrector) updateIntimes() { + for i := range self.intimes { + self.intimes[i] = self.intime + } +} + +func (self *TimeCorrector) Correct(pkt *av.Packet) { + i := int(pkt.Idx) + + if pkt.Time < self.intime { + self.intime = pkt.Time + self.updateIntimes() + } else { + diff := pkt.Time - self.intimes[i] + maxgap, defdur := CorrectTimeParams(self.streams[i]) + + if diff > maxgap { + var outdiff time.Duration + dur := self.indurs[i] + if dur == time.Duration(0) { + dur = defdur + } + adjust := self.intimes[i]+dur + if adjust > self.intime { + outdiff = adjust - self.intime + } + self.outtime += outdiff + self.intime = pkt.Time + self.updateIntimes() + } else { + self.indurs[i] = pkt.Time-self.intimes[i] + self.intimes[i] = pkt.Time + self.outtime += pkt.Time-self.intime + self.intime = pkt.Time + } + } + + pkt.Time = self.outtime + return +} + +func CorrectTimeParams(stream av.CodecData) (maxgap time.Duration, dur time.Duration) { + if stream.Type().IsAudio() { + maxgap = time.Millisecond*500 + dur = time.Millisecond*20 + } else { + maxgap = time.Millisecond*500 + dur = time.Millisecond*20 + } + return +} +