chang metadata, add stream

This commit is contained in:
nareix 2016-06-29 06:53:18 +08:00
parent 7d7a93ce1d
commit be7b15bd46

View File

@ -136,12 +136,22 @@ func (self *Server) ListenAndServe() (err error) {
} }
} }
type stream struct {
codec av.CodecData
}
func newStream(codec av.CodecData) *stream {
return &stream{
codec: codec,
}
}
type Conn struct { type Conn struct {
Host string Host string
Path string Path string
Debug bool Debug bool
streams []av.CodecData streams []*stream
videostreamidx, audiostreamidx int videostreamidx, audiostreamidx int
probepkts []flvio.Tag probepkts []flvio.Tag
@ -229,6 +239,7 @@ const (
const ( const (
eventtypeStreamBegin = 0 eventtypeStreamBegin = 0
eventtypeSetBufferLength = 3 eventtypeSetBufferLength = 3
eventtypeStreamIsRecorded = 4
) )
func (self *Conn) Close() (err error) { func (self *Conn) Close() (err error) {
@ -308,7 +319,7 @@ func (self *Conn) determineType() (err error) {
if err = self.writeSetPeerBandwidth(5000000, 2); err != nil { if err = self.writeSetPeerBandwidth(5000000, 2); err != nil {
return return
} }
self.writeMaxChunkSize = 4096 self.writeMaxChunkSize = 4000
// > SetChunkSize // > SetChunkSize
if err = self.writeSetChunkSize(uint32(self.writeMaxChunkSize)); err != nil { if err = self.writeSetChunkSize(uint32(self.writeMaxChunkSize)); err != nil {
return return
@ -328,7 +339,9 @@ func (self *Conn) determineType() (err error) {
"description": "Connection Success.", "description": "Connection Success.",
"objectEncoding": 3, "objectEncoding": 3,
}) })
self.writeCommandMsgEnd(3, 0) if err = self.writeCommandMsgEnd(3, 0); err != nil {
return
}
for { for {
if err = self.pollMsg(); err != nil { if err = self.pollMsg(); err != nil {
@ -346,7 +359,9 @@ func (self *Conn) determineType() (err error) {
flvio.WriteAMF0Val(w, self.commandtransid) flvio.WriteAMF0Val(w, self.commandtransid)
flvio.WriteAMF0Val(w, nil) flvio.WriteAMF0Val(w, nil)
flvio.WriteAMF0Val(w, self.avmsgsid) // streamid=1 flvio.WriteAMF0Val(w, self.avmsgsid) // streamid=1
self.writeCommandMsgEnd(3, 0) if err = self.writeCommandMsgEnd(3, 0); err != nil {
return
}
// < publish("path") // < publish("path")
case "publish": case "publish":
@ -370,7 +385,9 @@ func (self *Conn) determineType() (err error) {
"code": "NetStream.Publish.Start", "code": "NetStream.Publish.Start",
"description": "Start publishing", "description": "Start publishing",
}) })
self.writeCommandMsgEnd(5, self.avmsgsid) if err = self.writeCommandMsgEnd(5, self.avmsgsid); err != nil {
return
}
self.Path = formatPath(connectpath, publishpath) self.Path = formatPath(connectpath, publishpath)
self.publishing = true self.publishing = true
@ -390,7 +407,9 @@ func (self *Conn) determineType() (err error) {
playpath, _ := self.commandparams[0].(string) playpath, _ := self.commandparams[0].(string)
// > streamBegin(streamid) // > streamBegin(streamid)
self.writeStreamBegin(self.avmsgsid) if err = self.writeStreamBegin(self.avmsgsid); err != nil {
return
}
// > onStatus() // > onStatus()
w := self.writeCommandMsgStart() w := self.writeCommandMsgStart()
@ -402,14 +421,25 @@ func (self *Conn) determineType() (err error) {
"code": "NetStream.Play.Start", "code": "NetStream.Play.Start",
"description": "Start live", "description": "Start live",
}) })
self.writeCommandMsgEnd(5, self.avmsgsid) if err = self.writeCommandMsgEnd(5, self.avmsgsid); err != nil {
return
}
// > Stream Is Recored
w = self.writeUserControlMsgStart(eventtypeStreamIsRecorded)
w.WriteU32BE(self.avmsgsid)
if err = self.writeUserControlMsgEnd(); err != nil {
return
}
// > |RtmpSampleAccess() // > |RtmpSampleAccess()
w = self.writeDataMsgStart() w = self.writeDataMsgStart()
flvio.WriteAMF0Val(w, "|RtmpSampleAccess") flvio.WriteAMF0Val(w, "|RtmpSampleAccess")
flvio.WriteAMF0Val(w, true) flvio.WriteAMF0Val(w, true)
flvio.WriteAMF0Val(w, true) flvio.WriteAMF0Val(w, true)
self.writeDataMsgEnd(5, self.avmsgsid) if err = self.writeDataMsgEnd(5, self.avmsgsid); err != nil {
return
}
self.Path = formatPath(connectpath, playpath) self.Path = formatPath(connectpath, playpath)
self.playing = true self.playing = true
@ -477,7 +507,7 @@ func (self *Conn) probe() (err error) {
return return
} }
self.videostreamidx = len(self.streams) self.videostreamidx = len(self.streams)
self.streams = append(self.streams, h264) self.streams = append(self.streams, newStream(h264))
} else { } else {
self.probepkts = append(self.probepkts, tag) self.probepkts = append(self.probepkts, tag)
} }
@ -499,7 +529,7 @@ func (self *Conn) probe() (err error) {
return return
} }
self.audiostreamidx = len(self.streams) self.audiostreamidx = len(self.streams)
self.streams = append(self.streams, aac) self.streams = append(self.streams, newStream(aac))
} else { } else {
self.probepkts = append(self.probepkts, tag) self.probepkts = append(self.probepkts, tag)
} }
@ -507,25 +537,25 @@ func (self *Conn) probe() (err error) {
case flvio.SOUND_NELLYMOSER, flvio.SOUND_NELLYMOSER_16KHZ_MONO, flvio.SOUND_NELLYMOSER_8KHZ_MONO: case flvio.SOUND_NELLYMOSER, flvio.SOUND_NELLYMOSER_16KHZ_MONO, flvio.SOUND_NELLYMOSER_8KHZ_MONO:
stream := codec.NewNellyMoserCodecData() stream := codec.NewNellyMoserCodecData()
self.audiostreamidx = len(self.streams) self.audiostreamidx = len(self.streams)
self.streams = append(self.streams, stream) self.streams = append(self.streams, newStream(stream))
self.probepkts = append(self.probepkts, tag) self.probepkts = append(self.probepkts, tag)
case flvio.SOUND_ALAW: case flvio.SOUND_ALAW:
stream := codec.NewPCMAlawCodecData() stream := codec.NewPCMAlawCodecData()
self.audiostreamidx = len(self.streams) self.audiostreamidx = len(self.streams)
self.streams = append(self.streams, stream) self.streams = append(self.streams, newStream(stream))
self.probepkts = append(self.probepkts, tag) self.probepkts = append(self.probepkts, tag)
case flvio.SOUND_MULAW: case flvio.SOUND_MULAW:
stream := codec.NewPCMMulawCodecData() stream := codec.NewPCMMulawCodecData()
self.audiostreamidx = len(self.streams) self.audiostreamidx = len(self.streams)
self.streams = append(self.streams, stream) self.streams = append(self.streams, newStream(stream))
self.probepkts = append(self.probepkts, tag) self.probepkts = append(self.probepkts, tag)
case flvio.SOUND_SPEEX: case flvio.SOUND_SPEEX:
stream := codec.NewSpeexCodecData() stream := codec.NewSpeexCodecData()
self.audiostreamidx = len(self.streams) self.audiostreamidx = len(self.streams)
self.streams = append(self.streams, stream) self.streams = append(self.streams, newStream(stream))
self.probepkts = append(self.probepkts, tag) self.probepkts = append(self.probepkts, tag)
default: default:
@ -708,7 +738,9 @@ func (self *Conn) ReadHeader() (err error) {
} }
func (self *Conn) Streams() (streams []av.CodecData, err error) { func (self *Conn) Streams() (streams []av.CodecData, err error) {
streams = self.streams for _, stream := range self.streams {
streams = append(streams, stream.codec)
}
return return
} }
@ -721,15 +753,17 @@ func tsToTime(ts uint32) time.Duration {
} }
func (self *Conn) WritePacket(pkt av.Packet) (err error) { func (self *Conn) WritePacket(pkt av.Packet) (err error) {
ts := timeToTs(pkt.Time)
stream := self.streams[pkt.Idx] stream := self.streams[pkt.Idx]
ts := timeToTs(pkt.Time)
if self.Debug { if self.Debug {
fmt.Println("rtmp: WritePacket", pkt.Idx, pkt.Time, pkt.CompositionTime, ts) fmt.Println("rtmp: WritePacket", pkt.Idx, pkt.Time, pkt.CompositionTime, ts)
} }
switch stream.Type() { codec := stream.codec
switch codec.Type() {
case av.AAC: case av.AAC:
audiodata := self.makeAACAudiodata(stream.(av.AudioCodecData), flvio.AAC_RAW, pkt.Data) audiodata := self.makeAACAudiodata(codec.(av.AudioCodecData), flvio.AAC_RAW, pkt.Data)
w := self.writeAudioDataStart() w := self.writeAudioDataStart()
audiodata.Marshal(w) audiodata.Marshal(w)
if err = self.writeAudioDataEnd(ts); err != nil { if err = self.writeAudioDataEnd(ts); err != nil {
@ -752,8 +786,6 @@ func (self *Conn) WritePacket(pkt av.Packet) (err error) {
func (self *Conn) WriteHeader(streams []av.CodecData) (err error) { func (self *Conn) WriteHeader(streams []av.CodecData) (err error) {
metadata := flvio.AMFECMAArray{} metadata := flvio.AMFECMAArray{}
metadata["duration"] = 0
for _, _stream := range streams { for _, _stream := range streams {
typ := _stream.Type() typ := _stream.Type()
switch { switch {
@ -770,8 +802,8 @@ func (self *Conn) WriteHeader(streams []av.CodecData) (err error) {
metadata["width"] = stream.Width() metadata["width"] = stream.Width()
metadata["height"] = stream.Height() metadata["height"] = stream.Height()
metadata["framerate"] = 24 // TODO: make it correct metadata["displayWidth"] = stream.Width()
metadata["videodatarate"] = 1538 metadata["displayHeight"] = stream.Height()
case typ.IsAudio(): case typ.IsAudio():
stream := _stream.(av.AudioCodecData) stream := _stream.(av.AudioCodecData)
@ -784,9 +816,7 @@ func (self *Conn) WriteHeader(streams []av.CodecData) (err error) {
return return
} }
metadata["audiodatarate"] = 156 metadata["audiosamplerate"] = stream.SampleRate()
metadata["audiosamplerate"] = 3
metadata["audiosamplesize"] = 1
} }
} }
@ -810,6 +840,7 @@ func (self *Conn) WriteHeader(streams []av.CodecData) (err error) {
if err = self.writeVideoDataEnd(0); err != nil { if err = self.writeVideoDataEnd(0); err != nil {
return return
} }
self.streams = append(self.streams, newStream(stream))
case av.AAC: case av.AAC:
aac := stream.(aacparser.CodecData) aac := stream.(aacparser.CodecData)
@ -819,10 +850,10 @@ func (self *Conn) WriteHeader(streams []av.CodecData) (err error) {
if err = self.writeAudioDataEnd(0); err != nil { if err = self.writeAudioDataEnd(0); err != nil {
return return
} }
self.streams = append(self.streams, newStream(stream))
} }
} }
self.streams = streams
return return
} }