client can work

This commit is contained in:
nareix 2016-06-26 21:33:25 +08:00
parent 57541c0a59
commit 64d8e0b0e4

127
server.go
View File

@ -42,8 +42,6 @@ func DialTimeout(uri string, timeout time.Duration) (conn *Conn, err error) {
return
}
fmt.Println("rtmp: connected")
conn = NewConn(netconn)
conn.Host = host
conn.Path = path
@ -138,9 +136,12 @@ type Conn struct {
commandparams []interface{}
gotmsg bool
timestamp uint32
msgdata []byte
msgtypeid uint8
datamsgvals []interface{}
videodata *flvio.Videodata
audiodata *flvio.Audiodata
eventtype uint16
}
@ -212,12 +213,14 @@ func (self *Conn) pollCommand() (err error) {
func (self *Conn) pollMsg() (err error) {
self.gotmsg = false
self.gotcommand = false
self.datamsgvals = nil
self.videodata = nil
self.audiodata = nil
for {
if err = self.readChunk(); err != nil {
return
}
if self.gotmsg {
fmt.Println("rtmp: gotmsg iscommand", self.gotcommand)
return
}
}
@ -317,7 +320,9 @@ func (self *Conn) determineType() (err error) {
flvio.WriteAMF0Val(w, true)
self.writeDataMsgEnd(5, self.avmsgsid)
fmt.Println("rtmp: playing")
if self.Debug {
fmt.Println("rtmp: playing")
}
self.Path = fmt.Sprintf("/%s/%s", connectpath, playpath)
self.playing = true
@ -405,7 +410,9 @@ func (self *Conn) connectPlay() (err error) {
}
// > connect("app")
fmt.Printf("rtmp: > connect('%s') host=%s\n", connectpath, self.Host)
if self.Debug {
fmt.Printf("rtmp: > connect('%s') host=%s\n", connectpath, self.Host)
}
w := self.writeCommandMsgStart()
flvio.WriteAMF0Val(w, "connect")
flvio.WriteAMF0Val(w, 1)
@ -434,7 +441,9 @@ func (self *Conn) connectPlay() (err error) {
err = fmt.Errorf("rtmp: command connect failed: %s", errmsg)
return
}
fmt.Printf("rtmp: < _result() of connect\n")
if self.Debug {
fmt.Printf("rtmp: < _result() of connect\n")
}
break
}
} else {
@ -445,15 +454,17 @@ func (self *Conn) connectPlay() (err error) {
}
// > createStream()
fmt.Printf("rtmp: > createStream()\n")
if self.Debug {
fmt.Printf("rtmp: > createStream()\n")
}
w = self.writeCommandMsgStart()
flvio.WriteAMF0Val(w, "createStream")
flvio.WriteAMF0Val(w, 2)
flvio.WriteAMF0Val(w, nil)
self.writeCommandMsgEnd(3, 0)
// > SetBufferLength 0,3000ms
self.writeSetBufferLength(0, 3000)
// > SetBufferLength 0,0ms
self.writeSetBufferLength(0, 0)
for {
if err = self.pollMsg(); err != nil {
@ -473,7 +484,9 @@ func (self *Conn) connectPlay() (err error) {
}
// > play('app')
fmt.Printf("rtmp: > play('%s')\n", playpath)
if self.Debug {
fmt.Printf("rtmp: > play('%s')\n", playpath)
}
w = self.writeCommandMsgStart()
flvio.WriteAMF0Val(w, "play")
flvio.WriteAMF0Val(w, 0)
@ -494,7 +507,9 @@ func (self *Conn) connectPlay() (err error) {
if atype, vtype, err = self.parseMetaData(data); err != nil {
return
}
fmt.Printf("rtmp: < onMetaData()\n")
if self.Debug {
fmt.Printf("rtmp: < onMetaData()\n")
}
break
}
}
@ -515,12 +530,7 @@ func (self *Conn) connectPlay() (err error) {
switch {
case self.msgtypeid == msgtypeidVideoMsg && vtype != 0:
tag := &flvio.Videodata{}
r := pio.NewReaderBytes(self.msgdata)
r.LimitOn(int64(len(self.msgdata)))
if err = tag.Unmarshal(r); err != nil {
return
}
tag := self.videodata
switch vtype {
case av.H264:
if tag.AVCPacketType == flvio.AVC_SEQHDR {
@ -535,12 +545,7 @@ func (self *Conn) connectPlay() (err error) {
}
case self.msgtypeid == msgtypeidAudioMsg && atype != 0:
tag := &flvio.Audiodata{}
r := pio.NewReaderBytes(self.msgdata)
r.LimitOn(int64(len(self.msgdata)))
if err = tag.Unmarshal(r); err != nil {
return
}
tag := self.audiodata
switch atype {
case av.AAC:
if tag.AACPacketType == flvio.AAC_SEQHDR {
@ -569,6 +574,28 @@ func (self *Conn) connectPlay() (err error) {
}
func (self *Conn) ReadPacket() (pkt av.Packet, err error) {
poll: for {
if err = self.pollMsg(); err != nil {
return
}
switch self.msgtypeid {
case msgtypeidVideoMsg:
tag := self.videodata
pkt.CompositionTime = tsToTime(uint32(tag.CompositionTime))
pkt.Data = tag.Data
pkt.IsKeyFrame = tag.FrameType == flvio.FRAME_KEY
pkt.Idx = int8(self.videostreamidx)
break poll
case msgtypeidAudioMsg:
tag := self.audiodata
pkt.Data = tag.Data
pkt.Idx = int8(self.audiostreamidx)
break poll
}
}
pkt.Time = tsToTime(self.timestamp)
return
}
@ -589,10 +616,20 @@ func (self *Conn) Streams() (streams []av.CodecData, err error) {
return
}
func timeToTs(tm time.Duration) uint32 {
return uint32(tm / time.Millisecond)
}
func tsToTime(ts uint32) time.Duration {
return time.Duration(ts)*time.Millisecond
}
func (self *Conn) WritePacket(pkt av.Packet) (err error) {
ts := uint32(pkt.Time/time.Millisecond)
ts := timeToTs(pkt.Time)
stream := self.streams[pkt.Idx]
fmt.Println("rtmp: WritePacket", pkt.Idx, pkt.Time, pkt.CompositionTime, ts)
if self.Debug {
fmt.Println("rtmp: WritePacket", pkt.Idx, pkt.Time, pkt.CompositionTime, ts)
}
switch stream.Type() {
case av.AAC:
@ -603,7 +640,7 @@ func (self *Conn) WritePacket(pkt av.Packet) (err error) {
case av.H264:
videodata := self.makeH264Videodata(flvio.AVC_NALU, pkt.IsKeyFrame, pkt.Data)
videodata.CompositionTime = int32(pkt.CompositionTime/time.Millisecond)
videodata.CompositionTime = int32(timeToTs(pkt.CompositionTime))
w := self.writeVideoDataStart()
videodata.Marshal(w)
self.writeVideoDataEnd(ts)
@ -874,7 +911,9 @@ func (self *Conn) writeChunks(csid uint32, timestamp uint32, msgtypeid uint8, ms
}
}
fmt.Printf("rtmp: write chunk msgdatalen=%d msgsid=%d\n", msgdatalen, msgsid)
if self.Debug {
fmt.Printf("rtmp: write chunk msgdatalen=%d msgsid=%d\n", msgdatalen, msgsid)
}
if err = self.bufw.Flush(); err != nil {
return
@ -1061,19 +1100,18 @@ func (self *Conn) readChunk() (err error) {
}
cs.msgdataleft -= uint32(size)
if true {
if self.Debug {
fmt.Printf("rtmp: chunk csid=%d msgsid=%d msgtypeid=%d msghdrtype=%d len=%d left=%d\n",
csid, cs.msgsid, cs.msgtypeid, cs.msghdrtype, cs.msgdatalen, cs.msgdataleft)
}
if cs.msgdataleft == 0 {
if true {
if self.Debug {
fmt.Println("rtmp: chunk data")
fmt.Print(hex.Dump(cs.msgdata))
fmt.Printf("%x\n", cs.msgdata)
}
if err = self.handleMsg(cs.msgsid, cs.msgtypeid, cs.msgdata); err != nil {
if err = self.handleMsg(cs.timenow, cs.msgsid, cs.msgtypeid, cs.msgdata); err != nil {
return
}
}
@ -1107,8 +1145,9 @@ func (self *Conn) handleCommandMsgAMF0(r *pio.Reader) (err error) {
return
}
func (self *Conn) handleMsg(msgsid uint32, msgtypeid uint8, msgdata []byte) (err error) {
func (self *Conn) handleMsg(timestamp uint32, msgsid uint32, msgtypeid uint8, msgdata []byte) (err error) {
self.msgtypeid = msgtypeid
self.timestamp = timestamp
switch msgtypeid {
case msgtypeidCommandMsgAMF0:
@ -1135,7 +1174,6 @@ func (self *Conn) handleMsg(msgsid uint32, msgtypeid uint8, msgdata []byte) (err
case msgtypeidDataMsgAMF0:
r := pio.NewReaderBytes(msgdata)
self.datamsgvals = []interface{}{}
for {
if val, err := flvio.ReadAMF0Val(r); err != nil {
break
@ -1144,8 +1182,23 @@ func (self *Conn) handleMsg(msgsid uint32, msgtypeid uint8, msgdata []byte) (err
}
}
case msgtypeidVideoMsg, msgtypeidAudioMsg:
self.msgdata = msgdata
case msgtypeidVideoMsg:
tag := &flvio.Videodata{}
r := pio.NewReaderBytes(msgdata)
r.LimitOn(int64(len(msgdata)))
if err = tag.Unmarshal(r); err != nil {
return
}
self.videodata = tag
case msgtypeidAudioMsg:
tag := &flvio.Audiodata{}
r := pio.NewReaderBytes(msgdata)
r.LimitOn(int64(len(msgdata)))
if err = tag.Unmarshal(r); err != nil {
return
}
self.audiodata = tag
case msgtypeidSetChunkSize:
self.readMaxChunkSize = int(pio.GetU32BE(msgdata))
@ -1247,7 +1300,9 @@ func (self *Conn) handshakeClient() (err error) {
return
}
fmt.Println("rtmp: handshakeClient: server version", S1[4],S1[5],S1[6],S1[7])
if self.Debug {
fmt.Println("rtmp: handshakeClient: server version", S1[4],S1[5],S1[6],S1[7])
}
if S1[4] >= 3 {
} else {