diff --git a/pktreorder/pktreorder.go b/pktreorder/pktreorder.go new file mode 100644 index 0000000..60830a7 --- /dev/null +++ b/pktreorder/pktreorder.go @@ -0,0 +1,98 @@ +package pktreorder + +import ( + "github.com/nareix/av" + "io" +) + +type stream struct { + isVideo bool + pkts []av.Packet + pos float64 +} + +type Queue struct { + streams []*stream + pktnr int + err error +} + +func (self *Queue) Alloc(streams []av.CodecData) { + self.streams = []*stream{} + self.pktnr = 0 + for _, s := range streams { + self.streams = append(self.streams, &stream{isVideo: s.IsVideo()}) + } +} + +func (self *Queue) chooseStream() (chosen int) { + flush := self.err != nil + minpos := float64(-1) + chosen = -1 + + for i, stream := range self.streams { + if (minpos < 0 || stream.pos < minpos || stream.pos == minpos && stream.isVideo) && + (!flush || flush && len(stream.pkts) > 0) { + minpos = stream.pos + chosen = i + } + } + + return +} + +func (self *Queue) ReadPacket() (i int, pkt av.Packet, err error) { + if self.pktnr == 0 { + if self.err != nil { + err = self.err + } else { + err = io.EOF + } + return + } + + chosen := self.chooseStream() + if chosen < 0 { + err = io.EOF + return + } + stream := self.streams[chosen] + if len(stream.pkts) == 0 { + err = io.EOF + return + } + + i = chosen + pkt = stream.pkts[0] + stream.pkts = stream.pkts[1:] + stream.pos += pkt.Duration + self.pktnr-- + + return +} + +func (self *Queue) WritePacket(i int, pkt av.Packet) (err error) { + stream := self.streams[i] + stream.pkts = append(stream.pkts, pkt) + self.pktnr++ + return +} + +func (self *Queue) CanReadPacket() bool { + chosen := self.chooseStream() + if chosen < 0 { + return false + } + if len(self.streams[chosen].pkts) == 0 { + return false + } + return true +} + +func (self *Queue) EndWritePacket(err error) { + if err == nil { + err = io.EOF + } + self.err = err +} + diff --git a/pktreorder/pktreorder_test.go b/pktreorder/pktreorder_test.go new file mode 100644 index 0000000..60f8fc2 --- /dev/null +++ b/pktreorder/pktreorder_test.go @@ -0,0 +1,102 @@ +package pktreorder + +import ( + "github.com/nareix/av" + "fmt" +) + +type fakeStream struct { + isVideo bool + type_ int +} + +func (self fakeStream) IsVideo() bool { + return self.isVideo +} + +func (self fakeStream) IsAudio() bool { + return !self.isVideo +} + +func (self fakeStream) Type() int { + return self.type_ +} + +func ExampleQueue() { + var streams []av.CodecData + streams = append(streams, fakeStream{isVideo: true}) + streams = append(streams, fakeStream{isVideo: false}) + + queue := &Queue{} + queue.Alloc(streams) + var i int + var err error + + /* + Output: +false +false +true +0 true +1 true +1 true +1 true +0 false +0.30 +1 true +0 true +1 true +1 true +0 true +0 true +0 false +0 false + */ + + fmt.Println(queue.CanReadPacket()) + queue.WritePacket(1, av.Packet{Duration: 0.1}) + queue.WritePacket(1, av.Packet{Duration: 0.1}) + queue.WritePacket(1, av.Packet{Duration: 0.1}) + fmt.Println(queue.CanReadPacket()) + + queue.WritePacket(0, av.Packet{Duration: 1.0}) + queue.WritePacket(0, av.Packet{Duration: 1.0}) + fmt.Println(queue.CanReadPacket()) + i, _, err = queue.ReadPacket() + fmt.Println(i, err == nil) + i, _, err = queue.ReadPacket() + fmt.Println(i, err == nil) + i, _, err = queue.ReadPacket() + fmt.Println(i, err == nil) + i, _, err = queue.ReadPacket() + fmt.Println(i, err == nil) + i, _, err = queue.ReadPacket() + fmt.Println(i, err == nil) + + queue.WritePacket(1, av.Packet{Duration: 0.8}) + fmt.Println(fmt.Sprintf("%.2f", queue.streams[1].pos)) + i, _, err = queue.ReadPacket() + fmt.Println(i, err == nil) + i, _, err = queue.ReadPacket() + fmt.Println(i, err == nil) + + queue.WritePacket(0, av.Packet{Duration: 0.1}) + queue.WritePacket(1, av.Packet{Duration: 0.1}) + queue.WritePacket(0, av.Packet{Duration: 0.1}) + queue.WritePacket(1, av.Packet{Duration: 0.1}) + queue.EndWritePacket(nil) + + i, _, err = queue.ReadPacket() + fmt.Println(i, err == nil) + i, _, err = queue.ReadPacket() + fmt.Println(i, err == nil) + i, _, err = queue.ReadPacket() + fmt.Println(i, err == nil) + i, _, err = queue.ReadPacket() + fmt.Println(i, err == nil) + i, _, err = queue.ReadPacket() + fmt.Println(i, err == nil) + i, _, err = queue.ReadPacket() + fmt.Println(i, err == nil) +} +