diff --git a/server.go b/server.go index f0cef72..f1d26c2 100644 --- a/server.go +++ b/server.go @@ -136,12 +136,22 @@ func (self *Server) ListenAndServe() (err error) { } } +type stream struct { + codec av.CodecData +} + +func newStream(codec av.CodecData) *stream { + return &stream{ + codec: codec, + } +} + type Conn struct { Host string Path string Debug bool - streams []av.CodecData + streams []*stream videostreamidx, audiostreamidx int probepkts []flvio.Tag @@ -229,6 +239,7 @@ const ( const ( eventtypeStreamBegin = 0 eventtypeSetBufferLength = 3 + eventtypeStreamIsRecorded = 4 ) func (self *Conn) Close() (err error) { @@ -308,7 +319,7 @@ func (self *Conn) determineType() (err error) { if err = self.writeSetPeerBandwidth(5000000, 2); err != nil { return } - self.writeMaxChunkSize = 4096 + self.writeMaxChunkSize = 4000 // > SetChunkSize if err = self.writeSetChunkSize(uint32(self.writeMaxChunkSize)); err != nil { return @@ -328,7 +339,9 @@ func (self *Conn) determineType() (err error) { "description": "Connection Success.", "objectEncoding": 3, }) - self.writeCommandMsgEnd(3, 0) + if err = self.writeCommandMsgEnd(3, 0); err != nil { + return + } for { if err = self.pollMsg(); err != nil { @@ -346,7 +359,9 @@ func (self *Conn) determineType() (err error) { flvio.WriteAMF0Val(w, self.commandtransid) flvio.WriteAMF0Val(w, nil) flvio.WriteAMF0Val(w, self.avmsgsid) // streamid=1 - self.writeCommandMsgEnd(3, 0) + if err = self.writeCommandMsgEnd(3, 0); err != nil { + return + } // < publish("path") case "publish": @@ -370,7 +385,9 @@ func (self *Conn) determineType() (err error) { "code": "NetStream.Publish.Start", "description": "Start publishing", }) - self.writeCommandMsgEnd(5, self.avmsgsid) + if err = self.writeCommandMsgEnd(5, self.avmsgsid); err != nil { + return + } self.Path = formatPath(connectpath, publishpath) self.publishing = true @@ -390,7 +407,9 @@ func (self *Conn) determineType() (err error) { playpath, _ := self.commandparams[0].(string) // > streamBegin(streamid) - self.writeStreamBegin(self.avmsgsid) + if err = self.writeStreamBegin(self.avmsgsid); err != nil { + return + } // > onStatus() w := self.writeCommandMsgStart() @@ -402,14 +421,25 @@ func (self *Conn) determineType() (err error) { "code": "NetStream.Play.Start", "description": "Start live", }) - self.writeCommandMsgEnd(5, self.avmsgsid) + if err = self.writeCommandMsgEnd(5, self.avmsgsid); err != nil { + return + } + + // > Stream Is Recored + w = self.writeUserControlMsgStart(eventtypeStreamIsRecorded) + w.WriteU32BE(self.avmsgsid) + if err = self.writeUserControlMsgEnd(); err != nil { + return + } // > |RtmpSampleAccess() w = self.writeDataMsgStart() flvio.WriteAMF0Val(w, "|RtmpSampleAccess") flvio.WriteAMF0Val(w, true) flvio.WriteAMF0Val(w, true) - self.writeDataMsgEnd(5, self.avmsgsid) + if err = self.writeDataMsgEnd(5, self.avmsgsid); err != nil { + return + } self.Path = formatPath(connectpath, playpath) self.playing = true @@ -477,7 +507,7 @@ func (self *Conn) probe() (err error) { return } self.videostreamidx = len(self.streams) - self.streams = append(self.streams, h264) + self.streams = append(self.streams, newStream(h264)) } else { self.probepkts = append(self.probepkts, tag) } @@ -499,7 +529,7 @@ func (self *Conn) probe() (err error) { return } self.audiostreamidx = len(self.streams) - self.streams = append(self.streams, aac) + self.streams = append(self.streams, newStream(aac)) } else { self.probepkts = append(self.probepkts, tag) } @@ -507,25 +537,25 @@ func (self *Conn) probe() (err error) { case flvio.SOUND_NELLYMOSER, flvio.SOUND_NELLYMOSER_16KHZ_MONO, flvio.SOUND_NELLYMOSER_8KHZ_MONO: stream := codec.NewNellyMoserCodecData() self.audiostreamidx = len(self.streams) - self.streams = append(self.streams, stream) + self.streams = append(self.streams, newStream(stream)) self.probepkts = append(self.probepkts, tag) case flvio.SOUND_ALAW: stream := codec.NewPCMAlawCodecData() self.audiostreamidx = len(self.streams) - self.streams = append(self.streams, stream) + self.streams = append(self.streams, newStream(stream)) self.probepkts = append(self.probepkts, tag) case flvio.SOUND_MULAW: stream := codec.NewPCMMulawCodecData() self.audiostreamidx = len(self.streams) - self.streams = append(self.streams, stream) + self.streams = append(self.streams, newStream(stream)) self.probepkts = append(self.probepkts, tag) case flvio.SOUND_SPEEX: stream := codec.NewSpeexCodecData() self.audiostreamidx = len(self.streams) - self.streams = append(self.streams, stream) + self.streams = append(self.streams, newStream(stream)) self.probepkts = append(self.probepkts, tag) default: @@ -708,7 +738,9 @@ func (self *Conn) ReadHeader() (err error) { } func (self *Conn) Streams() (streams []av.CodecData, err error) { - streams = self.streams + for _, stream := range self.streams { + streams = append(streams, stream.codec) + } return } @@ -721,15 +753,17 @@ func tsToTime(ts uint32) time.Duration { } func (self *Conn) WritePacket(pkt av.Packet) (err error) { - ts := timeToTs(pkt.Time) stream := self.streams[pkt.Idx] + ts := timeToTs(pkt.Time) + if self.Debug { fmt.Println("rtmp: WritePacket", pkt.Idx, pkt.Time, pkt.CompositionTime, ts) } - switch stream.Type() { + codec := stream.codec + switch codec.Type() { case av.AAC: - audiodata := self.makeAACAudiodata(stream.(av.AudioCodecData), flvio.AAC_RAW, pkt.Data) + audiodata := self.makeAACAudiodata(codec.(av.AudioCodecData), flvio.AAC_RAW, pkt.Data) w := self.writeAudioDataStart() audiodata.Marshal(w) if err = self.writeAudioDataEnd(ts); err != nil { @@ -752,8 +786,6 @@ func (self *Conn) WritePacket(pkt av.Packet) (err error) { func (self *Conn) WriteHeader(streams []av.CodecData) (err error) { metadata := flvio.AMFECMAArray{} - metadata["duration"] = 0 - for _, _stream := range streams { typ := _stream.Type() switch { @@ -770,8 +802,8 @@ func (self *Conn) WriteHeader(streams []av.CodecData) (err error) { metadata["width"] = stream.Width() metadata["height"] = stream.Height() - metadata["framerate"] = 24 // TODO: make it correct - metadata["videodatarate"] = 1538 + metadata["displayWidth"] = stream.Width() + metadata["displayHeight"] = stream.Height() case typ.IsAudio(): stream := _stream.(av.AudioCodecData) @@ -784,9 +816,7 @@ func (self *Conn) WriteHeader(streams []av.CodecData) (err error) { return } - metadata["audiodatarate"] = 156 - metadata["audiosamplerate"] = 3 - metadata["audiosamplesize"] = 1 + metadata["audiosamplerate"] = stream.SampleRate() } } @@ -810,6 +840,7 @@ func (self *Conn) WriteHeader(streams []av.CodecData) (err error) { if err = self.writeVideoDataEnd(0); err != nil { return } + self.streams = append(self.streams, newStream(stream)) case av.AAC: aac := stream.(aacparser.CodecData) @@ -819,10 +850,10 @@ func (self *Conn) WriteHeader(streams []av.CodecData) (err error) { if err = self.writeAudioDataEnd(0); err != nil { return } + self.streams = append(self.streams, newStream(stream)) } } - self.streams = streams return }