diff --git a/transcode/transcode.go b/transcode/transcode.go index c7246d3..07ef1ae 100644 --- a/transcode/transcode.go +++ b/transcode/transcode.go @@ -1,31 +1,33 @@ package transcode import ( - "github.com/nareix/av" - "github.com/nareix/av/pktreorder" "fmt" + "time" + "github.com/nareix/av" + "github.com/nareix/av/pktque" ) -const debug = false +const debug = true -type tstream struct { - av.CodecData +type tStream struct { + codec av.CodecData + timeline *pktque.Timeline + aencodec, adecodec av.AudioCodecData aenc av.AudioEncoder adec av.AudioDecoder } type Transcoder struct { FindAudioDecoderEncoder func(codec av.AudioCodecData) (ok bool, err error, dec av.AudioDecoder, enc av.AudioEncoder) - streams []*tstream - queue *pktreorder.Queue + streams []*tStream } func (self *Transcoder) Setup(streams []av.CodecData) (err error) { - self.streams = []*tstream{} + self.streams = []*tStream{} for _, stream := range streams { - ts := &tstream{CodecData: stream} - if stream.IsAudio() { + ts := &tStream{codec: stream} + if stream.Type().IsAudio() { if self.FindAudioDecoderEncoder != nil { var ok bool var enc av.AudioEncoder @@ -35,7 +37,10 @@ func (self *Transcoder) Setup(streams []av.CodecData) (err error) { if err != nil { return } - ts.CodecData = enc.CodecData() + ts.timeline = &pktque.Timeline{} + ts.codec = enc.CodecData() + ts.aencodec = ts.codec.(av.AudioCodecData) + ts.adecodec = stream.(av.AudioCodecData) ts.aenc = enc ts.adec = dec } @@ -43,74 +48,67 @@ func (self *Transcoder) Setup(streams []av.CodecData) (err error) { } self.streams = append(self.streams, ts) } - - self.queue = &pktreorder.Queue{} - - var newstreams []av.CodecData - newstreams, _ = self.Streams() - self.queue.Alloc(newstreams) return } -func (self *Transcoder) decodeAndEncode(stream *tstream, i int, pkt av.Packet) (err error) { +func (self *tStream) audioDecodeAndEncode(inpkt av.Packet) (outpkts []av.Packet, err error) { + var dur time.Duration var frame av.AudioFrame var ok bool - if ok, frame, err = stream.adec.Decode(pkt.Data); err != nil { + if ok, frame, err = self.adec.Decode(inpkt.Data); err != nil { return } - if ok { - var pkts []av.Packet - if pkts, err = stream.aenc.Encode(frame); err != nil { + if !ok { + return + } + + if dur, err = self.adecodec.PacketDuration(inpkt.Data); err != nil { + err = fmt.Errorf("transcode: PacketDuration() failed for input stream #%d", inpkt.Idx) + return + } + + if debug { + fmt.Println("transcode: push", inpkt.Time, dur) + } + self.timeline.Push(inpkt.Time, dur) + + var _outpkts [][]byte + if _outpkts, err = self.aenc.Encode(frame); err != nil { + return + } + for _, _outpkt := range _outpkts { + if dur, err = self.aencodec.PacketDuration(_outpkt); err != nil { + err = fmt.Errorf("transcode: PacketDuration() failed for output stream #%d", inpkt.Idx) return } - for _, pkt := range pkts { - self.queue.WritePacket(i, pkt) + outpkt := av.Packet{Idx: inpkt.Idx, Data: _outpkt} + outpkt.Time = self.timeline.Pop(dur) + + if debug { + fmt.Println("transcode: pop", outpkt.Time) } + + outpkts = append(outpkts, outpkt) } + return } -func (self *Transcoder) WritePacket(i int, pkt av.Packet) { - if debug { - fmt.Println("transcode: Transcoder.WritePacket", i, len(pkt.Data), fmt.Sprintf("%.2f", pkt.Duration)) - fmt.Println("transcode: Transcoder.CanReadPacket", self.CanReadPacket()) - } - - stream := self.streams[i] +func (self *Transcoder) Do(pkt av.Packet) (out []av.Packet, err error) { + stream := self.streams[pkt.Idx] if stream.aenc != nil && stream.adec != nil { - if err := self.decodeAndEncode(stream, i, pkt); err != nil { - self.queue.EndWritePacket(err) + if out, err = stream.audioDecodeAndEncode(pkt); err != nil { + return } } else { - self.queue.WritePacket(i, pkt) + out = append(out, pkt) } - return } -func (self *Transcoder) EndWritePacket(err error) { - self.queue.EndWritePacket(err) -} - -func (self *Transcoder) CanReadPacket() bool { - return self.queue.CanReadPacket() -} - -func (self *Transcoder) CanWritePacket() bool { - return self.queue.CanWritePacket() -} - -func (self *Transcoder) Error() error { - return self.queue.Error() -} - -func (self *Transcoder) ReadPacket() (i int, pkt av.Packet, err error) { - return self.queue.ReadPacket() -} - func (self *Transcoder) Streams() (streams []av.CodecData, err error) { for _, stream := range self.streams { - streams = append(streams, stream.CodecData) + streams = append(streams, stream.codec) } return } @@ -119,16 +117,18 @@ func (self *Transcoder) Close() { for _, stream := range self.streams { if stream.aenc != nil { stream.aenc.Close() + stream.aenc = nil } if stream.adec != nil { stream.adec.Close() + stream.adec = nil } } - self.streams = []*tstream{} + self.streams = []*tStream{} } type Muxer struct { - Muxer av.Muxer + Muxer av.Muxer Transcoder *Transcoder } @@ -146,33 +146,21 @@ func (self *Muxer) WriteHeader(streams []av.CodecData) (err error) { return } -func (self *Muxer) WritePacket(i int, pkt av.Packet) (err error) { - self.Transcoder.WritePacket(i, pkt) - if self.Transcoder.CanReadPacket() { - if i, pkt, rerr := self.Transcoder.ReadPacket(); rerr != nil { - err = rerr +func (self *Muxer) WritePacket(pkt av.Packet) (err error) { + var outpkts []av.Packet + if outpkts, err = self.Transcoder.Do(pkt); err != nil { + return + } + for _, pkt := range outpkts { + if err = self.Muxer.WritePacket(pkt); err != nil { return - } else { - if werr := self.Muxer.WritePacket(i, pkt); werr != nil { - self.Transcoder.EndWritePacket(werr) - } } } return } func (self *Muxer) WriteTrailer() (err error) { - self.Transcoder.EndWritePacket(nil) - for { - if i, pkt, rerr := self.Transcoder.ReadPacket(); rerr != nil { - break - } else { - if werr := self.Muxer.WritePacket(i, pkt); werr != nil { - err = werr - return - } - } - } + // TODO: do flush if err = self.Muxer.WriteTrailer(); err != nil { return } @@ -180,8 +168,9 @@ func (self *Muxer) WriteTrailer() (err error) { } type Demuxer struct { - Demuxer av.Demuxer + Demuxer av.Demuxer Transcoder *Transcoder + outpkts []av.Packet } func (self *Demuxer) Setup() (err error) { @@ -195,21 +184,22 @@ func (self *Demuxer) Setup() (err error) { return } -func (self *Demuxer) ReadPacket() (i int, pkt av.Packet, err error) { +func (self *Demuxer) ReadPacket() (pkt av.Packet, err error) { for { - if self.Transcoder.CanReadPacket() { - return self.Transcoder.ReadPacket() - } else if self.Transcoder.CanWritePacket() { - if i, pkt, err := self.Demuxer.ReadPacket(); err != nil { - self.Transcoder.EndWritePacket(err) - } else { - self.Transcoder.WritePacket(i, pkt) - } - } else { - err = self.Transcoder.Error() + if len(self.outpkts) > 0 { + pkt = self.outpkts[0] + self.outpkts = self.outpkts[1:] + return + } + var rpkt av.Packet + if rpkt, err = self.Demuxer.ReadPacket(); err != nil { + return + } + if self.outpkts, err = self.Transcoder.Do(rpkt); err != nil { return } } + return } func (self *Demuxer) Streams() ([]av.CodecData, error) { @@ -219,4 +209,3 @@ func (self *Demuxer) Streams() ([]av.CodecData, error) { func (self *Demuxer) Close() { self.Transcoder.Close() } -