flv/rtmp: combine probe and packet <-> tag logic in flv; improve flvio

This commit is contained in:
nareix 2016-07-13 09:03:46 +08:00
parent 6f32e25e03
commit e1f6aa15fe
3 changed files with 401 additions and 428 deletions

View File

@ -1,11 +1,11 @@
package flv
import (
"time"
"fmt"
"github.com/nareix/joy4/av"
"github.com/nareix/joy4/av/avutil"
"github.com/nareix/joy4/codec/h264parser"
"github.com/nareix/joy4/codec"
"github.com/nareix/joy4/codec/aacparser"
"github.com/nareix/pio"
"github.com/nareix/joy4/format/flv/flvio"
@ -13,6 +13,255 @@ import (
"bufio"
)
var MaxProbePacketCount = 20
type Prober struct {
HasAudio, HasVideo bool
GotAudio, GotVideo bool
VideoStreamIdx, AudioStreamIdx int
PushedCount int
Streams []av.CodecData
CachedPkts []av.Packet
}
func (self *Prober) CacheTag(_tag flvio.Tag, timestamp int32) {
pkt, _ := self.TagToPacket(_tag, timestamp)
self.CachedPkts = append(self.CachedPkts, pkt)
}
func (self *Prober) PushTag(_tag flvio.Tag, timestamp int32) (err error) {
self.PushedCount++
if self.PushedCount > MaxProbePacketCount {
err = fmt.Errorf("flv: max probe packet count reached")
return
}
switch tag := _tag.(type) {
case *flvio.Videodata:
switch tag.AVCPacketType {
case flvio.AVC_SEQHDR:
if !self.GotVideo {
var stream h264parser.CodecData
if stream, err = h264parser.NewCodecDataFromAVCDecoderConfRecord(tag.Data); err != nil {
err = fmt.Errorf("flv: h264 seqhdr invalid")
return
}
self.VideoStreamIdx = len(self.Streams)
self.Streams = append(self.Streams, stream)
self.GotVideo = true
}
case flvio.AVC_NALU:
self.CacheTag(tag, timestamp)
}
case *flvio.Audiodata:
switch tag.SoundFormat {
case flvio.SOUND_AAC:
switch tag.AACPacketType {
case flvio.AAC_SEQHDR:
if !self.GotAudio {
var stream aacparser.CodecData
if stream, err = aacparser.NewCodecDataFromMPEG4AudioConfigBytes(tag.Data); err != nil {
err = fmt.Errorf("flv: aac seqhdr invalid")
return
}
self.AudioStreamIdx = len(self.Streams)
self.Streams = append(self.Streams, stream)
self.GotAudio = true
}
case flvio.AAC_RAW:
self.CacheTag(tag, timestamp)
}
case flvio.SOUND_SPEEX:
if !self.GotAudio {
stream := codec.NewSpeexCodecData()
self.AudioStreamIdx = len(self.Streams)
self.Streams = append(self.Streams, stream)
self.GotAudio = true
self.CacheTag(tag, timestamp)
}
case flvio.SOUND_NELLYMOSER:
if !self.GotAudio {
stream := codec.NewNellyMoserCodecData()
self.AudioStreamIdx = len(self.Streams)
self.Streams = append(self.Streams, stream)
self.GotAudio = true
self.CacheTag(tag, timestamp)
}
}
}
return
}
func (self *Prober) Probed() (ok bool) {
if self.HasAudio || self.HasVideo {
if self.HasAudio == self.GotAudio && self.HasVideo == self.GotVideo {
return true
}
} else {
if self.PushedCount == MaxProbePacketCount {
return true
}
}
return
}
func (self *Prober) TagToPacket(_tag flvio.Tag, timestamp int32) (pkt av.Packet, ok bool) {
switch tag := _tag.(type) {
case *flvio.Videodata:
pkt.Idx = int8(self.VideoStreamIdx)
switch tag.AVCPacketType {
case flvio.AVC_NALU:
ok = true
pkt.Data = tag.Data
pkt.CompositionTime = flvio.TsToTime(tag.CompositionTime)
pkt.IsKeyFrame = tag.FrameType == flvio.FRAME_KEY
}
case *flvio.Audiodata:
pkt.Idx = int8(self.AudioStreamIdx)
switch tag.SoundFormat {
case flvio.SOUND_AAC:
switch tag.AACPacketType {
case flvio.AAC_RAW:
ok = true
pkt.Data = tag.Data
}
case flvio.SOUND_SPEEX:
ok = true
pkt.Data = tag.Data
case flvio.SOUND_NELLYMOSER:
ok = true
pkt.Data = tag.Data
}
}
pkt.Time = flvio.TsToTime(timestamp)
return
}
func (self *Prober) Empty() bool {
return len(self.CachedPkts) == 0
}
func (self *Prober) PopPacket() av.Packet {
pkt := self.CachedPkts[0]
self.CachedPkts = self.CachedPkts[1:]
return pkt
}
func CodecDataToTag(stream av.CodecData) (_tag flvio.Tag, ok bool, err error) {
switch stream.Type() {
case av.H264:
h264 := stream.(h264parser.CodecData)
tag := &flvio.Videodata{
AVCPacketType: flvio.AVC_SEQHDR,
CodecID: flvio.VIDEO_H264,
Data: h264.AVCDecoderConfRecordBytes(),
FrameType: flvio.FRAME_KEY,
}
ok = true
_tag = tag
case av.NELLYMOSER:
case av.SPEEX:
case av.AAC:
aac := stream.(aacparser.CodecData)
tag := &flvio.Audiodata{
SoundFormat: flvio.SOUND_AAC,
SoundRate: flvio.SOUND_44Khz,
AACPacketType: flvio.AAC_SEQHDR,
Data: aac.MPEG4AudioConfigBytes(),
}
switch aac.SampleFormat().BytesPerSample() {
case 1:
tag.SoundSize = flvio.SOUND_8BIT
default:
tag.SoundSize = flvio.SOUND_16BIT
}
switch aac.ChannelLayout().Count() {
case 1:
tag.SoundType = flvio.SOUND_MONO
case 2:
tag.SoundType = flvio.SOUND_STEREO
}
ok = true
_tag = tag
default:
err = fmt.Errorf("flv: unspported codecType=%v", stream.Type())
return
}
return
}
func PacketToTag(pkt av.Packet, stream av.CodecData) (_tag flvio.Tag, timestamp int32) {
switch stream.Type() {
case av.H264:
tag := &flvio.Videodata{
AVCPacketType: flvio.AVC_NALU,
CodecID: flvio.VIDEO_H264,
Data: pkt.Data,
CompositionTime: flvio.TimeToTs(pkt.CompositionTime),
}
if pkt.IsKeyFrame {
tag.FrameType = flvio.FRAME_KEY
} else {
tag.FrameType = flvio.FRAME_INTER
}
_tag = tag
case av.AAC:
tag := &flvio.Audiodata{
SoundFormat: flvio.SOUND_AAC,
SoundRate: flvio.SOUND_44Khz,
AACPacketType: flvio.AAC_RAW,
Data: pkt.Data,
}
astream := stream.(av.AudioCodecData)
switch astream.SampleFormat().BytesPerSample() {
case 1:
tag.SoundSize = flvio.SOUND_8BIT
default:
tag.SoundSize = flvio.SOUND_16BIT
}
switch astream.ChannelLayout().Count() {
case 1:
tag.SoundType = flvio.SOUND_MONO
case 2:
tag.SoundType = flvio.SOUND_STEREO
}
_tag = tag
case av.SPEEX:
tag := &flvio.Audiodata{
SoundFormat: flvio.SOUND_SPEEX,
Data: pkt.Data,
}
_tag = tag
case av.NELLYMOSER:
tag := &flvio.Audiodata{
SoundFormat: flvio.SOUND_NELLYMOSER,
Data: pkt.Data,
}
_tag = tag
}
timestamp = flvio.TimeToTs(pkt.Time)
return
}
type Muxer struct {
pw *pio.Writer
bw *bufio.Writer
@ -27,7 +276,7 @@ func NewMuxer(w io.Writer) *Muxer {
}
func (self *Muxer) SupportedCodecTypes() []av.CodecType {
return []av.CodecType{av.H264, av.AAC, av.NELLYMOSER}
return []av.CodecType{av.H264, av.AAC, av.NELLYMOSER, av.SPEEX}
}
func (self *Muxer) WriteHeader(streams []av.CodecData) (err error) {
@ -45,33 +294,15 @@ func (self *Muxer) WriteHeader(streams []av.CodecData) (err error) {
}
for _, stream := range streams {
var _tag flvio.Tag
switch stream.Type() {
case av.H264:
h264 := stream.(h264parser.CodecData)
tag := &flvio.Videodata{
AVCPacketType: flvio.AVC_SEQHDR,
CodecID: flvio.VIDEO_H264,
Data: h264.AVCDecoderConfRecordBytes(),
}
_tag = tag
case av.NELLYMOSER:
case av.AAC:
aac := stream.(aacparser.CodecData)
tag := flvio.MakeAACAudiodata(aac, aac.MPEG4AudioConfigBytes())
tag.AACPacketType = flvio.AAC_SEQHDR
_tag = &tag
default:
err = fmt.Errorf("flv: unspported codecType=%v", stream.Type())
var tag flvio.Tag
var ok bool
if tag, ok, err = CodecDataToTag(stream); err != nil {
return
}
if err = flvio.WriteTag(self.pw, _tag, 0); err != nil {
return
if ok {
if err = flvio.WriteTag(self.pw, tag, 0); err != nil {
return
}
}
}
@ -81,36 +312,9 @@ func (self *Muxer) WriteHeader(streams []av.CodecData) (err error) {
func (self *Muxer) WritePacket(pkt av.Packet) (err error) {
stream := self.streams[pkt.Idx]
var _tag flvio.Tag
tag, ts := PacketToTag(pkt, stream)
switch stream.Type() {
case av.H264:
tag := &flvio.Videodata{
AVCPacketType: flvio.AVC_NALU,
CodecID: flvio.VIDEO_H264,
Data: pkt.Data,
CompositionTime: timeToTs(pkt.CompositionTime),
}
if pkt.IsKeyFrame {
tag.FrameType = flvio.FRAME_KEY
} else {
tag.FrameType = flvio.FRAME_INTER
}
_tag = tag
case av.AAC:
tag := flvio.MakeAACAudiodata(stream.(av.AudioCodecData), pkt.Data)
_tag = &tag
case av.NELLYMOSER:
tag := &flvio.Audiodata{
SoundFormat: flvio.SOUND_NELLYMOSER,
Data: pkt.Data,
}
_tag = tag
}
if err = flvio.WriteTag(self.pw, _tag, timeToTs(pkt.Time)); err != nil {
if err = flvio.WriteTag(self.pw, tag, ts); err != nil {
return
}
@ -124,167 +328,82 @@ func (self *Muxer) WriteTrailer() (err error) {
return
}
type flvStream struct {
av.CodecData
lastts int32
tm time.Duration
}
type Demuxer struct {
streams []*flvStream
videostreamidx int
audiostreamidx int
prober *Prober
pr *pio.Reader
probepkts []flvio.Tag
probed bool
stage int
}
func NewDemuxer(r io.Reader) *Demuxer {
return &Demuxer{
pr: pio.NewReader(bufio.NewReaderSize(r, pio.RecommendBufioSize)),
prober: &Prober{},
}
}
func (self *Demuxer) probe() (err error) {
if self.probed {
return
}
var flags, got uint8
if flags, err = flvio.ReadFileHeader(self.pr); err != nil {
return
}
flags &= flvio.FILE_HAS_AUDIO|flvio.FILE_HAS_VIDEO
for {
var _tag flvio.Tag
if _tag, _, err = flvio.ReadTag(self.pr); err != nil {
return
}
switch tag := _tag.(type) {
case *flvio.Videodata:
switch tag.CodecID {
case flvio.VIDEO_H264:
switch tag.AVCPacketType {
case flvio.AVC_SEQHDR:
var codec h264parser.CodecData
if codec, err = h264parser.NewCodecDataFromAVCDecoderConfRecord(tag.Data); err != nil {
err = fmt.Errorf("flv: h264 seqhdr invalid")
return
}
self.videostreamidx = len(self.streams)
self.streams = append(self.streams, &flvStream{CodecData: codec})
got |= flvio.FILE_HAS_VIDEO
case flvio.AVC_NALU:
self.probepkts = append(self.probepkts, tag)
}
default:
err = fmt.Errorf("flv: unspported video CodecID=%d", tag.CodecID)
func (self *Demuxer) prepare() (err error) {
for self.stage < 2 {
switch self.stage {
case 0:
var flags uint8
if flags, err = flvio.ReadFileHeader(self.pr); err != nil {
return
}
case *flvio.Audiodata:
switch tag.SoundFormat {
case flvio.SOUND_AAC:
switch tag.AACPacketType {
case flvio.AAC_SEQHDR:
var codec aacparser.CodecData
if codec, err = aacparser.NewCodecDataFromMPEG4AudioConfigBytes(tag.Data); err != nil {
err = fmt.Errorf("flv: aac seqhdr invalid")
return
}
self.audiostreamidx = len(self.streams)
self.streams = append(self.streams, &flvStream{CodecData: codec})
got |= flvio.FILE_HAS_AUDIO
case flvio.AAC_RAW:
self.probepkts = append(self.probepkts, tag)
}
default:
err = fmt.Errorf("flv: unspported audio SoundFormat=%d", tag.SoundFormat)
return
if flags & flvio.FILE_HAS_AUDIO != 0 {
self.prober.HasAudio = true
}
}
if flags & flvio.FILE_HAS_VIDEO != 0 {
self.prober.HasVideo = true
}
self.stage++
if got == flags {
break
case 1:
for !self.prober.Probed() {
var tag flvio.Tag
var timestamp int32
if tag, _, err = flvio.ReadTag(self.pr); err != nil {
return
}
if err = self.prober.PushTag(tag, timestamp); err != nil {
return
}
}
self.stage++
}
}
self.probed = true
return
}
func (self *Demuxer) Streams() (streams []av.CodecData, err error) {
if err = self.probe(); err != nil {
if err = self.prepare(); err != nil {
return
}
for _, stream := range self.streams {
streams = append(streams, stream.CodecData)
}
streams = self.prober.Streams
return
}
func tsToTime(ts int32) time.Duration {
return time.Millisecond*time.Duration(ts)
}
func timeToTs(tm time.Duration) int32 {
return int32(tm / time.Millisecond)
}
func (self *Demuxer) ReadPacket() (pkt av.Packet, err error) {
if err = self.probe(); err != nil {
if err = self.prepare(); err != nil {
return
}
var timestamp int32
var stream *flvStream
loop: for {
var _tag flvio.Tag
if len(self.probepkts) > 0 {
_tag = self.probepkts[0]
self.probepkts = self.probepkts[1:]
} else {
if _tag, timestamp, err = flvio.ReadTag(self.pr); err != nil {
return
}
}
switch tag := _tag.(type) {
case *flvio.Videodata:
if tag.AVCPacketType == flvio.AVC_NALU {
stream = self.streams[self.videostreamidx]
pkt.Idx = int8(self.videostreamidx)
pkt.CompositionTime = tsToTime(tag.CompositionTime)
pkt.IsKeyFrame = tag.FrameType == flvio.FRAME_KEY
pkt.Data = tag.Data
break loop
}
case *flvio.Audiodata:
if tag.AACPacketType == flvio.AAC_RAW {
stream = self.streams[self.audiostreamidx]
pkt.Idx = int8(self.audiostreamidx)
pkt.Data = tag.Data
break loop
}
}
if !self.prober.Empty() {
pkt = self.prober.PopPacket()
return
}
if stream.lastts == 0 {
stream.tm = tsToTime(timestamp)
} else {
diff := timestamp - stream.lastts
stream.tm += tsToTime(diff)
for {
var tag flvio.Tag
var timestamp int32
if tag, timestamp, err = flvio.ReadTag(self.pr); err != nil {
return
}
var ok bool
if pkt, ok = self.prober.TagToPacket(tag, timestamp); ok {
return
}
}
stream.lastts = timestamp
pkt.Time = stream.tm
return
}

View File

@ -1,12 +1,20 @@
package flvio
import (
"io"
"time"
"fmt"
"github.com/nareix/pio"
"github.com/nareix/joy4/av"
"io"
)
func TsToTime(ts int32) time.Duration {
return time.Millisecond*time.Duration(ts)
}
func TimeToTs(tm time.Duration) int32 {
return int32(tm / time.Millisecond)
}
const (
TAG_AUDIO = 8
TAG_VIDEO = 9
@ -40,8 +48,7 @@ func (self Scriptdata) Len() int {
}
func (self *Scriptdata) Unmarshal(r *pio.Reader) (err error) {
self.Data = make([]byte, r.N)
if _, err = io.ReadFull(r, self.Data); err != nil {
if self.Data, err = r.ReadBytes(int(r.N)); err != nil {
return
}
return
@ -72,28 +79,6 @@ const (
AAC_RAW = 1
)
func MakeAACAudiodata(codec av.AudioCodecData, data []byte) Audiodata {
tag := Audiodata{
SoundFormat: SOUND_AAC,
SoundRate: SOUND_44Khz,
AACPacketType: AAC_RAW,
Data: data,
}
switch codec.SampleFormat().BytesPerSample() {
case 1:
tag.SoundSize = SOUND_8BIT
default:
tag.SoundSize = SOUND_16BIT
}
switch codec.ChannelLayout().Count() {
case 1:
tag.SoundType = SOUND_MONO
case 2:
tag.SoundType = SOUND_STEREO
}
return tag
}
type Audiodata struct {
/*
SoundFormat: UB[4]
@ -160,7 +145,11 @@ func (self Audiodata) Type() uint8 {
}
func (self Audiodata) Len() int {
return 2 + len(self.Data)
if self.SoundFormat == SOUND_AAC {
return 2 + len(self.Data)
} else {
return 1 + len(self.Data)
}
}
func (self Audiodata) Marshal(w *pio.Writer) (err error) {
@ -205,14 +194,12 @@ func (self *Audiodata) Unmarshal(r *pio.Reader) (err error) {
if self.AACPacketType, err = r.ReadU8(); err != nil {
return
}
self.Data = make([]byte, r.N)
if _, err = io.ReadFull(r, self.Data); err != nil {
if self.Data, err = r.ReadBytes(int(r.N)); err != nil {
return
}
default:
self.Data = make([]byte, r.N)
if _, err = io.ReadFull(r, self.Data); err != nil {
if self.Data, err = r.ReadBytes(int(r.N)); err != nil {
return
}
}
@ -287,8 +274,7 @@ func (self *Videodata) Unmarshal(r *pio.Reader) (err error) {
}
switch self.AVCPacketType {
case AVC_SEQHDR, AVC_NALU:
self.Data = make([]byte, r.N)
if _, err = io.ReadFull(r, self.Data); err != nil {
if self.Data, err = r.ReadBytes(int(r.N)); err != nil {
return
}
}
@ -402,11 +388,16 @@ func ReadTag(r *pio.Reader) (tag Tag, timestamp int32, err error) {
return
}
r.LimitOn(int64(datasize))
if err = tag.Unmarshal(r); err != nil {
b := make([]byte, datasize)
if _, err = io.ReadFull(r, b); err != nil {
return
}
r.LimitOff()
br := pio.NewReaderBytes(b)
br.LimitOn(int64(datasize))
if err = tag.Unmarshal(br); err != nil {
return
}
br.LimitOff()
if _, err = r.ReadI32BE(); err != nil {
return

View File

@ -12,19 +12,15 @@ import (
"encoding/hex"
"io"
"github.com/nareix/pio"
"github.com/nareix/joy4/format/flv"
"github.com/nareix/joy4/format/flv/flvio"
"github.com/nareix/joy4/av"
"github.com/nareix/joy4/av/avutil"
"github.com/nareix/joy4/codec"
"github.com/nareix/joy4/codec/h264parser"
"github.com/nareix/joy4/codec/aacparser"
"crypto/hmac"
"crypto/sha256"
"crypto/rand"
)
var MaxProbePacketCount = 6
func ParseURL(uri string) (u *url.URL, err error) {
if u, err = url.Parse(uri); err != nil {
return
@ -127,17 +123,6 @@ func (self *Server) ListenAndServe() (err error) {
}
}
type stream struct {
codec av.CodecData
lasttime time.Duration
}
func newStream(codec av.CodecData) *stream {
return &stream{
codec: codec,
}
}
const (
stageHandshakeDone = iota+1
stageCommandDone
@ -152,10 +137,8 @@ const (
type Conn struct {
URL *url.URL
streams []*stream
videostreamidx, audiostreamidx int
probepkts []flvio.Tag
prober *flv.Prober
streams []av.CodecData
br *pio.Reader
bw *pio.Writer
@ -195,6 +178,7 @@ type Conn struct {
func NewConn(netconn net.Conn) *Conn {
conn := &Conn{}
conn.prober = &flv.Prober{}
conn.netconn = netconn
conn.readcsmap = make(map[uint32]*chunkStream)
conn.writecsmap = make(map[uint32]*chunkStream)
@ -205,8 +189,6 @@ func NewConn(netconn net.Conn) *Conn {
conn.br = pio.NewReader(conn.bufr)
conn.bw = pio.NewWriter(conn.bufw)
conn.intw = pio.NewWriter(nil)
conn.videostreamidx = -1
conn.audiostreamidx = -1
return conn
}
@ -261,6 +243,22 @@ func (self *Conn) pollCommand() (err error) {
}
}
func (self *Conn) pollAVTag() (tag flvio.Tag, err error) {
for {
if err = self.pollMsg(); err != nil {
return
}
switch self.msgtypeid {
case msgtypeidVideoMsg:
tag = self.videodata
return
case msgtypeidAudioMsg:
tag = self.audiodata
return
}
}
}
func (self *Conn) pollMsg() (err error) {
self.gotmsg = false
self.gotcommand = false
@ -526,99 +524,17 @@ func (self *Conn) checkCreateStreamResult() (ok bool, avmsgsid uint32) {
}
func (self *Conn) probe() (err error) {
for i := 0; i < MaxProbePacketCount; {
if err = self.pollMsg(); err != nil {
for !self.prober.Probed() {
var tag flvio.Tag
if tag, err = self.pollAVTag(); err != nil {
return
}
switch self.msgtypeid {
case msgtypeidVideoMsg:
i++
tag := self.videodata
switch tag.CodecID {
case flvio.VIDEO_H264:
if tag.AVCPacketType == flvio.AVC_SEQHDR && self.videostreamidx == -1 {
var h264 h264parser.CodecData
if h264, err = h264parser.NewCodecDataFromAVCDecoderConfRecord(tag.Data); err != nil {
err = fmt.Errorf("rtmp: h264 codec data invalid")
return
}
self.videostreamidx = len(self.streams)
self.streams = append(self.streams, newStream(h264))
} else {
self.probepkts = append(self.probepkts, tag)
}
default:
err = fmt.Errorf("rtmp: video CodecID=%d not supported", tag.CodecID)
return
}
case msgtypeidAudioMsg:
i++
tag := self.audiodata
switch tag.SoundFormat {
case flvio.SOUND_AAC:
if tag.AACPacketType == flvio.AAC_SEQHDR && self.audiostreamidx == -1 {
var aac aacparser.CodecData
if aac, err = aacparser.NewCodecDataFromMPEG4AudioConfigBytes(tag.Data); err != nil {
err = fmt.Errorf("rtmp: aac codec data invalid")
return
}
self.audiostreamidx = len(self.streams)
self.streams = append(self.streams, newStream(aac))
} else {
self.probepkts = append(self.probepkts, tag)
}
case flvio.SOUND_NELLYMOSER, flvio.SOUND_NELLYMOSER_16KHZ_MONO, flvio.SOUND_NELLYMOSER_8KHZ_MONO:
if self.audiostreamidx == -1 {
stream := codec.NewNellyMoserCodecData()
self.audiostreamidx = len(self.streams)
self.streams = append(self.streams, newStream(stream))
}
self.probepkts = append(self.probepkts, tag)
case flvio.SOUND_ALAW:
if self.audiostreamidx == -1 {
stream := codec.NewPCMAlawCodecData()
self.audiostreamidx = len(self.streams)
self.streams = append(self.streams, newStream(stream))
}
self.probepkts = append(self.probepkts, tag)
case flvio.SOUND_MULAW:
if self.audiostreamidx == -1 {
stream := codec.NewPCMMulawCodecData()
self.audiostreamidx = len(self.streams)
self.streams = append(self.streams, newStream(stream))
}
self.probepkts = append(self.probepkts, tag)
case flvio.SOUND_SPEEX:
if self.audiostreamidx == -1 {
stream := codec.NewSpeexCodecData()
self.audiostreamidx = len(self.streams)
self.streams = append(self.streams, newStream(stream))
}
self.probepkts = append(self.probepkts, tag)
default:
err = fmt.Errorf("rtmp: audio SoundFormat=%d not supported", tag.SoundFormat)
return
}
}
if len(self.streams) == 2 {
break
if err = self.prober.PushTag(tag, int32(self.timestamp)); err != nil {
return
}
}
if len(self.streams) == 0 {
err = fmt.Errorf("rtmp: probe failed")
return
}
self.streams = self.prober.Streams
self.stage++
return
}
@ -799,40 +715,25 @@ func (self *Conn) ReadPacket() (pkt av.Packet, err error) {
return
}
poll: for {
var _tag flvio.Tag
/*
if !self.prober.Empty() {
pkt = self.prober.PopPacket()
return
}
*/
if len(self.probepkts) > 0 {
_tag = self.probepkts[0]
self.probepkts = self.probepkts[1:]
} else {
if err = self.pollMsg(); err != nil {
return
}
switch self.msgtypeid {
case msgtypeidVideoMsg:
_tag = self.videodata
case msgtypeidAudioMsg:
_tag = self.audiodata
}
for {
var tag flvio.Tag
if tag, err = self.pollAVTag(); err != nil {
return
}
switch tag := _tag.(type) {
case *flvio.Videodata:
pkt.CompositionTime = tsToTime(uint32(tag.CompositionTime))
pkt.Data = tag.Data
pkt.IsKeyFrame = tag.FrameType == flvio.FRAME_KEY
pkt.Idx = int8(self.videostreamidx)
break poll
case *flvio.Audiodata:
pkt.Data = tag.Data
pkt.Idx = int8(self.audiostreamidx)
break poll
var ok bool
if pkt, ok = self.prober.TagToPacket(tag, int32(self.timestamp)); ok {
return
}
}
pkt.Time = tsToTime(self.timestamp)
return
}
@ -885,50 +786,24 @@ func (self *Conn) Streams() (streams []av.CodecData, err error) {
if err = self.prepare(stageCodecDataDone, prepareReading); err != nil {
return
}
for _, stream := range self.streams {
streams = append(streams, stream.codec)
}
streams = self.streams
return
}
func timeToTs(tm time.Duration) uint32 {
return uint32(tm / time.Millisecond)
}
func tsToTime(ts uint32) time.Duration {
return time.Duration(ts)*time.Millisecond
}
func (self *Conn) WritePacket(pkt av.Packet) (err error) {
if err = self.prepare(stageCodecDataDone, prepareWriting); err != nil {
return
}
stream := self.streams[pkt.Idx]
ts := timeToTs(pkt.Time)
tag, timestamp := flv.PacketToTag(pkt, stream)
if Debug {
fmt.Println("rtmp: WritePacket", pkt.Idx, pkt.Time, pkt.CompositionTime, ts)
fmt.Println("rtmp: WritePacket", pkt.Idx, pkt.Time, pkt.CompositionTime)
}
codec := stream.codec
switch codec.Type() {
case av.AAC:
audiodata := self.makeAACAudiodata(codec.(av.AudioCodecData), flvio.AAC_RAW, pkt.Data)
w := self.writeAudioDataStart()
audiodata.Marshal(w)
if err = self.writeAudioDataEnd(ts); err != nil {
return
}
case av.H264:
videodata := self.makeH264Videodata(flvio.AVC_NALU, pkt.IsKeyFrame, pkt.Data)
videodata.CompositionTime = int32(timeToTs(pkt.CompositionTime))
w := self.writeVideoDataStart()
videodata.Marshal(w)
if err = self.writeVideoDataEnd(ts); err != nil {
return
}
if err = self.writeAVTag(tag, uint32(timestamp)); err != nil {
return
}
return
@ -990,26 +865,15 @@ func (self *Conn) WriteHeader(streams []av.CodecData) (err error) {
// > Videodata(decoder config)
// > Audiodata(decoder config)
for _, stream := range streams {
switch stream.Type() {
case av.H264:
h264 := stream.(h264parser.CodecData)
videodata := self.makeH264Videodata(flvio.AVC_SEQHDR, true, h264.AVCDecoderConfRecordBytes())
w := self.writeVideoDataStart()
videodata.Marshal(w)
if err = self.writeVideoDataEnd(0); err != nil {
var ok bool
var tag flvio.Tag
if tag, ok, err = flv.CodecDataToTag(stream); err != nil {
return
}
if ok {
if err = self.writeAVTag(tag, 0); err != nil {
return
}
self.streams = append(self.streams, newStream(stream))
case av.AAC:
aac := stream.(aacparser.CodecData)
audiodata := self.makeAACAudiodata(aac, flvio.AAC_SEQHDR, aac.MPEG4AudioConfigBytes())
w := self.writeAudioDataStart()
audiodata.Marshal(w)
if err = self.writeAudioDataEnd(0); err != nil {
return
}
self.streams = append(self.streams, newStream(stream))
}
}
@ -1017,26 +881,6 @@ func (self *Conn) WriteHeader(streams []av.CodecData) (err error) {
return
}
func (self *Conn) makeH264Videodata(pkttype uint8, iskeyframe bool, data []byte) flvio.Videodata {
videodata := flvio.Videodata{
CodecID: flvio.VIDEO_H264,
AVCPacketType: pkttype,
Data: data,
}
if iskeyframe {
videodata.FrameType = flvio.FRAME_KEY
} else {
videodata.FrameType = flvio.FRAME_INTER
}
return videodata
}
func (self *Conn) makeAACAudiodata(stream av.AudioCodecData, pkttype uint8, data []byte) flvio.Audiodata {
tag := flvio.MakeAACAudiodata(stream, data)
tag.AACPacketType = pkttype
return tag
}
func (self *Conn) writeSetChunkSize(size uint32) (err error) {
w := self.writeProtoCtrlMsgStart()
w.WriteU32BE(size)
@ -1086,6 +930,25 @@ func (self *Conn) writeDataMsgEnd(csid uint32, msgsid uint32) (err error) {
return self.writeChunks(csid, 0, msgtypeidDataMsgAMF0, msgsid, msgdatav)
}
func (self *Conn) writeAVTag(tag flvio.Tag, timestamp uint32) (err error) {
switch tag.(type) {
case *flvio.Audiodata:
w := self.writeAudioDataStart()
tag.Marshal(w)
if err = self.writeAudioDataEnd(timestamp); err != nil {
return
}
case *flvio.Videodata:
w := self.writeVideoDataStart()
tag.Marshal(w)
if err = self.writeVideoDataEnd(timestamp); err != nil {
return
}
}
return
}
func (self *Conn) writeVideoDataStart() *pio.Writer {
self.intw.SaveToVecOn()
return self.intw