adjust newapi

This commit is contained in:
nareix 2016-05-25 07:44:59 +08:00
parent 9767e93332
commit d3ba527309
3 changed files with 84 additions and 137 deletions

View File

@ -5,37 +5,43 @@ import (
"fmt" "fmt"
"encoding/hex" "encoding/hex"
"github.com/nareix/av" "github.com/nareix/av"
"github.com/nareix/av/pktqueue"
"github.com/nareix/codec/aacparser" "github.com/nareix/codec/aacparser"
"github.com/nareix/codec/h264parser" "github.com/nareix/codec/h264parser"
"io" "io"
) )
func Open(R io.Reader) (demuxer *Demuxer, err error) {
_demuxer := &Demuxer{R: R}
if err = _demuxer.ReadHeader(); err != nil {
return
}
demuxer = _demuxer
return
}
type Demuxer struct { type Demuxer struct {
R io.Reader R io.Reader
gotpkt bool
pktque *pktqueue.Queue
pat PAT pat PAT
pmt *PMT pmt *PMT
streams []*Stream streams []*Stream
time float64
readErr error
} }
// ParsePacket() (pid uint, counter int, isStart bool, pts, dst int64, isKeyFrame bool) func (self *Demuxer) Streams() (streams []av.CodecData) {
// WritePayload(pid, pts, dts, isKeyFrame, payloads, isVideoFrame)
func (self *Demuxer) Streams() (streams []av.Stream) {
for _, stream := range self.streams { for _, stream := range self.streams {
streams = append(streams, stream) streams = append(streams, stream)
} }
return return
} }
func (self *Demuxer) Time() float64 { func (self *Demuxer) CurrentTime() (time float64) {
if len(self.streams) > 0 { if self.pktque != nil {
return self.streams[0].time time = self.pktque.CurrentTime()
} }
return 0.0 return
} }
func (self *Demuxer) ReadHeader() (err error) { func (self *Demuxer) ReadHeader() (err error) {
@ -45,7 +51,7 @@ func (self *Demuxer) ReadHeader() (err error) {
if self.pmt != nil { if self.pmt != nil {
n := 0 n := 0
for _, stream := range self.streams { for _, stream := range self.streams {
if len(stream.CodecData()) > 0 { if stream.CodecData != nil {
n++ n++
} }
} }
@ -53,7 +59,7 @@ func (self *Demuxer) ReadHeader() (err error) {
break break
} }
} }
if err = self.readPacket(); err != nil { if err = self.poll(); err != nil {
return return
} }
} }
@ -61,51 +67,21 @@ func (self *Demuxer) ReadHeader() (err error) {
return return
} }
func (self *Demuxer) ReadPacket() (streamIndex int, pkt av.Packet, err error) { func (self *Demuxer) ReadPacket() (i int, pkt av.Packet, err error) {
if len(self.streams) == 0 { return self.pktque.ReadPacket()
err = fmt.Errorf("no stream")
return
}
for {
if self.readErr != nil {
if false {
for _, stream := range self.streams {
fmt.Println("read(flush): stream", stream.Type(), "pkts", len(stream.pkts))
}
}
for i, stream := range self.streams {
var ok bool
if pkt, ok = stream.readLastPacket(); ok {
streamIndex = i
return
}
}
err = self.readErr
return
} else {
if false {
for _, stream := range self.streams {
fmt.Println("read(normal): stream", stream.Type(), "pkts", len(stream.pkts))
}
}
for i, stream := range self.streams {
var ok bool
if pkt, ok = stream.readPacket(); ok {
streamIndex = i
return
}
}
}
if self.readErr == nil {
self.readErr = self.readPacket()
}
}
} }
func (self *Demuxer) readPacket() (err error) { func (self *Demuxer) poll() (err error) {
for self.gotpkt {
if err = self.readTSPacket(); err != nil {
return
}
}
self.gotpkt = false
return
}
func (self *Demuxer) readTSPacket() (err error) {
var header TSHeader var header TSHeader
var n int var n int
var data [188]byte var data [188]byte
@ -127,20 +103,21 @@ func (self *Demuxer) readPacket() (err error) {
if *self.pmt, err = ReadPMT(bytes.NewReader(payload)); err != nil { if *self.pmt, err = ReadPMT(bytes.NewReader(payload)); err != nil {
return return
} }
for _, info := range self.pmt.ElementaryStreamInfos { for i, info := range self.pmt.ElementaryStreamInfos {
stream := &Stream{} stream := &Stream{}
stream.idx = i
stream.demuxer = self stream.demuxer = self
stream.pid = info.ElementaryPID stream.pid = info.ElementaryPID
switch info.StreamType { switch info.StreamType {
case ElementaryStreamTypeH264: case ElementaryStreamTypeH264:
stream.SetType(av.H264)
self.streams = append(self.streams, stream) self.streams = append(self.streams, stream)
case ElementaryStreamTypeAdtsAAC: case ElementaryStreamTypeAdtsAAC:
stream.SetType(av.AAC)
self.streams = append(self.streams, stream) self.streams = append(self.streams, stream)
} }
} }
self.pktque = &pktqueue.Queue{}
self.pktque.Alloc(len(self.streams))
self.pktque.Poll = self.poll
} }
} }
@ -159,68 +136,28 @@ func (self *Demuxer) readPacket() (err error) {
return return
} }
func (self *Stream) readLastPacket() (ret av.Packet, ok bool) { func (self *Stream) payloadEnd() (err error) {
if len(self.pkts) > 1 { payload := self.buf.Bytes()
return self.readPacket()
}
if len(self.pkts) == 1 {
pkt := self.pkts[0]
self.pkts = self.pkts[1:]
if self.peshdr.DataLength == 0 {
pkt.Data = self.buf.Bytes()
}
self.time += pkt.Duration
return pkt.Packet, true
}
return
}
func (self *Stream) readPacket() (ret av.Packet, ok bool) {
if len(self.pkts) > 1 {
pkt := self.pkts[0]
self.pkts = self.pkts[1:]
self.time += pkt.Duration
return pkt.Packet, true
}
return
}
func (self *Stream) payloadStart() {
dts := self.peshdr.DTS dts := self.peshdr.DTS
pts := self.peshdr.PTS pts := self.peshdr.PTS
if dts == 0 { if dts == 0 {
dts = pts dts = pts
} }
pkt := tsPacket{ pkt := av.Packet{
Packet: av.Packet{ IsKeyFrame: self.tshdr.RandomAccessIndicator,
IsKeyFrame: self.tshdr.RandomAccessIndicator, Data: payload,
},
time: float64(dts)/float64(PTS_HZ),
} }
time := float64(dts)/float64(PTS_HZ)
if pts != dts { if pts != dts {
pkt.CompositionTime = float64(pts-dts)/float64(PTS_HZ) pkt.CompositionTime = float64(pts-dts)/float64(PTS_HZ)
} }
self.demuxer.pktque.WriteTimePacket(self.idx, time, pkt)
self.demuxer.gotpkt = true
if len(self.pkts) > 0 { if self.CodecData == nil {
lastpkt := &self.pkts[len(self.pkts)-1] if self.streamType == ElementaryStreamTypeAdtsAAC {
lastpkt.Duration = pkt.time - lastpkt.time
self.lastDuration = lastpkt.Duration
} else {
pkt.Duration = self.lastDuration
}
self.pkts = append(self.pkts, pkt)
}
func (self *Stream) payloadEnd() (err error) {
payload := self.buf.Bytes()
curpkt := &self.pkts[len(self.pkts)-1]
curpkt.Data = payload
if len(self.CodecData()) == 0 {
if self.Type() == av.AAC {
var config aacparser.MPEG4AudioConfig var config aacparser.MPEG4AudioConfig
if config, _, _, _, err = aacparser.ReadADTSFrame(payload); err != nil { if config, _, _, _, err = aacparser.ReadADTSFrame(payload); err != nil {
err = fmt.Errorf("ReadADTSFrame failed: %s", err) err = fmt.Errorf("ReadADTSFrame failed: %s", err)
@ -231,11 +168,10 @@ func (self *Stream) payloadEnd() (err error) {
err = fmt.Errorf("WriteMPEG4AudioConfig failed: %s", err) err = fmt.Errorf("WriteMPEG4AudioConfig failed: %s", err)
return return
} }
if err = self.SetCodecData(bw.Bytes()); err != nil { if self.CodecData, err = aacparser.NewCodecDataFromMPEG4AudioConfigBytes(bw.Bytes()); err != nil {
err = fmt.Errorf("SetCodecData failed: %s", err)
return return
} }
} else if self.Type() == av.H264 { } else if self.streamType == ElementaryStreamTypeH264 {
if false { if false {
fmt.Println(hex.Dump(payload)) fmt.Println(hex.Dump(payload))
} }
@ -252,9 +188,7 @@ func (self *Stream) payloadEnd() (err error) {
} }
} }
if len(sps) > 0 && len(pps) > 0 { if len(sps) > 0 && len(pps) > 0 {
codecData, _ := h264parser.CreateCodecDataBySPSAndPPS(sps, pps) if self.CodecData, err = h264parser.NewCodecDataFromSPSAndPPS(sps, pps); err != nil {
if err = self.SetCodecData(codecData); err != nil {
err = fmt.Errorf("SetCodecData failed: %s", err)
return return
} }
} }
@ -280,7 +214,6 @@ func (self *Stream) handleTSPacket(header TSHeader, tspacket []byte) (err error)
return return
} }
self.tshdr = header self.tshdr = header
self.payloadStart()
} }
if _, err = io.CopyN(&self.buf, lr, lr.N); err != nil { if _, err = io.CopyN(&self.buf, lr, lr.N); err != nil {
@ -295,3 +228,4 @@ func (self *Stream) handleTSPacket(header TSHeader, tspacket []byte) (err error)
return return
} }

