change to new av.Packet

This commit is contained in:
nareix 2016-06-22 17:59:12 +08:00
parent 5815ff6efa
commit 50b272ba76
3 changed files with 41 additions and 46 deletions

View File

@ -6,7 +6,6 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/nareix/av" "github.com/nareix/av"
"github.com/nareix/av/pktqueue"
"github.com/nareix/codec/aacparser" "github.com/nareix/codec/aacparser"
"github.com/nareix/codec/h264parser" "github.com/nareix/codec/h264parser"
"io" "io"
@ -25,10 +24,12 @@ type Demuxer struct {
R io.Reader R io.Reader
gotpkt bool gotpkt bool
pktque *pktqueue.Queue pkt av.Packet
pat PAT pat PAT
pmt *PMT pmt *PMT
streams []*Stream streams []*Stream
streamsintf []av.CodecData
} }
func (self *Demuxer) Streams() (streams []av.CodecData, err error) { func (self *Demuxer) Streams() (streams []av.CodecData, err error) {
@ -37,21 +38,12 @@ func (self *Demuxer) Streams() (streams []av.CodecData, err error) {
return return
} }
for _, stream := range self.streams { for _, stream := range self.streams {
streams = append(streams, stream) streams = append(streams, stream.CodecData)
}
return
}
func (self *Demuxer) CurrentTime() (tm time.Duration) {
if self.pktque != nil {
tm = self.pktque.CurrentTime()
} }
return return
} }
func (self *Demuxer) ReadHeader() (err error) { func (self *Demuxer) ReadHeader() (err error) {
self.streams = []*Stream{}
for { for {
if self.pmt != nil { if self.pmt != nil {
n := 0 n := 0
@ -68,12 +60,15 @@ func (self *Demuxer) ReadHeader() (err error) {
return return
} }
} }
return return
} }
func (self *Demuxer) ReadPacket() (i int, pkt av.Packet, err error) { func (self *Demuxer) ReadPacket() (pkt av.Packet, err error) {
return self.pktque.ReadPacket() if err = self.poll(); err != nil {
return
}
pkt = self.pkt
return
} }
func (self *Demuxer) poll() (err error) { func (self *Demuxer) poll() (err error) {
@ -102,6 +97,8 @@ func (self *Demuxer) readTSPacket() (err error) {
} }
} else { } else {
if self.pmt == nil { if self.pmt == nil {
self.streams = []*Stream{}
for _, entry := range self.pat.Entries { for _, entry := range self.pat.Entries {
if entry.ProgramMapPID == header.PID { if entry.ProgramMapPID == header.PID {
self.pmt = new(PMT) self.pmt = new(PMT)
@ -120,17 +117,14 @@ func (self *Demuxer) readTSPacket() (err error) {
self.streams = append(self.streams, stream) self.streams = append(self.streams, stream)
} }
} }
self.pktque = &pktqueue.Queue{}
var streams []av.CodecData
if streams, err = self.Streams(); err != nil {
return
}
self.pktque.Alloc(streams)
self.pktque.Poll = self.poll
} }
} }
self.streamsintf = make([]av.CodecData, len(self.streams))
for i, stream := range self.streams {
self.streamsintf[i] = stream
}
} else { } else {
for _, stream := range self.streams { for _, stream := range self.streams {
if header.PID == stream.pid { if header.PID == stream.pid {
@ -155,15 +149,15 @@ func (self *Stream) payloadEnd() (err error) {
dts = pts dts = pts
} }
pkt := av.Packet{ self.pkt = av.Packet{
Idx: int8(self.idx),
IsKeyFrame: self.tshdr.RandomAccessIndicator, IsKeyFrame: self.tshdr.RandomAccessIndicator,
Time: time.Duration(dts)*time.Second / time.Duration(PTS_HZ),
Data: payload, Data: payload,
} }
tm := time.Duration(dts)*time.Second / time.Duration(PTS_HZ)
if pts != dts { if pts != dts {
pkt.CompositionTime = time.Duration(pts-dts)*time.Second / time.Duration(PTS_HZ) self.pkt.CompositionTime = time.Duration(pts-dts)*time.Second / time.Duration(PTS_HZ)
} }
self.demuxer.pktque.WriteTimePacket(self.idx, tm, pkt)
self.demuxer.gotpkt = true self.demuxer.gotpkt = true
if self.CodecData == nil { if self.CodecData == nil {

View File

@ -135,14 +135,24 @@ func (self *Muxer) WriteHeader(streams []av.CodecData) (err error) {
if err = self.WritePATPMT(); err != nil { if err = self.WritePATPMT(); err != nil {
return return
} }
return return
} }
func (self *Muxer) WritePacket(streamIndex int, pkt av.Packet) (err error) { func (self *Muxer) WritePacket(pkt av.Packet) (err error) {
stream := self.streams[streamIndex] if true {
fmt.Println("ts:", "in", pkt.Idx, pkt.Time, "len", len(pkt.Data))
}
if err = self.writePacket(pkt); err != nil {
return
}
return
}
if stream.Type() == av.AAC { func (self *Muxer) writePacket(pkt av.Packet) (err error) {
stream := self.streams[pkt.Idx]
switch stream.Type() {
case av.AAC:
codec := stream.CodecData.(aacparser.CodecData) codec := stream.CodecData.(aacparser.CodecData)
data := pkt.Data data := pkt.Data
if !aacparser.IsADTSFrame(data) { if !aacparser.IsADTSFrame(data) {
@ -152,26 +162,24 @@ func (self *Muxer) WritePacket(streamIndex int, pkt av.Packet) (err error) {
buf := &bytes.Buffer{} buf := &bytes.Buffer{}
pes := PESHeader{ pes := PESHeader{
StreamId: StreamIdAAC, StreamId: StreamIdAAC,
PTS: timeToPesTs(stream.time), PTS: timeToPesTs(pkt.Time),
} }
WritePESHeader(buf, pes, len(data)) WritePESHeader(buf, pes, len(data))
buf.Write(data) buf.Write(data)
stream.tsw.RandomAccessIndicator = true stream.tsw.RandomAccessIndicator = true
stream.tsw.PCR = timeToPCR(stream.time) stream.tsw.PCR = timeToPCR(pkt.Time)
if err = stream.tsw.WriteTo(self.W, buf.Bytes()); err != nil { if err = stream.tsw.WriteTo(self.W, buf.Bytes()); err != nil {
return return
} }
stream.time += pkt.Duration case av.H264:
} else if stream.Type() == av.H264 {
codec := stream.CodecData.(h264parser.CodecData) codec := stream.CodecData.(h264parser.CodecData)
buf := &bytes.Buffer{} buf := &bytes.Buffer{}
pes := PESHeader{ pes := PESHeader{
StreamId: StreamIdH264, StreamId: StreamIdH264,
PTS: timeToPesTs(stream.time + pkt.CompositionTime), PTS: timeToPesTs(pkt.Time + pkt.CompositionTime),
DTS: timeToPesTs(stream.time), DTS: timeToPesTs(pkt.Time),
} }
WritePESHeader(buf, pes, 0) WritePESHeader(buf, pes, 0)
@ -184,16 +192,10 @@ func (self *Muxer) WritePacket(streamIndex int, pkt av.Packet) (err error) {
}) })
stream.tsw.RandomAccessIndicator = pkt.IsKeyFrame stream.tsw.RandomAccessIndicator = pkt.IsKeyFrame
stream.tsw.PCR = timeToPCR(stream.time) stream.tsw.PCR = timeToPCR(pkt.Time)
if err = stream.tsw.WriteTo(self.W, buf.Bytes()); err != nil { if err = stream.tsw.WriteTo(self.W, buf.Bytes()); err != nil {
return return
} }
stream.time += pkt.Duration
} else {
err = fmt.Errorf("unknown stream type=%d", stream.Type())
return
} }
return return

View File

@ -26,7 +26,6 @@ type Stream struct {
idx int idx int
pkt av.Packet pkt av.Packet
time time.Duration
} }
func timeToPesTs(tm time.Duration) uint64 { func timeToPesTs(tm time.Duration) uint64 {