improve transcode with timeline

This commit is contained in:
nareix 2016-06-22 19:19:48 +08:00
parent 1b301bfff2
commit f065582479

View File

@ -1,31 +1,33 @@
package transcode package transcode
import ( import (
"github.com/nareix/av"
"github.com/nareix/av/pktreorder"
"fmt" "fmt"
"time"
"github.com/nareix/av"
"github.com/nareix/av/pktque"
) )
const debug = false const debug = true
type tstream struct { type tStream struct {
av.CodecData codec av.CodecData
timeline *pktque.Timeline
aencodec, adecodec av.AudioCodecData
aenc av.AudioEncoder aenc av.AudioEncoder
adec av.AudioDecoder adec av.AudioDecoder
} }
type Transcoder struct { type Transcoder struct {
FindAudioDecoderEncoder func(codec av.AudioCodecData) (ok bool, err error, dec av.AudioDecoder, enc av.AudioEncoder) FindAudioDecoderEncoder func(codec av.AudioCodecData) (ok bool, err error, dec av.AudioDecoder, enc av.AudioEncoder)
streams []*tstream streams []*tStream
queue *pktreorder.Queue
} }
func (self *Transcoder) Setup(streams []av.CodecData) (err error) { func (self *Transcoder) Setup(streams []av.CodecData) (err error) {
self.streams = []*tstream{} self.streams = []*tStream{}
for _, stream := range streams { for _, stream := range streams {
ts := &tstream{CodecData: stream} ts := &tStream{codec: stream}
if stream.IsAudio() { if stream.Type().IsAudio() {
if self.FindAudioDecoderEncoder != nil { if self.FindAudioDecoderEncoder != nil {
var ok bool var ok bool
var enc av.AudioEncoder var enc av.AudioEncoder
@ -35,7 +37,10 @@ func (self *Transcoder) Setup(streams []av.CodecData) (err error) {
if err != nil { if err != nil {
return return
} }
ts.CodecData = enc.CodecData() ts.timeline = &pktque.Timeline{}
ts.codec = enc.CodecData()
ts.aencodec = ts.codec.(av.AudioCodecData)
ts.adecodec = stream.(av.AudioCodecData)
ts.aenc = enc ts.aenc = enc
ts.adec = dec ts.adec = dec
} }
@ -43,74 +48,67 @@ func (self *Transcoder) Setup(streams []av.CodecData) (err error) {
} }
self.streams = append(self.streams, ts) self.streams = append(self.streams, ts)
} }
self.queue = &pktreorder.Queue{}
var newstreams []av.CodecData
newstreams, _ = self.Streams()
self.queue.Alloc(newstreams)
return return
} }
func (self *Transcoder) decodeAndEncode(stream *tstream, i int, pkt av.Packet) (err error) { func (self *tStream) audioDecodeAndEncode(inpkt av.Packet) (outpkts []av.Packet, err error) {
var dur time.Duration
var frame av.AudioFrame var frame av.AudioFrame
var ok bool var ok bool
if ok, frame, err = stream.adec.Decode(pkt.Data); err != nil { if ok, frame, err = self.adec.Decode(inpkt.Data); err != nil {
return return
} }
if ok { if !ok {
var pkts []av.Packet return
if pkts, err = stream.aenc.Encode(frame); err != nil { }
if dur, err = self.adecodec.PacketDuration(inpkt.Data); err != nil {
err = fmt.Errorf("transcode: PacketDuration() failed for input stream #%d", inpkt.Idx)
return
}
if debug {
fmt.Println("transcode: push", inpkt.Time, dur)
}
self.timeline.Push(inpkt.Time, dur)
var _outpkts [][]byte
if _outpkts, err = self.aenc.Encode(frame); err != nil {
return
}
for _, _outpkt := range _outpkts {
if dur, err = self.aencodec.PacketDuration(_outpkt); err != nil {
err = fmt.Errorf("transcode: PacketDuration() failed for output stream #%d", inpkt.Idx)
return return
} }
for _, pkt := range pkts { outpkt := av.Packet{Idx: inpkt.Idx, Data: _outpkt}
self.queue.WritePacket(i, pkt) outpkt.Time = self.timeline.Pop(dur)
if debug {
fmt.Println("transcode: pop", outpkt.Time)
} }
outpkts = append(outpkts, outpkt)
} }
return return
} }
func (self *Transcoder) WritePacket(i int, pkt av.Packet) { func (self *Transcoder) Do(pkt av.Packet) (out []av.Packet, err error) {
if debug { stream := self.streams[pkt.Idx]
fmt.Println("transcode: Transcoder.WritePacket", i, len(pkt.Data), fmt.Sprintf("%.2f", pkt.Duration))
fmt.Println("transcode: Transcoder.CanReadPacket", self.CanReadPacket())
}
stream := self.streams[i]
if stream.aenc != nil && stream.adec != nil { if stream.aenc != nil && stream.adec != nil {
if err := self.decodeAndEncode(stream, i, pkt); err != nil { if out, err = stream.audioDecodeAndEncode(pkt); err != nil {
self.queue.EndWritePacket(err) return
} }
} else { } else {
self.queue.WritePacket(i, pkt) out = append(out, pkt)
} }
return return
} }
func (self *Transcoder) EndWritePacket(err error) {
self.queue.EndWritePacket(err)
}
func (self *Transcoder) CanReadPacket() bool {
return self.queue.CanReadPacket()
}
func (self *Transcoder) CanWritePacket() bool {
return self.queue.CanWritePacket()
}
func (self *Transcoder) Error() error {
return self.queue.Error()
}
func (self *Transcoder) ReadPacket() (i int, pkt av.Packet, err error) {
return self.queue.ReadPacket()
}
func (self *Transcoder) Streams() (streams []av.CodecData, err error) { func (self *Transcoder) Streams() (streams []av.CodecData, err error) {
for _, stream := range self.streams { for _, stream := range self.streams {
streams = append(streams, stream.CodecData) streams = append(streams, stream.codec)
} }
return return
} }
@ -119,16 +117,18 @@ func (self *Transcoder) Close() {
for _, stream := range self.streams { for _, stream := range self.streams {
if stream.aenc != nil { if stream.aenc != nil {
stream.aenc.Close() stream.aenc.Close()
stream.aenc = nil
} }
if stream.adec != nil { if stream.adec != nil {
stream.adec.Close() stream.adec.Close()
stream.adec = nil
} }
} }
self.streams = []*tstream{} self.streams = []*tStream{}
} }
type Muxer struct { type Muxer struct {
Muxer av.Muxer Muxer av.Muxer
Transcoder *Transcoder Transcoder *Transcoder
} }
@ -146,33 +146,21 @@ func (self *Muxer) WriteHeader(streams []av.CodecData) (err error) {
return return
} }
func (self *Muxer) WritePacket(i int, pkt av.Packet) (err error) { func (self *Muxer) WritePacket(pkt av.Packet) (err error) {
self.Transcoder.WritePacket(i, pkt) var outpkts []av.Packet
if self.Transcoder.CanReadPacket() { if outpkts, err = self.Transcoder.Do(pkt); err != nil {
if i, pkt, rerr := self.Transcoder.ReadPacket(); rerr != nil { return
err = rerr }
for _, pkt := range outpkts {
if err = self.Muxer.WritePacket(pkt); err != nil {
return return
} else {
if werr := self.Muxer.WritePacket(i, pkt); werr != nil {
self.Transcoder.EndWritePacket(werr)
}
} }
} }
return return
} }
func (self *Muxer) WriteTrailer() (err error) { func (self *Muxer) WriteTrailer() (err error) {
self.Transcoder.EndWritePacket(nil) // TODO: do flush
for {
if i, pkt, rerr := self.Transcoder.ReadPacket(); rerr != nil {
break
} else {
if werr := self.Muxer.WritePacket(i, pkt); werr != nil {
err = werr
return
}
}
}
if err = self.Muxer.WriteTrailer(); err != nil { if err = self.Muxer.WriteTrailer(); err != nil {
return return
} }
@ -180,8 +168,9 @@ func (self *Muxer) WriteTrailer() (err error) {
} }
type Demuxer struct { type Demuxer struct {
Demuxer av.Demuxer Demuxer av.Demuxer
Transcoder *Transcoder Transcoder *Transcoder
outpkts []av.Packet
} }
func (self *Demuxer) Setup() (err error) { func (self *Demuxer) Setup() (err error) {
@ -195,21 +184,22 @@ func (self *Demuxer) Setup() (err error) {
return return
} }
func (self *Demuxer) ReadPacket() (i int, pkt av.Packet, err error) { func (self *Demuxer) ReadPacket() (pkt av.Packet, err error) {
for { for {
if self.Transcoder.CanReadPacket() { if len(self.outpkts) > 0 {
return self.Transcoder.ReadPacket() pkt = self.outpkts[0]
} else if self.Transcoder.CanWritePacket() { self.outpkts = self.outpkts[1:]
if i, pkt, err := self.Demuxer.ReadPacket(); err != nil { return
self.Transcoder.EndWritePacket(err) }
} else { var rpkt av.Packet
self.Transcoder.WritePacket(i, pkt) if rpkt, err = self.Demuxer.ReadPacket(); err != nil {
} return
} else { }
err = self.Transcoder.Error() if self.outpkts, err = self.Transcoder.Do(rpkt); err != nil {
return return
} }
} }
return
} }
func (self *Demuxer) Streams() ([]av.CodecData, error) { func (self *Demuxer) Streams() ([]av.CodecData, error) {
@ -219,4 +209,3 @@ func (self *Demuxer) Streams() ([]av.CodecData, error) {
func (self *Demuxer) Close() { func (self *Demuxer) Close() {
self.Transcoder.Close() self.Transcoder.Close()
} }