diff --git a/format/flv/flv.go b/format/flv/flv.go index 9af5e61..51b80fa 100644 --- a/format/flv/flv.go +++ b/format/flv/flv.go @@ -1,11 +1,11 @@ package flv import ( - "time" "fmt" "github.com/nareix/joy4/av" "github.com/nareix/joy4/av/avutil" "github.com/nareix/joy4/codec/h264parser" + "github.com/nareix/joy4/codec" "github.com/nareix/joy4/codec/aacparser" "github.com/nareix/pio" "github.com/nareix/joy4/format/flv/flvio" @@ -13,6 +13,255 @@ import ( "bufio" ) +var MaxProbePacketCount = 20 + +type Prober struct { + HasAudio, HasVideo bool + GotAudio, GotVideo bool + VideoStreamIdx, AudioStreamIdx int + PushedCount int + Streams []av.CodecData + CachedPkts []av.Packet +} + +func (self *Prober) CacheTag(_tag flvio.Tag, timestamp int32) { + pkt, _ := self.TagToPacket(_tag, timestamp) + self.CachedPkts = append(self.CachedPkts, pkt) +} + +func (self *Prober) PushTag(_tag flvio.Tag, timestamp int32) (err error) { + self.PushedCount++ + + if self.PushedCount > MaxProbePacketCount { + err = fmt.Errorf("flv: max probe packet count reached") + return + } + + switch tag := _tag.(type) { + case *flvio.Videodata: + switch tag.AVCPacketType { + case flvio.AVC_SEQHDR: + if !self.GotVideo { + var stream h264parser.CodecData + if stream, err = h264parser.NewCodecDataFromAVCDecoderConfRecord(tag.Data); err != nil { + err = fmt.Errorf("flv: h264 seqhdr invalid") + return + } + self.VideoStreamIdx = len(self.Streams) + self.Streams = append(self.Streams, stream) + self.GotVideo = true + } + + case flvio.AVC_NALU: + self.CacheTag(tag, timestamp) + } + + case *flvio.Audiodata: + switch tag.SoundFormat { + case flvio.SOUND_AAC: + switch tag.AACPacketType { + case flvio.AAC_SEQHDR: + if !self.GotAudio { + var stream aacparser.CodecData + if stream, err = aacparser.NewCodecDataFromMPEG4AudioConfigBytes(tag.Data); err != nil { + err = fmt.Errorf("flv: aac seqhdr invalid") + return + } + self.AudioStreamIdx = len(self.Streams) + self.Streams = append(self.Streams, stream) + self.GotAudio = true + } + + case flvio.AAC_RAW: + self.CacheTag(tag, timestamp) + } + + case flvio.SOUND_SPEEX: + if !self.GotAudio { + stream := codec.NewSpeexCodecData() + self.AudioStreamIdx = len(self.Streams) + self.Streams = append(self.Streams, stream) + self.GotAudio = true + self.CacheTag(tag, timestamp) + } + + case flvio.SOUND_NELLYMOSER: + if !self.GotAudio { + stream := codec.NewNellyMoserCodecData() + self.AudioStreamIdx = len(self.Streams) + self.Streams = append(self.Streams, stream) + self.GotAudio = true + self.CacheTag(tag, timestamp) + } + + } + } + + return +} + +func (self *Prober) Probed() (ok bool) { + if self.HasAudio || self.HasVideo { + if self.HasAudio == self.GotAudio && self.HasVideo == self.GotVideo { + return true + } + } else { + if self.PushedCount == MaxProbePacketCount { + return true + } + } + return +} + +func (self *Prober) TagToPacket(_tag flvio.Tag, timestamp int32) (pkt av.Packet, ok bool) { + switch tag := _tag.(type) { + case *flvio.Videodata: + pkt.Idx = int8(self.VideoStreamIdx) + switch tag.AVCPacketType { + case flvio.AVC_NALU: + ok = true + pkt.Data = tag.Data + pkt.CompositionTime = flvio.TsToTime(tag.CompositionTime) + pkt.IsKeyFrame = tag.FrameType == flvio.FRAME_KEY + } + + case *flvio.Audiodata: + pkt.Idx = int8(self.AudioStreamIdx) + switch tag.SoundFormat { + case flvio.SOUND_AAC: + switch tag.AACPacketType { + case flvio.AAC_RAW: + ok = true + pkt.Data = tag.Data + } + + case flvio.SOUND_SPEEX: + ok = true + pkt.Data = tag.Data + + case flvio.SOUND_NELLYMOSER: + ok = true + pkt.Data = tag.Data + } + } + + pkt.Time = flvio.TsToTime(timestamp) + return +} + +func (self *Prober) Empty() bool { + return len(self.CachedPkts) == 0 +} + +func (self *Prober) PopPacket() av.Packet { + pkt := self.CachedPkts[0] + self.CachedPkts = self.CachedPkts[1:] + return pkt +} + +func CodecDataToTag(stream av.CodecData) (_tag flvio.Tag, ok bool, err error) { + switch stream.Type() { + case av.H264: + h264 := stream.(h264parser.CodecData) + tag := &flvio.Videodata{ + AVCPacketType: flvio.AVC_SEQHDR, + CodecID: flvio.VIDEO_H264, + Data: h264.AVCDecoderConfRecordBytes(), + FrameType: flvio.FRAME_KEY, + } + ok = true + _tag = tag + + case av.NELLYMOSER: + case av.SPEEX: + + case av.AAC: + aac := stream.(aacparser.CodecData) + tag := &flvio.Audiodata{ + SoundFormat: flvio.SOUND_AAC, + SoundRate: flvio.SOUND_44Khz, + AACPacketType: flvio.AAC_SEQHDR, + Data: aac.MPEG4AudioConfigBytes(), + } + switch aac.SampleFormat().BytesPerSample() { + case 1: + tag.SoundSize = flvio.SOUND_8BIT + default: + tag.SoundSize = flvio.SOUND_16BIT + } + switch aac.ChannelLayout().Count() { + case 1: + tag.SoundType = flvio.SOUND_MONO + case 2: + tag.SoundType = flvio.SOUND_STEREO + } + ok = true + _tag = tag + + default: + err = fmt.Errorf("flv: unspported codecType=%v", stream.Type()) + return + } + return +} + +func PacketToTag(pkt av.Packet, stream av.CodecData) (_tag flvio.Tag, timestamp int32) { + switch stream.Type() { + case av.H264: + tag := &flvio.Videodata{ + AVCPacketType: flvio.AVC_NALU, + CodecID: flvio.VIDEO_H264, + Data: pkt.Data, + CompositionTime: flvio.TimeToTs(pkt.CompositionTime), + } + if pkt.IsKeyFrame { + tag.FrameType = flvio.FRAME_KEY + } else { + tag.FrameType = flvio.FRAME_INTER + } + _tag = tag + + case av.AAC: + tag := &flvio.Audiodata{ + SoundFormat: flvio.SOUND_AAC, + SoundRate: flvio.SOUND_44Khz, + AACPacketType: flvio.AAC_RAW, + Data: pkt.Data, + } + astream := stream.(av.AudioCodecData) + switch astream.SampleFormat().BytesPerSample() { + case 1: + tag.SoundSize = flvio.SOUND_8BIT + default: + tag.SoundSize = flvio.SOUND_16BIT + } + switch astream.ChannelLayout().Count() { + case 1: + tag.SoundType = flvio.SOUND_MONO + case 2: + tag.SoundType = flvio.SOUND_STEREO + } + _tag = tag + + case av.SPEEX: + tag := &flvio.Audiodata{ + SoundFormat: flvio.SOUND_SPEEX, + Data: pkt.Data, + } + _tag = tag + + case av.NELLYMOSER: + tag := &flvio.Audiodata{ + SoundFormat: flvio.SOUND_NELLYMOSER, + Data: pkt.Data, + } + _tag = tag + } + + timestamp = flvio.TimeToTs(pkt.Time) + return +} + type Muxer struct { pw *pio.Writer bw *bufio.Writer @@ -27,7 +276,7 @@ func NewMuxer(w io.Writer) *Muxer { } func (self *Muxer) SupportedCodecTypes() []av.CodecType { - return []av.CodecType{av.H264, av.AAC, av.NELLYMOSER} + return []av.CodecType{av.H264, av.AAC, av.NELLYMOSER, av.SPEEX} } func (self *Muxer) WriteHeader(streams []av.CodecData) (err error) { @@ -45,33 +294,15 @@ func (self *Muxer) WriteHeader(streams []av.CodecData) (err error) { } for _, stream := range streams { - var _tag flvio.Tag - - switch stream.Type() { - case av.H264: - h264 := stream.(h264parser.CodecData) - tag := &flvio.Videodata{ - AVCPacketType: flvio.AVC_SEQHDR, - CodecID: flvio.VIDEO_H264, - Data: h264.AVCDecoderConfRecordBytes(), - } - _tag = tag - - case av.NELLYMOSER: - - case av.AAC: - aac := stream.(aacparser.CodecData) - tag := flvio.MakeAACAudiodata(aac, aac.MPEG4AudioConfigBytes()) - tag.AACPacketType = flvio.AAC_SEQHDR - _tag = &tag - - default: - err = fmt.Errorf("flv: unspported codecType=%v", stream.Type()) + var tag flvio.Tag + var ok bool + if tag, ok, err = CodecDataToTag(stream); err != nil { return } - - if err = flvio.WriteTag(self.pw, _tag, 0); err != nil { - return + if ok { + if err = flvio.WriteTag(self.pw, tag, 0); err != nil { + return + } } } @@ -81,36 +312,9 @@ func (self *Muxer) WriteHeader(streams []av.CodecData) (err error) { func (self *Muxer) WritePacket(pkt av.Packet) (err error) { stream := self.streams[pkt.Idx] - var _tag flvio.Tag + tag, ts := PacketToTag(pkt, stream) - switch stream.Type() { - case av.H264: - tag := &flvio.Videodata{ - AVCPacketType: flvio.AVC_NALU, - CodecID: flvio.VIDEO_H264, - Data: pkt.Data, - CompositionTime: timeToTs(pkt.CompositionTime), - } - if pkt.IsKeyFrame { - tag.FrameType = flvio.FRAME_KEY - } else { - tag.FrameType = flvio.FRAME_INTER - } - _tag = tag - - case av.AAC: - tag := flvio.MakeAACAudiodata(stream.(av.AudioCodecData), pkt.Data) - _tag = &tag - - case av.NELLYMOSER: - tag := &flvio.Audiodata{ - SoundFormat: flvio.SOUND_NELLYMOSER, - Data: pkt.Data, - } - _tag = tag - } - - if err = flvio.WriteTag(self.pw, _tag, timeToTs(pkt.Time)); err != nil { + if err = flvio.WriteTag(self.pw, tag, ts); err != nil { return } @@ -124,167 +328,82 @@ func (self *Muxer) WriteTrailer() (err error) { return } -type flvStream struct { - av.CodecData - lastts int32 - tm time.Duration -} - type Demuxer struct { - streams []*flvStream - videostreamidx int - audiostreamidx int + prober *Prober pr *pio.Reader - probepkts []flvio.Tag - probed bool + stage int } func NewDemuxer(r io.Reader) *Demuxer { return &Demuxer{ pr: pio.NewReader(bufio.NewReaderSize(r, pio.RecommendBufioSize)), + prober: &Prober{}, } } -func (self *Demuxer) probe() (err error) { - if self.probed { - return - } - - var flags, got uint8 - if flags, err = flvio.ReadFileHeader(self.pr); err != nil { - return - } - flags &= flvio.FILE_HAS_AUDIO|flvio.FILE_HAS_VIDEO - - for { - var _tag flvio.Tag - if _tag, _, err = flvio.ReadTag(self.pr); err != nil { - return - } - - switch tag := _tag.(type) { - case *flvio.Videodata: - switch tag.CodecID { - case flvio.VIDEO_H264: - switch tag.AVCPacketType { - case flvio.AVC_SEQHDR: - var codec h264parser.CodecData - if codec, err = h264parser.NewCodecDataFromAVCDecoderConfRecord(tag.Data); err != nil { - err = fmt.Errorf("flv: h264 seqhdr invalid") - return - } - self.videostreamidx = len(self.streams) - self.streams = append(self.streams, &flvStream{CodecData: codec}) - got |= flvio.FILE_HAS_VIDEO - - case flvio.AVC_NALU: - self.probepkts = append(self.probepkts, tag) - } - - default: - err = fmt.Errorf("flv: unspported video CodecID=%d", tag.CodecID) +func (self *Demuxer) prepare() (err error) { + for self.stage < 2 { + switch self.stage { + case 0: + var flags uint8 + if flags, err = flvio.ReadFileHeader(self.pr); err != nil { return } - - case *flvio.Audiodata: - switch tag.SoundFormat { - case flvio.SOUND_AAC: - switch tag.AACPacketType { - case flvio.AAC_SEQHDR: - var codec aacparser.CodecData - if codec, err = aacparser.NewCodecDataFromMPEG4AudioConfigBytes(tag.Data); err != nil { - err = fmt.Errorf("flv: aac seqhdr invalid") - return - } - self.audiostreamidx = len(self.streams) - self.streams = append(self.streams, &flvStream{CodecData: codec}) - got |= flvio.FILE_HAS_AUDIO - - case flvio.AAC_RAW: - self.probepkts = append(self.probepkts, tag) - } - - default: - err = fmt.Errorf("flv: unspported audio SoundFormat=%d", tag.SoundFormat) - return + if flags & flvio.FILE_HAS_AUDIO != 0 { + self.prober.HasAudio = true } - } + if flags & flvio.FILE_HAS_VIDEO != 0 { + self.prober.HasVideo = true + } + self.stage++ - if got == flags { - break + case 1: + for !self.prober.Probed() { + var tag flvio.Tag + var timestamp int32 + if tag, _, err = flvio.ReadTag(self.pr); err != nil { + return + } + if err = self.prober.PushTag(tag, timestamp); err != nil { + return + } + } + self.stage++ } } - - self.probed = true return } func (self *Demuxer) Streams() (streams []av.CodecData, err error) { - if err = self.probe(); err != nil { + if err = self.prepare(); err != nil { return } - for _, stream := range self.streams { - streams = append(streams, stream.CodecData) - } + streams = self.prober.Streams return } -func tsToTime(ts int32) time.Duration { - return time.Millisecond*time.Duration(ts) -} - -func timeToTs(tm time.Duration) int32 { - return int32(tm / time.Millisecond) -} - func (self *Demuxer) ReadPacket() (pkt av.Packet, err error) { - if err = self.probe(); err != nil { + if err = self.prepare(); err != nil { return } - var timestamp int32 - var stream *flvStream - - loop: for { - var _tag flvio.Tag - if len(self.probepkts) > 0 { - _tag = self.probepkts[0] - self.probepkts = self.probepkts[1:] - } else { - if _tag, timestamp, err = flvio.ReadTag(self.pr); err != nil { - return - } - } - - switch tag := _tag.(type) { - case *flvio.Videodata: - if tag.AVCPacketType == flvio.AVC_NALU { - stream = self.streams[self.videostreamidx] - pkt.Idx = int8(self.videostreamidx) - pkt.CompositionTime = tsToTime(tag.CompositionTime) - pkt.IsKeyFrame = tag.FrameType == flvio.FRAME_KEY - pkt.Data = tag.Data - break loop - } - - case *flvio.Audiodata: - if tag.AACPacketType == flvio.AAC_RAW { - stream = self.streams[self.audiostreamidx] - pkt.Idx = int8(self.audiostreamidx) - pkt.Data = tag.Data - break loop - } - } + if !self.prober.Empty() { + pkt = self.prober.PopPacket() + return } - if stream.lastts == 0 { - stream.tm = tsToTime(timestamp) - } else { - diff := timestamp - stream.lastts - stream.tm += tsToTime(diff) + for { + var tag flvio.Tag + var timestamp int32 + if tag, timestamp, err = flvio.ReadTag(self.pr); err != nil { + return + } + + var ok bool + if pkt, ok = self.prober.TagToPacket(tag, timestamp); ok { + return + } } - stream.lastts = timestamp - pkt.Time = stream.tm return } diff --git a/format/flv/flvio/flvio.go b/format/flv/flvio/flvio.go index de0ee5b..b53362f 100644 --- a/format/flv/flvio/flvio.go +++ b/format/flv/flvio/flvio.go @@ -1,12 +1,20 @@ package flvio import ( + "io" + "time" "fmt" "github.com/nareix/pio" - "github.com/nareix/joy4/av" - "io" ) +func TsToTime(ts int32) time.Duration { + return time.Millisecond*time.Duration(ts) +} + +func TimeToTs(tm time.Duration) int32 { + return int32(tm / time.Millisecond) +} + const ( TAG_AUDIO = 8 TAG_VIDEO = 9 @@ -40,8 +48,7 @@ func (self Scriptdata) Len() int { } func (self *Scriptdata) Unmarshal(r *pio.Reader) (err error) { - self.Data = make([]byte, r.N) - if _, err = io.ReadFull(r, self.Data); err != nil { + if self.Data, err = r.ReadBytes(int(r.N)); err != nil { return } return @@ -72,28 +79,6 @@ const ( AAC_RAW = 1 ) -func MakeAACAudiodata(codec av.AudioCodecData, data []byte) Audiodata { - tag := Audiodata{ - SoundFormat: SOUND_AAC, - SoundRate: SOUND_44Khz, - AACPacketType: AAC_RAW, - Data: data, - } - switch codec.SampleFormat().BytesPerSample() { - case 1: - tag.SoundSize = SOUND_8BIT - default: - tag.SoundSize = SOUND_16BIT - } - switch codec.ChannelLayout().Count() { - case 1: - tag.SoundType = SOUND_MONO - case 2: - tag.SoundType = SOUND_STEREO - } - return tag -} - type Audiodata struct { /* SoundFormat: UB[4] @@ -160,7 +145,11 @@ func (self Audiodata) Type() uint8 { } func (self Audiodata) Len() int { - return 2 + len(self.Data) + if self.SoundFormat == SOUND_AAC { + return 2 + len(self.Data) + } else { + return 1 + len(self.Data) + } } func (self Audiodata) Marshal(w *pio.Writer) (err error) { @@ -205,14 +194,12 @@ func (self *Audiodata) Unmarshal(r *pio.Reader) (err error) { if self.AACPacketType, err = r.ReadU8(); err != nil { return } - self.Data = make([]byte, r.N) - if _, err = io.ReadFull(r, self.Data); err != nil { + if self.Data, err = r.ReadBytes(int(r.N)); err != nil { return } default: - self.Data = make([]byte, r.N) - if _, err = io.ReadFull(r, self.Data); err != nil { + if self.Data, err = r.ReadBytes(int(r.N)); err != nil { return } } @@ -287,8 +274,7 @@ func (self *Videodata) Unmarshal(r *pio.Reader) (err error) { } switch self.AVCPacketType { case AVC_SEQHDR, AVC_NALU: - self.Data = make([]byte, r.N) - if _, err = io.ReadFull(r, self.Data); err != nil { + if self.Data, err = r.ReadBytes(int(r.N)); err != nil { return } } @@ -402,11 +388,16 @@ func ReadTag(r *pio.Reader) (tag Tag, timestamp int32, err error) { return } - r.LimitOn(int64(datasize)) - if err = tag.Unmarshal(r); err != nil { + b := make([]byte, datasize) + if _, err = io.ReadFull(r, b); err != nil { return } - r.LimitOff() + br := pio.NewReaderBytes(b) + br.LimitOn(int64(datasize)) + if err = tag.Unmarshal(br); err != nil { + return + } + br.LimitOff() if _, err = r.ReadI32BE(); err != nil { return diff --git a/format/rtmp/rtmp.go b/format/rtmp/rtmp.go index 3f66662..7754d63 100644 --- a/format/rtmp/rtmp.go +++ b/format/rtmp/rtmp.go @@ -12,19 +12,15 @@ import ( "encoding/hex" "io" "github.com/nareix/pio" + "github.com/nareix/joy4/format/flv" "github.com/nareix/joy4/format/flv/flvio" "github.com/nareix/joy4/av" "github.com/nareix/joy4/av/avutil" - "github.com/nareix/joy4/codec" - "github.com/nareix/joy4/codec/h264parser" - "github.com/nareix/joy4/codec/aacparser" "crypto/hmac" "crypto/sha256" "crypto/rand" ) -var MaxProbePacketCount = 6 - func ParseURL(uri string) (u *url.URL, err error) { if u, err = url.Parse(uri); err != nil { return @@ -127,17 +123,6 @@ func (self *Server) ListenAndServe() (err error) { } } -type stream struct { - codec av.CodecData - lasttime time.Duration -} - -func newStream(codec av.CodecData) *stream { - return &stream{ - codec: codec, - } -} - const ( stageHandshakeDone = iota+1 stageCommandDone @@ -152,10 +137,8 @@ const ( type Conn struct { URL *url.URL - streams []*stream - videostreamidx, audiostreamidx int - - probepkts []flvio.Tag + prober *flv.Prober + streams []av.CodecData br *pio.Reader bw *pio.Writer @@ -195,6 +178,7 @@ type Conn struct { func NewConn(netconn net.Conn) *Conn { conn := &Conn{} + conn.prober = &flv.Prober{} conn.netconn = netconn conn.readcsmap = make(map[uint32]*chunkStream) conn.writecsmap = make(map[uint32]*chunkStream) @@ -205,8 +189,6 @@ func NewConn(netconn net.Conn) *Conn { conn.br = pio.NewReader(conn.bufr) conn.bw = pio.NewWriter(conn.bufw) conn.intw = pio.NewWriter(nil) - conn.videostreamidx = -1 - conn.audiostreamidx = -1 return conn } @@ -261,6 +243,22 @@ func (self *Conn) pollCommand() (err error) { } } +func (self *Conn) pollAVTag() (tag flvio.Tag, err error) { + for { + if err = self.pollMsg(); err != nil { + return + } + switch self.msgtypeid { + case msgtypeidVideoMsg: + tag = self.videodata + return + case msgtypeidAudioMsg: + tag = self.audiodata + return + } + } +} + func (self *Conn) pollMsg() (err error) { self.gotmsg = false self.gotcommand = false @@ -526,99 +524,17 @@ func (self *Conn) checkCreateStreamResult() (ok bool, avmsgsid uint32) { } func (self *Conn) probe() (err error) { - for i := 0; i < MaxProbePacketCount; { - if err = self.pollMsg(); err != nil { + for !self.prober.Probed() { + var tag flvio.Tag + if tag, err = self.pollAVTag(); err != nil { return } - - switch self.msgtypeid { - case msgtypeidVideoMsg: - i++ - tag := self.videodata - switch tag.CodecID { - case flvio.VIDEO_H264: - if tag.AVCPacketType == flvio.AVC_SEQHDR && self.videostreamidx == -1 { - var h264 h264parser.CodecData - if h264, err = h264parser.NewCodecDataFromAVCDecoderConfRecord(tag.Data); err != nil { - err = fmt.Errorf("rtmp: h264 codec data invalid") - return - } - self.videostreamidx = len(self.streams) - self.streams = append(self.streams, newStream(h264)) - } else { - self.probepkts = append(self.probepkts, tag) - } - - default: - err = fmt.Errorf("rtmp: video CodecID=%d not supported", tag.CodecID) - return - } - - case msgtypeidAudioMsg: - i++ - tag := self.audiodata - switch tag.SoundFormat { - case flvio.SOUND_AAC: - if tag.AACPacketType == flvio.AAC_SEQHDR && self.audiostreamidx == -1 { - var aac aacparser.CodecData - if aac, err = aacparser.NewCodecDataFromMPEG4AudioConfigBytes(tag.Data); err != nil { - err = fmt.Errorf("rtmp: aac codec data invalid") - return - } - self.audiostreamidx = len(self.streams) - self.streams = append(self.streams, newStream(aac)) - } else { - self.probepkts = append(self.probepkts, tag) - } - - case flvio.SOUND_NELLYMOSER, flvio.SOUND_NELLYMOSER_16KHZ_MONO, flvio.SOUND_NELLYMOSER_8KHZ_MONO: - if self.audiostreamidx == -1 { - stream := codec.NewNellyMoserCodecData() - self.audiostreamidx = len(self.streams) - self.streams = append(self.streams, newStream(stream)) - } - self.probepkts = append(self.probepkts, tag) - - case flvio.SOUND_ALAW: - if self.audiostreamidx == -1 { - stream := codec.NewPCMAlawCodecData() - self.audiostreamidx = len(self.streams) - self.streams = append(self.streams, newStream(stream)) - } - self.probepkts = append(self.probepkts, tag) - - case flvio.SOUND_MULAW: - if self.audiostreamidx == -1 { - stream := codec.NewPCMMulawCodecData() - self.audiostreamidx = len(self.streams) - self.streams = append(self.streams, newStream(stream)) - } - self.probepkts = append(self.probepkts, tag) - - case flvio.SOUND_SPEEX: - if self.audiostreamidx == -1 { - stream := codec.NewSpeexCodecData() - self.audiostreamidx = len(self.streams) - self.streams = append(self.streams, newStream(stream)) - } - self.probepkts = append(self.probepkts, tag) - - default: - err = fmt.Errorf("rtmp: audio SoundFormat=%d not supported", tag.SoundFormat) - return - } - } - - if len(self.streams) == 2 { - break + if err = self.prober.PushTag(tag, int32(self.timestamp)); err != nil { + return } } - if len(self.streams) == 0 { - err = fmt.Errorf("rtmp: probe failed") - return - } - + self.streams = self.prober.Streams self.stage++ return } @@ -799,40 +715,25 @@ func (self *Conn) ReadPacket() (pkt av.Packet, err error) { return } - poll: for { - var _tag flvio.Tag + /* + if !self.prober.Empty() { + pkt = self.prober.PopPacket() + return + } + */ - if len(self.probepkts) > 0 { - _tag = self.probepkts[0] - self.probepkts = self.probepkts[1:] - } else { - if err = self.pollMsg(); err != nil { - return - } - switch self.msgtypeid { - case msgtypeidVideoMsg: - _tag = self.videodata - case msgtypeidAudioMsg: - _tag = self.audiodata - } + for { + var tag flvio.Tag + if tag, err = self.pollAVTag(); err != nil { + return } - switch tag := _tag.(type) { - case *flvio.Videodata: - pkt.CompositionTime = tsToTime(uint32(tag.CompositionTime)) - pkt.Data = tag.Data - pkt.IsKeyFrame = tag.FrameType == flvio.FRAME_KEY - pkt.Idx = int8(self.videostreamidx) - break poll - - case *flvio.Audiodata: - pkt.Data = tag.Data - pkt.Idx = int8(self.audiostreamidx) - break poll + var ok bool + if pkt, ok = self.prober.TagToPacket(tag, int32(self.timestamp)); ok { + return } } - pkt.Time = tsToTime(self.timestamp) return } @@ -885,50 +786,24 @@ func (self *Conn) Streams() (streams []av.CodecData, err error) { if err = self.prepare(stageCodecDataDone, prepareReading); err != nil { return } - for _, stream := range self.streams { - streams = append(streams, stream.codec) - } + streams = self.streams return } -func timeToTs(tm time.Duration) uint32 { - return uint32(tm / time.Millisecond) -} - -func tsToTime(ts uint32) time.Duration { - return time.Duration(ts)*time.Millisecond -} - func (self *Conn) WritePacket(pkt av.Packet) (err error) { if err = self.prepare(stageCodecDataDone, prepareWriting); err != nil { return } stream := self.streams[pkt.Idx] - ts := timeToTs(pkt.Time) + tag, timestamp := flv.PacketToTag(pkt, stream) if Debug { - fmt.Println("rtmp: WritePacket", pkt.Idx, pkt.Time, pkt.CompositionTime, ts) + fmt.Println("rtmp: WritePacket", pkt.Idx, pkt.Time, pkt.CompositionTime) } - codec := stream.codec - switch codec.Type() { - case av.AAC: - audiodata := self.makeAACAudiodata(codec.(av.AudioCodecData), flvio.AAC_RAW, pkt.Data) - w := self.writeAudioDataStart() - audiodata.Marshal(w) - if err = self.writeAudioDataEnd(ts); err != nil { - return - } - - case av.H264: - videodata := self.makeH264Videodata(flvio.AVC_NALU, pkt.IsKeyFrame, pkt.Data) - videodata.CompositionTime = int32(timeToTs(pkt.CompositionTime)) - w := self.writeVideoDataStart() - videodata.Marshal(w) - if err = self.writeVideoDataEnd(ts); err != nil { - return - } + if err = self.writeAVTag(tag, uint32(timestamp)); err != nil { + return } return @@ -990,26 +865,15 @@ func (self *Conn) WriteHeader(streams []av.CodecData) (err error) { // > Videodata(decoder config) // > Audiodata(decoder config) for _, stream := range streams { - switch stream.Type() { - case av.H264: - h264 := stream.(h264parser.CodecData) - videodata := self.makeH264Videodata(flvio.AVC_SEQHDR, true, h264.AVCDecoderConfRecordBytes()) - w := self.writeVideoDataStart() - videodata.Marshal(w) - if err = self.writeVideoDataEnd(0); err != nil { + var ok bool + var tag flvio.Tag + if tag, ok, err = flv.CodecDataToTag(stream); err != nil { + return + } + if ok { + if err = self.writeAVTag(tag, 0); err != nil { return } - self.streams = append(self.streams, newStream(stream)) - - case av.AAC: - aac := stream.(aacparser.CodecData) - audiodata := self.makeAACAudiodata(aac, flvio.AAC_SEQHDR, aac.MPEG4AudioConfigBytes()) - w := self.writeAudioDataStart() - audiodata.Marshal(w) - if err = self.writeAudioDataEnd(0); err != nil { - return - } - self.streams = append(self.streams, newStream(stream)) } } @@ -1017,26 +881,6 @@ func (self *Conn) WriteHeader(streams []av.CodecData) (err error) { return } -func (self *Conn) makeH264Videodata(pkttype uint8, iskeyframe bool, data []byte) flvio.Videodata { - videodata := flvio.Videodata{ - CodecID: flvio.VIDEO_H264, - AVCPacketType: pkttype, - Data: data, - } - if iskeyframe { - videodata.FrameType = flvio.FRAME_KEY - } else { - videodata.FrameType = flvio.FRAME_INTER - } - return videodata -} - -func (self *Conn) makeAACAudiodata(stream av.AudioCodecData, pkttype uint8, data []byte) flvio.Audiodata { - tag := flvio.MakeAACAudiodata(stream, data) - tag.AACPacketType = pkttype - return tag -} - func (self *Conn) writeSetChunkSize(size uint32) (err error) { w := self.writeProtoCtrlMsgStart() w.WriteU32BE(size) @@ -1086,6 +930,25 @@ func (self *Conn) writeDataMsgEnd(csid uint32, msgsid uint32) (err error) { return self.writeChunks(csid, 0, msgtypeidDataMsgAMF0, msgsid, msgdatav) } +func (self *Conn) writeAVTag(tag flvio.Tag, timestamp uint32) (err error) { + switch tag.(type) { + case *flvio.Audiodata: + w := self.writeAudioDataStart() + tag.Marshal(w) + if err = self.writeAudioDataEnd(timestamp); err != nil { + return + } + + case *flvio.Videodata: + w := self.writeVideoDataStart() + tag.Marshal(w) + if err = self.writeVideoDataEnd(timestamp); err != nil { + return + } + } + return +} + func (self *Conn) writeVideoDataStart() *pio.Writer { self.intw.SaveToVecOn() return self.intw