View File

@ -9,6 +9,15 @@ import (
"io" "io"
) )
func Create(W io.Writer, streams []av.CodecData) (muxer *Muxer, err error) {
_muxer := &Muxer{W: W}
if err = _muxer.WriteHeader(streams); err != nil {
return
}
muxer = _muxer
return
}
type Muxer struct { type Muxer struct {
W io.Writer W io.Writer
streams []*Stream streams []*Stream
@ -18,16 +27,17 @@ type Muxer struct {
tswPMT *TSWriter tswPMT *TSWriter
} }
func (self *Muxer) NewStream() av.Stream { func (self *Muxer) NewStream(codec av.CodecData) (err error) {
stream := &Stream{ stream := &Stream{
mux: self, muxer: self,
CodecData: codec,
tsw: &TSWriter{ tsw: &TSWriter{
DiscontinuityIndicator: true, DiscontinuityIndicator: true,
PID: uint(len(self.streams) + 0x100), PID: uint(len(self.streams) + 0x100),
}, },
} }
self.streams = append(self.streams, stream) self.streams = append(self.streams, stream)
return stream return
} }
func (self *Muxer) writePaddingTSPackets(tsw *TSWriter) (err error) { func (self *Muxer) writePaddingTSPackets(tsw *TSWriter) (err error) {
@ -55,7 +65,13 @@ func (self *Muxer) WriteTrailer() (err error) {
return return
} }
func (self *Muxer) WriteHeader() (err error) { func (self *Muxer) WriteHeader(streams []av.CodecData) (err error) {
for _, stream := range streams {
if err = self.NewStream(stream); err != nil {
return
}
}
bufPAT := &bytes.Buffer{} bufPAT := &bytes.Buffer{}
bufPMT := &bytes.Buffer{} bufPMT := &bytes.Buffer{}
@ -104,9 +120,10 @@ func (self *Muxer) WritePacket(streamIndex int, pkt av.Packet) (err error) {
stream := self.streams[streamIndex] stream := self.streams[streamIndex]
if stream.Type() == av.AAC { if stream.Type() == av.AAC {
codec := stream.CodecData.(av.AACCodecData)
data := pkt.Data data := pkt.Data
if !aacparser.IsADTSFrame(data) { if !aacparser.IsADTSFrame(data) {
data = append(aacparser.MakeADTSHeader(stream.AACCodecInfo.MPEG4AudioConfig, 1024, len(data)), data...) data = append(codec.MakeADTSHeader(1024, len(data)), data...)
} }
buf := &bytes.Buffer{} buf := &bytes.Buffer{}
@ -126,6 +143,7 @@ func (self *Muxer) WritePacket(streamIndex int, pkt av.Packet) (err error) {
stream.time += pkt.Duration stream.time += pkt.Duration
} else if stream.Type() == av.H264 { } else if stream.Type() == av.H264 {
codec := stream.CodecData.(av.H264CodecData)
buf := &bytes.Buffer{} buf := &bytes.Buffer{}
pes := PESHeader{ pes := PESHeader{
StreamId: StreamIdH264, StreamId: StreamIdH264,
@ -136,9 +154,7 @@ func (self *Muxer) WritePacket(streamIndex int, pkt av.Packet) (err error) {
nalus, _ := h264parser.SplitNALUs(pkt.Data) nalus, _ := h264parser.SplitNALUs(pkt.Data)
if pkt.IsKeyFrame { if pkt.IsKeyFrame {
sps := stream.H264CodecInfo.Record.SPS[0] nalus = append([][]byte{codec.SPS(), codec.PPS()}, nalus...)
pps := stream.H264CodecInfo.Record.PPS[0]
nalus = append([][]byte{sps, pps}, nalus...)
} }
h264parser.WalkNALUsAnnexb(nalus, func(b []byte) { h264parser.WalkNALUsAnnexb(nalus, func(b []byte) {
buf.Write(b) buf.Write(b)

View File

@ -5,30 +5,27 @@ import (
"github.com/nareix/av" "github.com/nareix/av"
) )
type tsPacket struct {
av.Packet
time float64
}
type Stream struct { type Stream struct {
av.StreamCommon av.CodecData
time float64
lastDuration float64
pid uint pid uint
buf bytes.Buffer buf bytes.Buffer
peshdr *PESHeader peshdr *PESHeader
tshdr TSHeader tshdr TSHeader
pkts []tsPacket
demuxer *Demuxer demuxer *Demuxer
mux *Muxer muxer *Muxer
streamId uint streamId uint
streamType uint
tsw *TSWriter tsw *TSWriter
dataBuf *iovec dataBuf *iovec
cacheSize int cacheSize int
idx int
pkt av.Packet
time float64
} }
func timeToPesTs(time float64) uint64 { func timeToPesTs(time float64) uint64 {