From 64d8e0b0e42b4b42f8339927942c61bb2e9595a2 Mon Sep 17 00:00:00 2001 From: nareix Date: Sun, 26 Jun 2016 21:33:25 +0800 Subject: [PATCH] client can work --- server.go | 127 ++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 91 insertions(+), 36 deletions(-) diff --git a/server.go b/server.go index d6410f0..a01ebd3 100644 --- a/server.go +++ b/server.go @@ -42,8 +42,6 @@ func DialTimeout(uri string, timeout time.Duration) (conn *Conn, err error) { return } - fmt.Println("rtmp: connected") - conn = NewConn(netconn) conn.Host = host conn.Path = path @@ -138,9 +136,12 @@ type Conn struct { commandparams []interface{} gotmsg bool + timestamp uint32 msgdata []byte msgtypeid uint8 datamsgvals []interface{} + videodata *flvio.Videodata + audiodata *flvio.Audiodata eventtype uint16 } @@ -212,12 +213,14 @@ func (self *Conn) pollCommand() (err error) { func (self *Conn) pollMsg() (err error) { self.gotmsg = false self.gotcommand = false + self.datamsgvals = nil + self.videodata = nil + self.audiodata = nil for { if err = self.readChunk(); err != nil { return } if self.gotmsg { - fmt.Println("rtmp: gotmsg iscommand", self.gotcommand) return } } @@ -317,7 +320,9 @@ func (self *Conn) determineType() (err error) { flvio.WriteAMF0Val(w, true) self.writeDataMsgEnd(5, self.avmsgsid) - fmt.Println("rtmp: playing") + if self.Debug { + fmt.Println("rtmp: playing") + } self.Path = fmt.Sprintf("/%s/%s", connectpath, playpath) self.playing = true @@ -405,7 +410,9 @@ func (self *Conn) connectPlay() (err error) { } // > connect("app") - fmt.Printf("rtmp: > connect('%s') host=%s\n", connectpath, self.Host) + if self.Debug { + fmt.Printf("rtmp: > connect('%s') host=%s\n", connectpath, self.Host) + } w := self.writeCommandMsgStart() flvio.WriteAMF0Val(w, "connect") flvio.WriteAMF0Val(w, 1) @@ -434,7 +441,9 @@ func (self *Conn) connectPlay() (err error) { err = fmt.Errorf("rtmp: command connect failed: %s", errmsg) return } - fmt.Printf("rtmp: < _result() of connect\n") + if self.Debug { + fmt.Printf("rtmp: < _result() of connect\n") + } break } } else { @@ -445,15 +454,17 @@ func (self *Conn) connectPlay() (err error) { } // > createStream() - fmt.Printf("rtmp: > createStream()\n") + if self.Debug { + fmt.Printf("rtmp: > createStream()\n") + } w = self.writeCommandMsgStart() flvio.WriteAMF0Val(w, "createStream") flvio.WriteAMF0Val(w, 2) flvio.WriteAMF0Val(w, nil) self.writeCommandMsgEnd(3, 0) - // > SetBufferLength 0,3000ms - self.writeSetBufferLength(0, 3000) + // > SetBufferLength 0,0ms + self.writeSetBufferLength(0, 0) for { if err = self.pollMsg(); err != nil { @@ -473,7 +484,9 @@ func (self *Conn) connectPlay() (err error) { } // > play('app') - fmt.Printf("rtmp: > play('%s')\n", playpath) + if self.Debug { + fmt.Printf("rtmp: > play('%s')\n", playpath) + } w = self.writeCommandMsgStart() flvio.WriteAMF0Val(w, "play") flvio.WriteAMF0Val(w, 0) @@ -494,7 +507,9 @@ func (self *Conn) connectPlay() (err error) { if atype, vtype, err = self.parseMetaData(data); err != nil { return } - fmt.Printf("rtmp: < onMetaData()\n") + if self.Debug { + fmt.Printf("rtmp: < onMetaData()\n") + } break } } @@ -515,12 +530,7 @@ func (self *Conn) connectPlay() (err error) { switch { case self.msgtypeid == msgtypeidVideoMsg && vtype != 0: - tag := &flvio.Videodata{} - r := pio.NewReaderBytes(self.msgdata) - r.LimitOn(int64(len(self.msgdata))) - if err = tag.Unmarshal(r); err != nil { - return - } + tag := self.videodata switch vtype { case av.H264: if tag.AVCPacketType == flvio.AVC_SEQHDR { @@ -535,12 +545,7 @@ func (self *Conn) connectPlay() (err error) { } case self.msgtypeid == msgtypeidAudioMsg && atype != 0: - tag := &flvio.Audiodata{} - r := pio.NewReaderBytes(self.msgdata) - r.LimitOn(int64(len(self.msgdata))) - if err = tag.Unmarshal(r); err != nil { - return - } + tag := self.audiodata switch atype { case av.AAC: if tag.AACPacketType == flvio.AAC_SEQHDR { @@ -569,6 +574,28 @@ func (self *Conn) connectPlay() (err error) { } func (self *Conn) ReadPacket() (pkt av.Packet, err error) { + poll: for { + if err = self.pollMsg(); err != nil { + return + } + switch self.msgtypeid { + case msgtypeidVideoMsg: + tag := self.videodata + pkt.CompositionTime = tsToTime(uint32(tag.CompositionTime)) + pkt.Data = tag.Data + pkt.IsKeyFrame = tag.FrameType == flvio.FRAME_KEY + pkt.Idx = int8(self.videostreamidx) + break poll + + case msgtypeidAudioMsg: + tag := self.audiodata + pkt.Data = tag.Data + pkt.Idx = int8(self.audiostreamidx) + break poll + } + } + + pkt.Time = tsToTime(self.timestamp) return } @@ -589,10 +616,20 @@ func (self *Conn) Streams() (streams []av.CodecData, err error) { return } +func timeToTs(tm time.Duration) uint32 { + return uint32(tm / time.Millisecond) +} + +func tsToTime(ts uint32) time.Duration { + return time.Duration(ts)*time.Millisecond +} + func (self *Conn) WritePacket(pkt av.Packet) (err error) { - ts := uint32(pkt.Time/time.Millisecond) + ts := timeToTs(pkt.Time) stream := self.streams[pkt.Idx] - fmt.Println("rtmp: WritePacket", pkt.Idx, pkt.Time, pkt.CompositionTime, ts) + if self.Debug { + fmt.Println("rtmp: WritePacket", pkt.Idx, pkt.Time, pkt.CompositionTime, ts) + } switch stream.Type() { case av.AAC: @@ -603,7 +640,7 @@ func (self *Conn) WritePacket(pkt av.Packet) (err error) { case av.H264: videodata := self.makeH264Videodata(flvio.AVC_NALU, pkt.IsKeyFrame, pkt.Data) - videodata.CompositionTime = int32(pkt.CompositionTime/time.Millisecond) + videodata.CompositionTime = int32(timeToTs(pkt.CompositionTime)) w := self.writeVideoDataStart() videodata.Marshal(w) self.writeVideoDataEnd(ts) @@ -874,7 +911,9 @@ func (self *Conn) writeChunks(csid uint32, timestamp uint32, msgtypeid uint8, ms } } - fmt.Printf("rtmp: write chunk msgdatalen=%d msgsid=%d\n", msgdatalen, msgsid) + if self.Debug { + fmt.Printf("rtmp: write chunk msgdatalen=%d msgsid=%d\n", msgdatalen, msgsid) + } if err = self.bufw.Flush(); err != nil { return @@ -1061,19 +1100,18 @@ func (self *Conn) readChunk() (err error) { } cs.msgdataleft -= uint32(size) - if true { + if self.Debug { fmt.Printf("rtmp: chunk csid=%d msgsid=%d msgtypeid=%d msghdrtype=%d len=%d left=%d\n", csid, cs.msgsid, cs.msgtypeid, cs.msghdrtype, cs.msgdatalen, cs.msgdataleft) } if cs.msgdataleft == 0 { - if true { + if self.Debug { fmt.Println("rtmp: chunk data") fmt.Print(hex.Dump(cs.msgdata)) - fmt.Printf("%x\n", cs.msgdata) } - if err = self.handleMsg(cs.msgsid, cs.msgtypeid, cs.msgdata); err != nil { + if err = self.handleMsg(cs.timenow, cs.msgsid, cs.msgtypeid, cs.msgdata); err != nil { return } } @@ -1107,8 +1145,9 @@ func (self *Conn) handleCommandMsgAMF0(r *pio.Reader) (err error) { return } -func (self *Conn) handleMsg(msgsid uint32, msgtypeid uint8, msgdata []byte) (err error) { +func (self *Conn) handleMsg(timestamp uint32, msgsid uint32, msgtypeid uint8, msgdata []byte) (err error) { self.msgtypeid = msgtypeid + self.timestamp = timestamp switch msgtypeid { case msgtypeidCommandMsgAMF0: @@ -1135,7 +1174,6 @@ func (self *Conn) handleMsg(msgsid uint32, msgtypeid uint8, msgdata []byte) (err case msgtypeidDataMsgAMF0: r := pio.NewReaderBytes(msgdata) - self.datamsgvals = []interface{}{} for { if val, err := flvio.ReadAMF0Val(r); err != nil { break @@ -1144,8 +1182,23 @@ func (self *Conn) handleMsg(msgsid uint32, msgtypeid uint8, msgdata []byte) (err } } - case msgtypeidVideoMsg, msgtypeidAudioMsg: - self.msgdata = msgdata + case msgtypeidVideoMsg: + tag := &flvio.Videodata{} + r := pio.NewReaderBytes(msgdata) + r.LimitOn(int64(len(msgdata))) + if err = tag.Unmarshal(r); err != nil { + return + } + self.videodata = tag + + case msgtypeidAudioMsg: + tag := &flvio.Audiodata{} + r := pio.NewReaderBytes(msgdata) + r.LimitOn(int64(len(msgdata))) + if err = tag.Unmarshal(r); err != nil { + return + } + self.audiodata = tag case msgtypeidSetChunkSize: self.readMaxChunkSize = int(pio.GetU32BE(msgdata)) @@ -1247,7 +1300,9 @@ func (self *Conn) handshakeClient() (err error) { return } - fmt.Println("rtmp: handshakeClient: server version", S1[4],S1[5],S1[6],S1[7]) + if self.Debug { + fmt.Println("rtmp: handshakeClient: server version", S1[4],S1[5],S1[6],S1[7]) + } if S1[4] >= 3 { } else {