change readPacket mechanism and add h264 probing
This commit is contained in:
parent
b1239fc37c
commit
12f2f28f55
185
demuxer.go
185
demuxer.go
@ -3,8 +3,10 @@ package ts
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"encoding/hex"
|
||||||
"github.com/nareix/av"
|
"github.com/nareix/av"
|
||||||
"github.com/nareix/codec/aacparser"
|
"github.com/nareix/codec/aacparser"
|
||||||
|
"github.com/nareix/codec/h264parser"
|
||||||
"io"
|
"io"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -14,6 +16,9 @@ type Demuxer struct {
|
|||||||
pat PAT
|
pat PAT
|
||||||
pmt *PMT
|
pmt *PMT
|
||||||
streams []*Stream
|
streams []*Stream
|
||||||
|
time float64
|
||||||
|
|
||||||
|
readErr error
|
||||||
}
|
}
|
||||||
|
|
||||||
// ParsePacket() (pid uint, counter int, isStart bool, pts, dst int64, isKeyFrame bool)
|
// ParsePacket() (pid uint, counter int, isStart bool, pts, dst int64, isKeyFrame bool)
|
||||||
@ -27,10 +32,8 @@ func (self *Demuxer) Streams() (streams []av.Stream) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (self *Demuxer) Time() float64 {
|
func (self *Demuxer) Time() float64 {
|
||||||
for _, stream := range self.streams {
|
if len(self.streams) > 0 {
|
||||||
if len(stream.pkts) > 0 {
|
return self.streams[0].time
|
||||||
return stream.pkts[len(stream.pkts)-1].time
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return 0.0
|
return 0.0
|
||||||
}
|
}
|
||||||
@ -50,7 +53,6 @@ func (self *Demuxer) ReadHeader() (err error) {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = self.readPacket(); err != nil {
|
if err = self.readPacket(); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -66,17 +68,39 @@ func (self *Demuxer) ReadPacket() (streamIndex int, pkt av.Packet, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
for i, stream := range self.streams {
|
if self.readErr != nil {
|
||||||
if len(stream.pkts) > 1 {
|
if false {
|
||||||
streamIndex = i
|
for _, stream := range self.streams {
|
||||||
pkt = stream.pkts[0].Packet
|
fmt.Println("read(flush): stream", stream.Type(), "pkts", len(stream.pkts))
|
||||||
stream.pkts = stream.pkts[1:]
|
}
|
||||||
return
|
}
|
||||||
|
for i, stream := range self.streams {
|
||||||
|
var ok bool
|
||||||
|
if pkt, ok = stream.readLastPacket(); ok {
|
||||||
|
streamIndex = i
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
err = self.readErr
|
||||||
|
return
|
||||||
|
|
||||||
|
} else {
|
||||||
|
if false {
|
||||||
|
for _, stream := range self.streams {
|
||||||
|
fmt.Println("read(normal): stream", stream.Type(), "pkts", len(stream.pkts))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for i, stream := range self.streams {
|
||||||
|
var ok bool
|
||||||
|
if pkt, ok = stream.readPacket(); ok {
|
||||||
|
streamIndex = i
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = self.readPacket(); err != nil {
|
if self.readErr == nil {
|
||||||
return
|
self.readErr = self.readPacket()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -123,7 +147,7 @@ func (self *Demuxer) readPacket() (err error) {
|
|||||||
} else {
|
} else {
|
||||||
for _, stream := range self.streams {
|
for _, stream := range self.streams {
|
||||||
if header.PID == stream.pid {
|
if header.PID == stream.pid {
|
||||||
if err = stream.appendPacket(header, payload); err != nil {
|
if err = stream.handleTSPacket(header, payload); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -135,13 +159,78 @@ func (self *Demuxer) readPacket() (err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *Stream) appendPayload() (err error) {
|
func (self *Stream) readLastPacket() (ret av.Packet, ok bool) {
|
||||||
self.payload = self.buf.Bytes()
|
if len(self.pkts) > 1 {
|
||||||
|
return self.readPacket()
|
||||||
|
}
|
||||||
|
if len(self.pkts) == 1 {
|
||||||
|
pkt := self.pkts[0]
|
||||||
|
self.pkts = self.pkts[1:]
|
||||||
|
if self.peshdr.DataLength == 0 {
|
||||||
|
pkt.Data = self.buf.Bytes()
|
||||||
|
}
|
||||||
|
self.time += pkt.Duration
|
||||||
|
return pkt.Packet, true
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if self.Type() == av.AAC {
|
func (self *Stream) readPacket() (ret av.Packet, ok bool) {
|
||||||
if len(self.CodecData()) == 0 {
|
if len(self.pkts) > 1 {
|
||||||
|
pkt := self.pkts[0]
|
||||||
|
self.pkts = self.pkts[1:]
|
||||||
|
self.time += pkt.Duration
|
||||||
|
return pkt.Packet, true
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (self *Stream) payloadStart() {
|
||||||
|
if false {
|
||||||
|
fmt.Println("payloadStart:", self)
|
||||||
|
}
|
||||||
|
|
||||||
|
dts := self.peshdr.DTS
|
||||||
|
pts := self.peshdr.PTS
|
||||||
|
if dts == 0 {
|
||||||
|
dts = pts
|
||||||
|
}
|
||||||
|
|
||||||
|
pkt := tsPacket{
|
||||||
|
Packet: av.Packet{
|
||||||
|
IsKeyFrame: self.tshdr.RandomAccessIndicator,
|
||||||
|
},
|
||||||
|
time: float64(dts)/float64(PTS_HZ),
|
||||||
|
}
|
||||||
|
if pts != dts {
|
||||||
|
pkt.CompositionTime = float64(pts-dts)/float64(PTS_HZ)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(self.pkts) > 0 {
|
||||||
|
lastpkt := &self.pkts[len(self.pkts)-1]
|
||||||
|
lastpkt.Duration = pkt.time - lastpkt.time
|
||||||
|
self.lastDuration = lastpkt.Duration
|
||||||
|
} else {
|
||||||
|
pkt.Duration = self.lastDuration
|
||||||
|
}
|
||||||
|
|
||||||
|
self.pkts = append(self.pkts, pkt)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (self *Stream) payloadEnd() (err error) {
|
||||||
|
if false {
|
||||||
|
fmt.Println("payloadEnd:", self)
|
||||||
|
}
|
||||||
|
|
||||||
|
payload := self.buf.Bytes()
|
||||||
|
|
||||||
|
curpkt := &self.pkts[len(self.pkts)-1]
|
||||||
|
curpkt.Data = payload
|
||||||
|
|
||||||
|
if len(self.CodecData()) == 0 {
|
||||||
|
if self.Type() == av.AAC {
|
||||||
var config aacparser.MPEG4AudioConfig
|
var config aacparser.MPEG4AudioConfig
|
||||||
if config, _, _, _, err = aacparser.ReadADTSFrame(self.payload); err != nil {
|
if config, _, _, _, err = aacparser.ReadADTSFrame(payload); err != nil {
|
||||||
err = fmt.Errorf("ReadADTSFrame failed: %s", err)
|
err = fmt.Errorf("ReadADTSFrame failed: %s", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -154,42 +243,41 @@ func (self *Stream) appendPayload() (err error) {
|
|||||||
err = fmt.Errorf("SetCodecData failed: %s", err)
|
err = fmt.Errorf("SetCodecData failed: %s", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
} else if self.Type() == av.H264 {
|
||||||
|
if false {
|
||||||
|
fmt.Println(hex.Dump(payload))
|
||||||
|
}
|
||||||
|
nalus, _ := h264parser.SplitNALUs(payload)
|
||||||
|
var sps, pps []byte
|
||||||
|
for _, nalu := range nalus {
|
||||||
|
if len(nalu) > 0 {
|
||||||
|
naltype := nalu[0]&0x1f
|
||||||
|
if naltype == 7 {
|
||||||
|
sps = nalu
|
||||||
|
} else if naltype == 8 {
|
||||||
|
pps = nalu
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(sps) > 0 && len(pps) > 0 {
|
||||||
|
codecData, _ := h264parser.CreateCodecDataBySPSAndPPS(sps, pps)
|
||||||
|
if err = self.SetCodecData(codecData); err != nil {
|
||||||
|
err = fmt.Errorf("SetCodecData failed: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
dts := self.peshdr.DTS
|
|
||||||
pts := self.peshdr.PTS
|
|
||||||
if dts == 0 {
|
|
||||||
dts = pts
|
|
||||||
}
|
|
||||||
|
|
||||||
pkt := tsPacket{
|
|
||||||
Packet: av.Packet{
|
|
||||||
IsKeyFrame: self.tshdr.RandomAccessIndicator,
|
|
||||||
Data: self.payload,
|
|
||||||
},
|
|
||||||
time: float64(dts)/float64(PTS_HZ),
|
|
||||||
}
|
|
||||||
|
|
||||||
if pts != dts {
|
|
||||||
pkt.CompositionTime = float64(pts-dts)/float64(PTS_HZ)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(self.pkts) > 0 {
|
|
||||||
lastPkt := &self.pkts[len(self.pkts)-1]
|
|
||||||
lastPkt.Duration = pkt.time - lastPkt.time
|
|
||||||
}
|
|
||||||
self.pkts = append(self.pkts, pkt)
|
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *Stream) appendPacket(header TSHeader, payload []byte) (err error) {
|
func (self *Stream) handleTSPacket(header TSHeader, tspacket []byte) (err error) {
|
||||||
r := bytes.NewReader(payload)
|
r := bytes.NewReader(tspacket)
|
||||||
lr := &io.LimitedReader{R: r, N: int64(len(payload))}
|
lr := &io.LimitedReader{R: r, N: int64(len(tspacket))}
|
||||||
|
|
||||||
if header.PayloadUnitStart && self.peshdr != nil && self.peshdr.DataLength == 0 {
|
if header.PayloadUnitStart && self.peshdr != nil && self.peshdr.DataLength == 0 {
|
||||||
if err = self.appendPayload(); err != nil {
|
if err = self.payloadEnd(); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -200,6 +288,7 @@ func (self *Stream) appendPacket(header TSHeader, payload []byte) (err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
self.tshdr = header
|
self.tshdr = header
|
||||||
|
self.payloadStart()
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err = io.CopyN(&self.buf, lr, lr.N); err != nil {
|
if _, err = io.CopyN(&self.buf, lr, lr.N); err != nil {
|
||||||
@ -207,7 +296,7 @@ func (self *Stream) appendPacket(header TSHeader, payload []byte) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if self.buf.Len() == int(self.peshdr.DataLength) {
|
if self.buf.Len() == int(self.peshdr.DataLength) {
|
||||||
if err = self.appendPayload(); err != nil {
|
if err = self.payloadEnd(); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user