change flvio.Tag from interface to struct

This commit is contained in:
nareix 2016-09-19 07:29:56 +08:00
parent a76cfa1413
commit b3b189ca28
3 changed files with 287 additions and 333 deletions

View File

@ -1,17 +1,17 @@
package flv package flv
import ( import (
"bufio"
"fmt" "fmt"
"github.com/nareix/bits/pio"
"github.com/nareix/joy4/av" "github.com/nareix/joy4/av"
"github.com/nareix/joy4/av/avutil" "github.com/nareix/joy4/av/avutil"
"github.com/nareix/joy4/codec/h264parser"
"github.com/nareix/joy4/codec" "github.com/nareix/joy4/codec"
"github.com/nareix/joy4/codec/fake"
"github.com/nareix/joy4/codec/aacparser" "github.com/nareix/joy4/codec/aacparser"
"github.com/nareix/bits/pio" "github.com/nareix/joy4/codec/fake"
"github.com/nareix/joy4/codec/h264parser"
"github.com/nareix/joy4/format/flv/flvio" "github.com/nareix/joy4/format/flv/flvio"
"io" "io"
"bufio"
) )
var MaxProbePacketCount = 20 var MaxProbePacketCount = 20
@ -60,12 +60,12 @@ func NewMetadataByStreams(streams []av.CodecData) (metadata flvio.AMFMap, err er
} }
type Prober struct { type Prober struct {
HasAudio, HasVideo bool HasAudio, HasVideo bool
GotAudio, GotVideo bool GotAudio, GotVideo bool
VideoStreamIdx, AudioStreamIdx int VideoStreamIdx, AudioStreamIdx int
PushedCount int PushedCount int
Streams []av.CodecData Streams []av.CodecData
CachedPkts []av.Packet CachedPkts []av.Packet
} }
func (self *Prober) CacheTag(_tag flvio.Tag, timestamp int32) { func (self *Prober) CacheTag(_tag flvio.Tag, timestamp int32) {
@ -73,7 +73,7 @@ func (self *Prober) CacheTag(_tag flvio.Tag, timestamp int32) {
self.CachedPkts = append(self.CachedPkts, pkt) self.CachedPkts = append(self.CachedPkts, pkt)
} }
func (self *Prober) PushTag(_tag flvio.Tag, timestamp int32) (err error) { func (self *Prober) PushTag(tag flvio.Tag, timestamp int32) (err error) {
self.PushedCount++ self.PushedCount++
if self.PushedCount > MaxProbePacketCount { if self.PushedCount > MaxProbePacketCount {
@ -81,8 +81,8 @@ func (self *Prober) PushTag(_tag flvio.Tag, timestamp int32) (err error) {
return return
} }
switch tag := _tag.(type) { switch tag.Type {
case *flvio.Videodata: case flvio.TAG_VIDEO:
switch tag.AVCPacketType { switch tag.AVCPacketType {
case flvio.AVC_SEQHDR: case flvio.AVC_SEQHDR:
if !self.GotVideo { if !self.GotVideo {
@ -100,7 +100,7 @@ func (self *Prober) PushTag(_tag flvio.Tag, timestamp int32) (err error) {
self.CacheTag(tag, timestamp) self.CacheTag(tag, timestamp)
} }
case *flvio.Audiodata: case flvio.TAG_AUDIO:
switch tag.SoundFormat { switch tag.SoundFormat {
case flvio.SOUND_AAC: case flvio.SOUND_AAC:
switch tag.AACPacketType { switch tag.AACPacketType {
@ -132,9 +132,9 @@ func (self *Prober) PushTag(_tag flvio.Tag, timestamp int32) (err error) {
case flvio.SOUND_NELLYMOSER: case flvio.SOUND_NELLYMOSER:
if !self.GotAudio { if !self.GotAudio {
stream := fake.CodecData{ stream := fake.CodecData{
CodecType_: av.NELLYMOSER, CodecType_: av.NELLYMOSER,
SampleRate_: 16000, SampleRate_: 16000,
SampleFormat_: av.S16, SampleFormat_: av.S16,
ChannelLayout_: tag.ChannelLayout(), ChannelLayout_: tag.ChannelLayout(),
} }
self.AudioStreamIdx = len(self.Streams) self.AudioStreamIdx = len(self.Streams)
@ -162,9 +162,9 @@ func (self *Prober) Probed() (ok bool) {
return return
} }
func (self *Prober) TagToPacket(_tag flvio.Tag, timestamp int32) (pkt av.Packet, ok bool) { func (self *Prober) TagToPacket(tag flvio.Tag, timestamp int32) (pkt av.Packet, ok bool) {
switch tag := _tag.(type) { switch tag.Type {
case *flvio.Videodata: case flvio.TAG_VIDEO:
pkt.Idx = int8(self.VideoStreamIdx) pkt.Idx = int8(self.VideoStreamIdx)
switch tag.AVCPacketType { switch tag.AVCPacketType {
case flvio.AVC_NALU: case flvio.AVC_NALU:
@ -174,7 +174,7 @@ func (self *Prober) TagToPacket(_tag flvio.Tag, timestamp int32) (pkt av.Packet,
pkt.IsKeyFrame = tag.FrameType == flvio.FRAME_KEY pkt.IsKeyFrame = tag.FrameType == flvio.FRAME_KEY
} }
case *flvio.Audiodata: case flvio.TAG_AUDIO:
pkt.Idx = int8(self.AudioStreamIdx) pkt.Idx = int8(self.AudioStreamIdx)
switch tag.SoundFormat { switch tag.SoundFormat {
case flvio.SOUND_AAC: case flvio.SOUND_AAC:
@ -212,11 +212,12 @@ func CodecDataToTag(stream av.CodecData) (_tag flvio.Tag, ok bool, err error) {
switch stream.Type() { switch stream.Type() {
case av.H264: case av.H264:
h264 := stream.(h264parser.CodecData) h264 := stream.(h264parser.CodecData)
tag := &flvio.Videodata{ tag := flvio.Tag{
Type: flvio.TAG_VIDEO,
AVCPacketType: flvio.AVC_SEQHDR, AVCPacketType: flvio.AVC_SEQHDR,
CodecID: flvio.VIDEO_H264, CodecID: flvio.VIDEO_H264,
Data: h264.AVCDecoderConfRecordBytes(), Data: h264.AVCDecoderConfRecordBytes(),
FrameType: flvio.FRAME_KEY, FrameType: flvio.FRAME_KEY,
} }
ok = true ok = true
_tag = tag _tag = tag
@ -226,11 +227,12 @@ func CodecDataToTag(stream av.CodecData) (_tag flvio.Tag, ok bool, err error) {
case av.AAC: case av.AAC:
aac := stream.(aacparser.CodecData) aac := stream.(aacparser.CodecData)
tag := &flvio.Audiodata{ tag := flvio.Tag{
SoundFormat: flvio.SOUND_AAC, Type: flvio.TAG_AUDIO,
SoundRate: flvio.SOUND_44Khz, SoundFormat: flvio.SOUND_AAC,
SoundRate: flvio.SOUND_44Khz,
AACPacketType: flvio.AAC_SEQHDR, AACPacketType: flvio.AAC_SEQHDR,
Data: aac.MPEG4AudioConfigBytes(), Data: aac.MPEG4AudioConfigBytes(),
} }
switch aac.SampleFormat().BytesPerSample() { switch aac.SampleFormat().BytesPerSample() {
case 1: case 1:
@ -254,13 +256,14 @@ func CodecDataToTag(stream av.CodecData) (_tag flvio.Tag, ok bool, err error) {
return return
} }
func PacketToTag(pkt av.Packet, stream av.CodecData) (_tag flvio.Tag, timestamp int32) { func PacketToTag(pkt av.Packet, stream av.CodecData) (tag flvio.Tag, timestamp int32) {
switch stream.Type() { switch stream.Type() {
case av.H264: case av.H264:
tag := &flvio.Videodata{ tag = flvio.Tag{
AVCPacketType: flvio.AVC_NALU, Type: flvio.TAG_VIDEO,
CodecID: flvio.VIDEO_H264, AVCPacketType: flvio.AVC_NALU,
Data: pkt.Data, CodecID: flvio.VIDEO_H264,
Data: pkt.Data,
CompositionTime: flvio.TimeToTs(pkt.CompositionTime), CompositionTime: flvio.TimeToTs(pkt.CompositionTime),
} }
if pkt.IsKeyFrame { if pkt.IsKeyFrame {
@ -268,14 +271,14 @@ func PacketToTag(pkt av.Packet, stream av.CodecData) (_tag flvio.Tag, timestamp
} else { } else {
tag.FrameType = flvio.FRAME_INTER tag.FrameType = flvio.FRAME_INTER
} }
_tag = tag
case av.AAC: case av.AAC:
tag := &flvio.Audiodata{ tag = flvio.Tag{
SoundFormat: flvio.SOUND_AAC, Type: flvio.TAG_AUDIO,
SoundRate: flvio.SOUND_44Khz, SoundFormat: flvio.SOUND_AAC,
SoundRate: flvio.SOUND_44Khz,
AACPacketType: flvio.AAC_RAW, AACPacketType: flvio.AAC_RAW,
Data: pkt.Data, Data: pkt.Data,
} }
astream := stream.(av.AudioCodecData) astream := stream.(av.AudioCodecData)
switch astream.SampleFormat().BytesPerSample() { switch astream.SampleFormat().BytesPerSample() {
@ -290,21 +293,20 @@ func PacketToTag(pkt av.Packet, stream av.CodecData) (_tag flvio.Tag, timestamp
case 2: case 2:
tag.SoundType = flvio.SOUND_STEREO tag.SoundType = flvio.SOUND_STEREO
} }
_tag = tag
case av.SPEEX: case av.SPEEX:
tag := &flvio.Audiodata{ tag = flvio.Tag{
Type: flvio.TAG_AUDIO,
SoundFormat: flvio.SOUND_SPEEX, SoundFormat: flvio.SOUND_SPEEX,
Data: pkt.Data, Data: pkt.Data,
} }
_tag = tag
case av.NELLYMOSER: case av.NELLYMOSER:
tag := &flvio.Audiodata{ tag = flvio.Tag{
Type: flvio.TAG_AUDIO,
SoundFormat: flvio.SOUND_NELLYMOSER, SoundFormat: flvio.SOUND_NELLYMOSER,
Data: pkt.Data, Data: pkt.Data,
} }
_tag = tag
} }
timestamp = flvio.TimeToTs(pkt.Time) timestamp = flvio.TimeToTs(pkt.Time)
@ -312,8 +314,8 @@ func PacketToTag(pkt av.Packet, stream av.CodecData) (_tag flvio.Tag, timestamp
} }
type Muxer struct { type Muxer struct {
bufw writeFlusher bufw writeFlusher
b []byte b []byte
streams []av.CodecData streams []av.CodecData
} }
@ -325,7 +327,7 @@ type writeFlusher interface {
func NewMuxerWriteFlusher(w writeFlusher) *Muxer { func NewMuxerWriteFlusher(w writeFlusher) *Muxer {
return &Muxer{ return &Muxer{
bufw: w, bufw: w,
b: make([]byte, 256), b: make([]byte, 256),
} }
} }
@ -386,16 +388,16 @@ func (self *Muxer) WriteTrailer() (err error) {
type Demuxer struct { type Demuxer struct {
prober *Prober prober *Prober
bufr *bufio.Reader bufr *bufio.Reader
b []byte b []byte
stage int stage int
} }
func NewDemuxer(r io.Reader) *Demuxer { func NewDemuxer(r io.Reader) *Demuxer {
return &Demuxer{ return &Demuxer{
bufr: bufio.NewReaderSize(r, pio.RecommendBufioSize), bufr: bufio.NewReaderSize(r, pio.RecommendBufioSize),
prober: &Prober{}, prober: &Prober{},
b: make([]byte, 256), b: make([]byte, 256),
} }
} }
@ -414,10 +416,10 @@ func (self *Demuxer) prepare() (err error) {
if _, err = self.bufr.Discard(skip); err != nil { if _, err = self.bufr.Discard(skip); err != nil {
return return
} }
if flags & flvio.FILE_HAS_AUDIO != 0 { if flags&flvio.FILE_HAS_AUDIO != 0 {
self.prober.HasAudio = true self.prober.HasAudio = true
} }
if flags & flvio.FILE_HAS_VIDEO != 0 { if flags&flvio.FILE_HAS_VIDEO != 0 {
self.prober.HasVideo = true self.prober.HasVideo = true
} }
self.stage++ self.stage++
@ -490,4 +492,3 @@ func Handler(h *avutil.RegisterHandler) {
h.CodecTypes = CodecTypes h.CodecTypes = CodecTypes
} }

View File

@ -1,70 +1,38 @@
package flvio package flvio
import ( import (
"io"
"time"
"fmt" "fmt"
"github.com/nareix/bits/pio" "github.com/nareix/bits/pio"
"github.com/nareix/joy4/av" "github.com/nareix/joy4/av"
"io"
"time"
) )
func TsToTime(ts int32) time.Duration { func TsToTime(ts int32) time.Duration {
return time.Millisecond*time.Duration(ts) return time.Millisecond * time.Duration(ts)
} }
func TimeToTs(tm time.Duration) int32 { func TimeToTs(tm time.Duration) int32 {
return int32(tm / time.Millisecond) return int32(tm / time.Millisecond)
} }
const MaxTagSubHeaderLength = 16
const ( const (
TAG_AUDIO = 8 TAG_AUDIO = 8
TAG_VIDEO = 9 TAG_VIDEO = 9
TAG_SCRIPTDATA = 18 TAG_SCRIPTDATA = 18
) )
const MaxTagSubHeaderLength = 16
type Tag interface {
Type() uint8
GetData() []byte
SetData([]byte)
FillHeader([]byte) int
ParseHeader([]byte) (int,error)
}
type Scriptdata struct {
Data []byte
}
func (self Scriptdata) Type() uint8 {
return TAG_SCRIPTDATA
}
func (self *Scriptdata) FillHeader(b []byte) (n int) {
return
}
func (self *Scriptdata) ParseHeader(b []byte) (n int, err error) {
return
}
func (self Scriptdata) GetData() []byte {
return self.Data
}
func (self *Scriptdata) SetData(b []byte) {
self.Data = b
}
const ( const (
SOUND_MP3 = 2 SOUND_MP3 = 2
SOUND_NELLYMOSER_16KHZ_MONO = 4 SOUND_NELLYMOSER_16KHZ_MONO = 4
SOUND_NELLYMOSER_8KHZ_MONO = 5 SOUND_NELLYMOSER_8KHZ_MONO = 5
SOUND_NELLYMOSER = 6 SOUND_NELLYMOSER = 6
SOUND_ALAW = 7 SOUND_ALAW = 7
SOUND_MULAW = 8 SOUND_MULAW = 8
SOUND_AAC = 10 SOUND_AAC = 10
SOUND_SPEEX = 11 SOUND_SPEEX = 11
SOUND_5_5Khz = 0 SOUND_5_5Khz = 0
SOUND_11Khz = 1 SOUND_11Khz = 1
@ -81,7 +49,20 @@ const (
AAC_RAW = 1 AAC_RAW = 1
) )
type Audiodata struct { const (
AVC_SEQHDR = 0
AVC_NALU = 1
AVC_EOS = 2
FRAME_KEY = 1
FRAME_INTER = 2
VIDEO_H264 = 7
)
type Tag struct {
Type uint8
/* /*
SoundFormat: UB[4] SoundFormat: UB[4]
0 = Linear PCM, platform endian 0 = Linear PCM, platform endian
@ -139,85 +120,6 @@ type Audiodata struct {
*/ */
AACPacketType uint8 AACPacketType uint8
Data []byte
}
func (self Audiodata) Type() uint8 {
return TAG_AUDIO
}
func (self Audiodata) ChannelLayout() av.ChannelLayout {
if self.SoundType == SOUND_MONO {
return av.CH_MONO
} else {
return av.CH_STEREO
}
}
func (self *Audiodata) ParseHeader(b []byte) (n int, err error) {
if len(b) < n+1 {
err = fmt.Errorf("audiodata: parse invalid")
return
}
flags := b[n]
n++
self.SoundFormat = flags >> 4
self.SoundRate = (flags >> 2) & 0x3
self.SoundSize = (flags >> 1) & 0x1
self.SoundType = flags & 0x1
switch self.SoundFormat {
case SOUND_AAC:
if len(b) < n+1 {
err = fmt.Errorf("audiodata: parse invalid")
return
}
self.AACPacketType = b[n]
n++
}
return
}
func (self Audiodata) FillHeader(b []byte) (n int) {
var flags uint8
flags |= self.SoundFormat << 4
flags |= self.SoundRate << 2
flags |= self.SoundSize << 1
flags |= self.SoundType
b[n] = flags
n++
switch self.SoundFormat {
case SOUND_AAC:
b[n] = self.AACPacketType
n++
}
return
}
func (self Audiodata) GetData() []byte {
return self.Data
}
func (self *Audiodata) SetData(b []byte) {
self.Data = b
}
const (
AVC_SEQHDR = 0
AVC_NALU = 1
AVC_EOS = 2
FRAME_KEY = 1
FRAME_INTER = 2
VIDEO_H264 = 7
)
type Videodata struct {
/* /*
1: keyframe (for AVC, a seekable frame) 1: keyframe (for AVC, a seekable frame)
2: inter frame (for AVC, a non- seekable frame) 2: inter frame (for AVC, a non- seekable frame)
@ -245,15 +147,64 @@ type Videodata struct {
*/ */
AVCPacketType uint8 AVCPacketType uint8
Data []byte
CompositionTime int32 CompositionTime int32
Data []byte
} }
func (self Videodata) Type() uint8 { func (self Tag) ChannelLayout() av.ChannelLayout {
return TAG_VIDEO if self.SoundType == SOUND_MONO {
return av.CH_MONO
} else {
return av.CH_STEREO
}
} }
func (self *Videodata) ParseHeader(b []byte) (n int, err error) { func (self *Tag) audioParseHeader(b []byte) (n int, err error) {
if len(b) < n+1 {
err = fmt.Errorf("audiodata: parse invalid")
return
}
flags := b[n]
n++
self.SoundFormat = flags >> 4
self.SoundRate = (flags >> 2) & 0x3
self.SoundSize = (flags >> 1) & 0x1
self.SoundType = flags & 0x1
switch self.SoundFormat {
case SOUND_AAC:
if len(b) < n+1 {
err = fmt.Errorf("audiodata: parse invalid")
return
}
self.AACPacketType = b[n]
n++
}
return
}
func (self Tag) audioFillHeader(b []byte) (n int) {
var flags uint8
flags |= self.SoundFormat << 4
flags |= self.SoundRate << 2
flags |= self.SoundSize << 1
flags |= self.SoundType
b[n] = flags
n++
switch self.SoundFormat {
case SOUND_AAC:
b[n] = self.AACPacketType
n++
}
return
}
func (self *Tag) videoParseHeader(b []byte) (n int, err error) {
if len(b) < n+1 { if len(b) < n+1 {
err = fmt.Errorf("videodata: parse invalid") err = fmt.Errorf("videodata: parse invalid")
return return
@ -278,7 +229,7 @@ func (self *Videodata) ParseHeader(b []byte) (n int, err error) {
return return
} }
func (self Videodata) FillHeader(b []byte) (n int) { func (self Tag) videoFillHeader(b []byte) (n int) {
flags := self.FrameType<<4 | self.CodecID flags := self.FrameType<<4 | self.CodecID
b[n] = flags b[n] = flags
n++ n++
@ -289,12 +240,28 @@ func (self Videodata) FillHeader(b []byte) (n int) {
return return
} }
func (self Videodata) GetData() []byte { func (self Tag) FillHeader(b []byte) (n int) {
return self.Data switch self.Type {
case TAG_AUDIO:
return self.audioFillHeader(b)
case TAG_VIDEO:
return self.videoFillHeader(b)
}
return
} }
func (self *Videodata) SetData(b []byte) { func (self *Tag) ParseHeader(b []byte) (n int, err error) {
self.Data = b switch self.Type {
case TAG_AUDIO:
return self.audioParseHeader(b)
case TAG_VIDEO:
return self.videoParseHeader(b)
}
return
} }
const ( const (
@ -313,14 +280,8 @@ func ParseTagHeader(b []byte) (tag Tag, ts int32, datalen int, err error) {
tagtype := b[0] tagtype := b[0]
switch tagtype { switch tagtype {
case TAG_AUDIO: case TAG_AUDIO, TAG_VIDEO, TAG_SCRIPTDATA:
tag = &Audiodata{} tag = Tag{Type: tagtype}
case TAG_VIDEO:
tag = &Videodata{}
case TAG_SCRIPTDATA:
tag = &Scriptdata{}
default: default:
err = fmt.Errorf("flvio: ReadTag tagtype=%d invalid", tagtype) err = fmt.Errorf("flvio: ReadTag tagtype=%d invalid", tagtype)
@ -333,7 +294,7 @@ func ParseTagHeader(b []byte) (tag Tag, ts int32, datalen int, err error) {
var tshi uint8 var tshi uint8
tslo = pio.U24BE(b[4:7]) tslo = pio.U24BE(b[4:7])
tshi = b[7] tshi = b[7]
ts = int32(tslo|uint32(tshi)<<24) ts = int32(tslo | uint32(tshi)<<24)
return return
} }
@ -353,10 +314,10 @@ func ReadTag(r io.Reader, b []byte) (tag Tag, ts int32, err error) {
} }
var n int var n int
if n, err = tag.ParseHeader(data); err != nil { if n, err = (&tag).ParseHeader(data); err != nil {
return return
} }
tag.SetData(data[n:]) tag.Data = data[n:]
if _, err = io.ReadFull(r, b[:4]); err != nil { if _, err = io.ReadFull(r, b[:4]); err != nil {
return return
@ -369,7 +330,7 @@ func FillTagHeader(b []byte, tagtype uint8, datalen int, ts int32) (n int) {
n++ n++
pio.PutU24BE(b[n:], uint32(datalen)) pio.PutU24BE(b[n:], uint32(datalen))
n += 3 n += 3
pio.PutU24BE(b[n:], uint32(ts & 0xffffff)) pio.PutU24BE(b[n:], uint32(ts&0xffffff))
n += 3 n += 3
b[n] = uint8(ts >> 24) b[n] = uint8(ts >> 24)
n++ n++
@ -385,12 +346,12 @@ func FillTagTrailer(b []byte, datalen int) (n int) {
} }
func WriteTag(w io.Writer, tag Tag, ts int32, b []byte) (err error) { func WriteTag(w io.Writer, tag Tag, ts int32, b []byte) (err error) {
data := tag.GetData() data := tag.Data
n := tag.FillHeader(b[TagHeaderLength:]) n := tag.FillHeader(b[TagHeaderLength:])
datalen := len(data)+n datalen := len(data) + n
n += FillTagHeader(b, tag.Type(), datalen, ts) n += FillTagHeader(b, tag.Type, datalen, ts)
if _, err = w.Write(b[:n]); err != nil { if _, err = w.Write(b[:n]); err != nil {
return return
@ -439,7 +400,7 @@ func ParseFileHeader(b []byte) (flags uint8, skip int, err error) {
flags = b[4] flags = b[4]
skip = int(pio.U32BE(b[5:9]))-9+4 skip = int(pio.U32BE(b[5:9])) - 9 + 4
if skip < 0 { if skip < 0 {
err = fmt.Errorf("flvio: file header datasize invalid") err = fmt.Errorf("flvio: file header datasize invalid")
return return
@ -447,5 +408,3 @@ func ParseFileHeader(b []byte) (flags uint8, skip int, err error) {
return return
} }

View File

@ -1,27 +1,26 @@
package rtmp package rtmp
import ( import (
"strings"
"bytes"
"net"
"net/url"
"bufio" "bufio"
"time" "bytes"
"fmt" "crypto/hmac"
"crypto/rand"
"crypto/sha256"
"encoding/hex" "encoding/hex"
"io" "fmt"
"github.com/nareix/bits/pio" "github.com/nareix/bits/pio"
"github.com/nareix/joy4/format/flv"
"github.com/nareix/joy4/format/flv/flvio"
"github.com/nareix/joy4/av" "github.com/nareix/joy4/av"
"github.com/nareix/joy4/av/avutil" "github.com/nareix/joy4/av/avutil"
"crypto/hmac" "github.com/nareix/joy4/format/flv"
"crypto/sha256" "github.com/nareix/joy4/format/flv/flvio"
"crypto/rand" "io"
"net"
"net/url"
"strings"
"time"
) )
var Debug bool var Debug bool
func ParseURL(uri string) (u *url.URL, err error) { func ParseURL(uri string) (u *url.URL, err error) {
if u, err = url.Parse(uri); err != nil { if u, err = url.Parse(uri); err != nil {
@ -55,10 +54,10 @@ func DialTimeout(uri string, timeout time.Duration) (conn *Conn, err error) {
} }
type Server struct { type Server struct {
Addr string Addr string
HandlePublish func(*Conn) HandlePublish func(*Conn)
HandlePlay func(*Conn) HandlePlay func(*Conn)
HandleConn func(*Conn) HandleConn func(*Conn)
} }
func (self *Server) handleConn(conn *Conn) (err error) { func (self *Server) handleConn(conn *Conn) (err error) {
@ -125,21 +124,21 @@ func (self *Server) ListenAndServe() (err error) {
} }
const ( const (
stageHandshakeDone = iota+1 stageHandshakeDone = iota + 1
stageCommandDone stageCommandDone
stageCodecDataDone stageCodecDataDone
) )
const ( const (
prepareReading = iota+1 prepareReading = iota + 1
prepareWriting prepareWriting
) )
type Conn struct { type Conn struct {
URL *url.URL URL *url.URL
OnPlayOrPublish func(string,flvio.AMFMap) error OnPlayOrPublish func(string, flvio.AMFMap) error
prober *flv.Prober prober *flv.Prober
streams []av.CodecData streams []av.CodecData
txbytes uint64 txbytes uint64
@ -148,37 +147,37 @@ type Conn struct {
bufr *bufio.Reader bufr *bufio.Reader
bufw *bufio.Writer bufw *bufio.Writer
ackn uint32 ackn uint32
writebuf []byte
readbuf []byte
netconn net.Conn writebuf []byte
readbuf []byte
netconn net.Conn
txrxcount *txrxcount txrxcount *txrxcount
writeMaxChunkSize int writeMaxChunkSize int
readMaxChunkSize int readMaxChunkSize int
readAckSize uint32 readAckSize uint32
readcsmap map[uint32]*chunkStream readcsmap map[uint32]*chunkStream
isserver bool isserver bool
publishing, playing bool publishing, playing bool
reading, writing bool reading, writing bool
stage int stage int
avmsgsid uint32 avmsgsid uint32
gotcommand bool gotcommand bool
commandname string commandname string
commandtransid float64 commandtransid float64
commandobj flvio.AMFMap commandobj flvio.AMFMap
commandparams []interface{} commandparams []interface{}
gotmsg bool gotmsg bool
timestamp uint32 timestamp uint32
msgdata []byte msgdata []byte
msgtypeid uint8 msgtypeid uint8
datamsgvals []interface{} datamsgvals []interface{}
videodata *flvio.Videodata avtag flvio.Tag
audiodata *flvio.Audiodata
eventtype uint16 eventtype uint16
} }
@ -217,15 +216,15 @@ func NewConn(netconn net.Conn) *Conn {
} }
type chunkStream struct { type chunkStream struct {
timenow uint32 timenow uint32
timedelta uint32 timedelta uint32
hastimeext bool hastimeext bool
msgsid uint32 msgsid uint32
msgtypeid uint8 msgtypeid uint8
msgdatalen uint32 msgdatalen uint32
msgdataleft uint32 msgdataleft uint32
msghdrtype uint8 msghdrtype uint8
msgdata []byte msgdata []byte
} }
func (self *chunkStream) Start() { func (self *chunkStream) Start() {
@ -234,22 +233,22 @@ func (self *chunkStream) Start() {
} }
const ( const (
msgtypeidUserControl = 4 msgtypeidUserControl = 4
msgtypeidAck = 3 msgtypeidAck = 3
msgtypeidWindowAckSize = 5 msgtypeidWindowAckSize = 5
msgtypeidSetPeerBandwidth = 6 msgtypeidSetPeerBandwidth = 6
msgtypeidSetChunkSize = 1 msgtypeidSetChunkSize = 1
msgtypeidCommandMsgAMF0 = 20 msgtypeidCommandMsgAMF0 = 20
msgtypeidCommandMsgAMF3 = 17 msgtypeidCommandMsgAMF3 = 17
msgtypeidDataMsgAMF0 = 18 msgtypeidDataMsgAMF0 = 18
msgtypeidDataMsgAMF3 = 15 msgtypeidDataMsgAMF3 = 15
msgtypeidVideoMsg = 9 msgtypeidVideoMsg = 9
msgtypeidAudioMsg = 8 msgtypeidAudioMsg = 8
) )
const ( const (
eventtypeStreamBegin = 0 eventtypeStreamBegin = 0
eventtypeSetBufferLength = 3 eventtypeSetBufferLength = 3
eventtypeStreamIsRecorded = 4 eventtypeStreamIsRecorded = 4
) )
@ -286,11 +285,8 @@ func (self *Conn) pollAVTag() (tag flvio.Tag, err error) {
return return
} }
switch self.msgtypeid { switch self.msgtypeid {
case msgtypeidVideoMsg: case msgtypeidVideoMsg, msgtypeidAudioMsg:
tag = self.videodata tag = self.avtag
return
case msgtypeidAudioMsg:
tag = self.audiodata
return return
} }
} }
@ -300,8 +296,7 @@ func (self *Conn) pollMsg() (err error) {
self.gotmsg = false self.gotmsg = false
self.gotcommand = false self.gotcommand = false
self.datamsgvals = nil self.datamsgvals = nil
self.videodata = nil self.avtag = flvio.Tag{}
self.audiodata = nil
for { for {
if err = self.readChunk(); err != nil { if err = self.readChunk(); err != nil {
return return
@ -326,7 +321,7 @@ func SplitPath(u *url.URL) (app, stream string) {
func getTcUrl(u *url.URL) string { func getTcUrl(u *url.URL) string {
app, _ := SplitPath(u) app, _ := SplitPath(u)
nu := *u nu := *u
nu.Path = "/"+app nu.Path = "/" + app
return nu.String() return nu.String()
} }
@ -358,7 +353,7 @@ var CodecTypes = flv.CodecTypes
func (self *Conn) writeBasicConf() (err error) { func (self *Conn) writeBasicConf() (err error) {
// > SetChunkSize // > SetChunkSize
if err = self.writeSetChunkSize(1024*1024*128); err != nil { if err = self.writeSetChunkSize(1024 * 1024 * 128); err != nil {
return return
} }
// > WindowAckSize // > WindowAckSize
@ -412,13 +407,13 @@ func (self *Conn) readConnect() (err error) {
// > _result("NetConnection.Connect.Success") // > _result("NetConnection.Connect.Success")
if err = self.writeCommandMsg(3, 0, "_result", self.commandtransid, if err = self.writeCommandMsg(3, 0, "_result", self.commandtransid,
flvio.AMFMap{ flvio.AMFMap{
"fmtVer": "FMS/3,0,1,123", "fmtVer": "FMS/3,0,1,123",
"capabilities": 31, "capabilities": 31,
}, },
flvio.AMFMap{ flvio.AMFMap{
"level": "status", "level": "status",
"code": "NetConnection.Connect.Success", "code": "NetConnection.Connect.Success",
"description": "Connection succeeded.", "description": "Connection succeeded.",
"objectEncoding": 3, "objectEncoding": 3,
}, },
); err != nil { ); err != nil {
@ -478,9 +473,9 @@ func (self *Conn) readConnect() (err error) {
if err = self.writeCommandMsg(5, self.avmsgsid, if err = self.writeCommandMsg(5, self.avmsgsid,
"onStatus", self.commandtransid, nil, "onStatus", self.commandtransid, nil,
flvio.AMFMap{ flvio.AMFMap{
"level": "status", "level": "status",
"code": code, "code": "NetStream.Publish.Start",
"description": description, "description": "Start publishing",
}, },
); err != nil { ); err != nil {
return return
@ -521,8 +516,8 @@ func (self *Conn) readConnect() (err error) {
if err = self.writeCommandMsg(5, self.avmsgsid, if err = self.writeCommandMsg(5, self.avmsgsid,
"onStatus", self.commandtransid, nil, "onStatus", self.commandtransid, nil,
flvio.AMFMap{ flvio.AMFMap{
"level": "status", "level": "status",
"code": "NetStream.Play.Start", "code": "NetStream.Play.Start",
"description": "Start live", "description": "Start live",
}, },
); err != nil { ); err != nil {
@ -619,13 +614,13 @@ func (self *Conn) writeConnect(path string) (err error) {
} }
if err = self.writeCommandMsg(3, 0, "connect", 1, if err = self.writeCommandMsg(3, 0, "connect", 1,
flvio.AMFMap{ flvio.AMFMap{
"app": path, "app": path,
"flashVer": "MAC 22,0,0,192", "flashVer": "MAC 22,0,0,192",
"tcUrl": getTcUrl(self.URL), "tcUrl": getTcUrl(self.URL),
"fpad": false, "fpad": false,
"capabilities": 15, "capabilities": 15,
"audioCodecs": 4071, "audioCodecs": 4071,
"videoCodecs": 252, "videoCodecs": 252,
"videoFunction": 1, "videoFunction": 1,
}, },
); err != nil { ); err != nil {
@ -937,7 +932,7 @@ func (self *Conn) tmpwbuf(n int) []byte {
func (self *Conn) writeSetChunkSize(size int) (err error) { func (self *Conn) writeSetChunkSize(size int) (err error) {
self.writeMaxChunkSize = size self.writeMaxChunkSize = size
b := self.tmpwbuf(chunkHeaderLength+4) b := self.tmpwbuf(chunkHeaderLength + 4)
n := self.fillChunkHeader(b, 2, 0, msgtypeidSetChunkSize, 0, 4) n := self.fillChunkHeader(b, 2, 0, msgtypeidSetChunkSize, 0, 4)
pio.PutU32BE(b[n:], uint32(size)) pio.PutU32BE(b[n:], uint32(size))
n += 4 n += 4
@ -946,7 +941,7 @@ func (self *Conn) writeSetChunkSize(size int) (err error) {
} }
func (self *Conn) writeAck(seqnum uint32) (err error) { func (self *Conn) writeAck(seqnum uint32) (err error) {
b := self.tmpwbuf(chunkHeaderLength+4) b := self.tmpwbuf(chunkHeaderLength + 4)
n := self.fillChunkHeader(b, 2, 0, msgtypeidAck, 0, 4) n := self.fillChunkHeader(b, 2, 0, msgtypeidAck, 0, 4)
pio.PutU32BE(b[n:], seqnum) pio.PutU32BE(b[n:], seqnum)
n += 4 n += 4
@ -955,7 +950,7 @@ func (self *Conn) writeAck(seqnum uint32) (err error) {
} }
func (self *Conn) writeWindowAckSize(size uint32) (err error) { func (self *Conn) writeWindowAckSize(size uint32) (err error) {
b := self.tmpwbuf(chunkHeaderLength+4) b := self.tmpwbuf(chunkHeaderLength + 4)
n := self.fillChunkHeader(b, 2, 0, msgtypeidWindowAckSize, 0, 4) n := self.fillChunkHeader(b, 2, 0, msgtypeidWindowAckSize, 0, 4)
pio.PutU32BE(b[n:], size) pio.PutU32BE(b[n:], size)
n += 4 n += 4
@ -964,7 +959,7 @@ func (self *Conn) writeWindowAckSize(size uint32) (err error) {
} }
func (self *Conn) writeSetPeerBandwidth(acksize uint32, limittype uint8) (err error) { func (self *Conn) writeSetPeerBandwidth(acksize uint32, limittype uint8) (err error) {
b := self.tmpwbuf(chunkHeaderLength+5) b := self.tmpwbuf(chunkHeaderLength + 5)
n := self.fillChunkHeader(b, 2, 0, msgtypeidSetPeerBandwidth, 0, 5) n := self.fillChunkHeader(b, 2, 0, msgtypeidSetPeerBandwidth, 0, 5)
pio.PutU32BE(b[n:], acksize) pio.PutU32BE(b[n:], acksize)
n += 4 n += 4
@ -974,21 +969,21 @@ func (self *Conn) writeSetPeerBandwidth(acksize uint32, limittype uint8) (err er
return return
} }
func (self *Conn) writeCommandMsg(csid, msgsid uint32, args... interface{}) (err error) { func (self *Conn) writeCommandMsg(csid, msgsid uint32, args ...interface{}) (err error) {
return self.writeAMF0Msg(msgtypeidCommandMsgAMF0, csid, msgsid, args...) return self.writeAMF0Msg(msgtypeidCommandMsgAMF0, csid, msgsid, args...)
} }
func (self *Conn) writeDataMsg(csid, msgsid uint32, args... interface{}) (err error) { func (self *Conn) writeDataMsg(csid, msgsid uint32, args ...interface{}) (err error) {
return self.writeAMF0Msg(msgtypeidDataMsgAMF0, csid, msgsid, args...) return self.writeAMF0Msg(msgtypeidDataMsgAMF0, csid, msgsid, args...)
} }
func (self *Conn) writeAMF0Msg(msgtypeid uint8, csid, msgsid uint32, args... interface{}) (err error) { func (self *Conn) writeAMF0Msg(msgtypeid uint8, csid, msgsid uint32, args ...interface{}) (err error) {
size := 0 size := 0
for _, arg := range args { for _, arg := range args {
size += flvio.LenAMF0Val(arg) size += flvio.LenAMF0Val(arg)
} }
b := self.tmpwbuf(chunkHeaderLength+size) b := self.tmpwbuf(chunkHeaderLength + size)
n := self.fillChunkHeader(b, csid, 0, msgtypeid, msgsid, size) n := self.fillChunkHeader(b, csid, 0, msgtypeid, msgsid, size)
for _, arg := range args { for _, arg := range args {
n += flvio.FillAMF0Val(b[n:], arg) n += flvio.FillAMF0Val(b[n:], arg)
@ -1003,25 +998,25 @@ func (self *Conn) writeAVTag(tag flvio.Tag, ts int32) (err error) {
var csid uint32 var csid uint32
var data []byte var data []byte
switch _tag := tag.(type) { switch tag.Type {
case *flvio.Audiodata: case flvio.TAG_AUDIO:
msgtypeid = msgtypeidAudioMsg msgtypeid = msgtypeidAudioMsg
csid = 6 csid = 6
data = _tag.Data data = tag.Data
case *flvio.Videodata: case flvio.TAG_VIDEO:
msgtypeid = msgtypeidVideoMsg msgtypeid = msgtypeidVideoMsg
csid = 7 csid = 7
data = _tag.Data data = tag.Data
} }
b := self.tmpwbuf(chunkHeaderLength+flvio.MaxTagSubHeaderLength) b := self.tmpwbuf(chunkHeaderLength + flvio.MaxTagSubHeaderLength)
hdrlen := tag.FillHeader(b[chunkHeaderLength:]) hdrlen := tag.FillHeader(b[chunkHeaderLength:])
self.fillChunkHeader(b, csid, ts, msgtypeid, self.avmsgsid, hdrlen+len(data)) self.fillChunkHeader(b, csid, ts, msgtypeid, self.avmsgsid, hdrlen+len(data))
n := hdrlen+chunkHeaderLength n := hdrlen + chunkHeaderLength
if n+len(data) > self.writeMaxChunkSize { if n+len(data) > self.writeMaxChunkSize {
if err = self.writeSetChunkSize(n+len(data)); err != nil { if err = self.writeSetChunkSize(n + len(data)); err != nil {
return return
} }
} }
@ -1034,7 +1029,7 @@ func (self *Conn) writeAVTag(tag flvio.Tag, ts int32) (err error) {
} }
func (self *Conn) writeStreamBegin(msgsid uint32) (err error) { func (self *Conn) writeStreamBegin(msgsid uint32) (err error) {
b := self.tmpwbuf(chunkHeaderLength+6) b := self.tmpwbuf(chunkHeaderLength + 6)
n := self.fillChunkHeader(b, 2, 0, msgtypeidUserControl, 0, 6) n := self.fillChunkHeader(b, 2, 0, msgtypeidUserControl, 0, 6)
pio.PutU16BE(b[n:], eventtypeStreamBegin) pio.PutU16BE(b[n:], eventtypeStreamBegin)
n += 2 n += 2
@ -1045,7 +1040,7 @@ func (self *Conn) writeStreamBegin(msgsid uint32) (err error) {
} }
func (self *Conn) writeSetBufferLength(msgsid uint32, timestamp uint32) (err error) { func (self *Conn) writeSetBufferLength(msgsid uint32, timestamp uint32) (err error) {
b := self.tmpwbuf(chunkHeaderLength+10) b := self.tmpwbuf(chunkHeaderLength + 10)
n := self.fillChunkHeader(b, 2, 0, msgtypeidUserControl, 0, 10) n := self.fillChunkHeader(b, 2, 0, msgtypeidUserControl, 0, 10)
pio.PutU16BE(b[n:], eventtypeSetBufferLength) pio.PutU16BE(b[n:], eventtypeSetBufferLength)
n += 2 n += 2
@ -1072,7 +1067,7 @@ func (self *Conn) fillChunkHeader(b []byte, csid uint32, timestamp int32, msgtyp
// //
// Figure 9 Chunk Message Header Type 0 // Figure 9 Chunk Message Header Type 0
b[n] = byte(csid)&0x3f b[n] = byte(csid) & 0x3f
n++ n++
pio.PutU24BE(b[n:], uint32(timestamp)) pio.PutU24BE(b[n:], uint32(timestamp))
n += 3 n += 3
@ -1109,9 +1104,9 @@ func (self *Conn) readChunk() (err error) {
var msghdrtype uint8 var msghdrtype uint8
var csid uint32 var csid uint32
msghdrtype = header>>6 msghdrtype = header >> 6
csid = uint32(header)&0x3f csid = uint32(header) & 0x3f
switch csid { switch csid {
default: // Chunk basic header 1 default: // Chunk basic header 1
case 0: // Chunk basic header 2 case 0: // Chunk basic header 2
@ -1119,13 +1114,13 @@ func (self *Conn) readChunk() (err error) {
return return
} }
n += 1 n += 1
csid = uint32(b[0])+64 csid = uint32(b[0]) + 64
case 1: // Chunk basic header 3 case 1: // Chunk basic header 3
if _, err = io.ReadFull(self.bufr, b[:2]); err != nil { if _, err = io.ReadFull(self.bufr, b[:2]); err != nil {
return return
} }
n += 2 n += 2
csid = uint32(pio.U16BE(b))+64 csid = uint32(pio.U16BE(b)) + 64
} }
cs := self.readcsmap[csid] cs := self.readcsmap[csid]
@ -1282,8 +1277,8 @@ func (self *Conn) readChunk() (err error) {
if size > self.readMaxChunkSize { if size > self.readMaxChunkSize {
size = self.readMaxChunkSize size = self.readMaxChunkSize
} }
off := cs.msgdatalen-cs.msgdataleft off := cs.msgdatalen - cs.msgdataleft
buf := cs.msgdata[off:int(off)+size] buf := cs.msgdata[off : int(off)+size]
if _, err = io.ReadFull(self.bufr, buf); err != nil { if _, err = io.ReadFull(self.bufr, buf); err != nil {
return return
} }
@ -1408,28 +1403,28 @@ func (self *Conn) handleMsg(timestamp uint32, msgsid uint32, msgtypeid uint8, ms
if len(msgdata) == 0 { if len(msgdata) == 0 {
return return
} }
tag := &flvio.Videodata{} tag := flvio.Tag{Type: flvio.TAG_VIDEO}
var n int var n int
if n, err = tag.ParseHeader(msgdata); err != nil { if n, err = (&tag).ParseHeader(msgdata); err != nil {
return return
} }
if !(tag.FrameType == flvio.FRAME_INTER || tag.FrameType == flvio.FRAME_KEY) { if !(tag.FrameType == flvio.FRAME_INTER || tag.FrameType == flvio.FRAME_KEY) {
return return
} }
tag.Data = msgdata[n:] tag.Data = msgdata[n:]
self.videodata = tag self.avtag = tag
case msgtypeidAudioMsg: case msgtypeidAudioMsg:
if len(msgdata) == 0 { if len(msgdata) == 0 {
return return
} }
tag := &flvio.Audiodata{} tag := flvio.Tag{Type: flvio.TAG_AUDIO}
var n int var n int
if n, err = tag.ParseHeader(msgdata); err != nil { if n, err = (&tag).ParseHeader(msgdata); err != nil {
return return
} }
tag.Data = msgdata[n:] tag.Data = msgdata[n:]
self.audiodata = tag self.avtag = tag
case msgtypeidSetChunkSize: case msgtypeidSetChunkSize:
if len(msgdata) < 4 { if len(msgdata) < 4 {
@ -1481,7 +1476,7 @@ func hsCalcDigestPos(p []byte, base int) (pos int) {
for i := 0; i < 4; i++ { for i := 0; i < 4; i++ {
pos += int(p[base+i]) pos += int(p[base+i])
} }
pos = (pos%728)+base+4 pos = (pos % 728) + base + 4
return return
} }
@ -1519,13 +1514,13 @@ func hsCreate01(p []byte, time uint32, ver uint32, key []byte) {
func hsCreate2(p []byte, key []byte) { func hsCreate2(p []byte, key []byte) {
rand.Read(p) rand.Read(p)
gap := len(p)-32 gap := len(p) - 32
digest := hsMakeDigest(key, p, gap) digest := hsMakeDigest(key, p, gap)
copy(p[gap:], digest) copy(p[gap:], digest)
} }
func (self *Conn) handshakeClient() (err error) { func (self *Conn) handshakeClient() (err error) {
var random [(1+1536*2)*2]byte var random [(1 + 1536*2) * 2]byte
C0C1C2 := random[:1536*2+1] C0C1C2 := random[:1536*2+1]
C0 := C0C1C2[:1] C0 := C0C1C2[:1]
@ -1535,7 +1530,7 @@ func (self *Conn) handshakeClient() (err error) {
S0S1S2 := random[1536*2+1:] S0S1S2 := random[1536*2+1:]
//S0 := S0S1S2[:1] //S0 := S0S1S2[:1]
S1 := S0S1S2[1:1536+1] S1 := S0S1S2[1 : 1536+1]
//S0S1 := S0S1S2[:1536+1] //S0S1 := S0S1S2[:1536+1]
//S2 := S0S1S2[1536+1:] //S2 := S0S1S2[1536+1:]
@ -1556,7 +1551,7 @@ func (self *Conn) handshakeClient() (err error) {
} }
if Debug { if Debug {
fmt.Println("rtmp: handshakeClient: server version", S1[4],S1[5],S1[6],S1[7]) fmt.Println("rtmp: handshakeClient: server version", S1[4], S1[5], S1[6], S1[7])
} }
if ver := pio.U32BE(S1[4:8]); ver != 0 { if ver := pio.U32BE(S1[4:8]); ver != 0 {
@ -1575,17 +1570,17 @@ func (self *Conn) handshakeClient() (err error) {
} }
func (self *Conn) handshakeServer() (err error) { func (self *Conn) handshakeServer() (err error) {
var random [(1+1536*2)*2]byte var random [(1 + 1536*2) * 2]byte
C0C1C2 := random[:1536*2+1] C0C1C2 := random[:1536*2+1]
C0 := C0C1C2[:1] C0 := C0C1C2[:1]
C1 := C0C1C2[1:1536+1] C1 := C0C1C2[1 : 1536+1]
C0C1 := C0C1C2[:1536+1] C0C1 := C0C1C2[:1536+1]
C2 := C0C1C2[1536+1:] C2 := C0C1C2[1536+1:]
S0S1S2 := random[1536*2+1:] S0S1S2 := random[1536*2+1:]
S0 := S0S1S2[:1] S0 := S0S1S2[:1]
S1 := S0S1S2[1:1536+1] S1 := S0S1S2[1 : 1536+1]
S0S1 := S0S1S2[:1536+1] S0S1 := S0S1S2[:1536+1]
S2 := S0S1S2[1536+1:] S2 := S0S1S2[1536+1:]
@ -1749,4 +1744,3 @@ func Handler(h *avutil.RegisterHandler) {
h.CodecTypes = CodecTypes h.CodecTypes = CodecTypes
} }