diff --git a/format/rtmp/rtmp.go b/format/rtmp/rtmp.go index 6a0e1f1..961b214 100644 --- a/format/rtmp/rtmp.go +++ b/format/rtmp/rtmp.go @@ -69,21 +69,21 @@ type Server struct { doneChan chan struct{} } -func (self *Server) handleConn(conn *Conn) (err error) { - if self.HandleConn != nil { - self.HandleConn(conn) +func (s *Server) handleConn(conn *Conn) (err error) { + if s.HandleConn != nil { + s.HandleConn(conn) } else { if err = conn.prepare(stageCommandDone, 0); err != nil { return } if conn.playing { - if self.HandlePlay != nil { - self.HandlePlay(conn) + if s.HandlePlay != nil { + s.HandlePlay(conn) } } else if conn.publishing { - if self.HandlePublish != nil { - self.HandlePublish(conn) + if s.HandlePublish != nil { + s.HandlePublish(conn) } } } @@ -91,8 +91,8 @@ func (self *Server) handleConn(conn *Conn) (err error) { return } -func (self *Server) ListenAndServe() error { - addr := self.Addr +func (s *Server) ListenAndServe() error { + addr := s.Addr if addr == "" { addr = ":1935" } @@ -102,11 +102,11 @@ func (self *Server) ListenAndServe() error { return err } - return self.Serve(listener) + return s.Serve(listener) } -func (self *Server) ListenAndServeTLS(certFile, keyFile string) error { - addr := self.Addr +func (s *Server) ListenAndServeTLS(certFile, keyFile string) error { + addr := s.Addr if addr == "" { addr = ":1935" } @@ -116,14 +116,14 @@ func (self *Server) ListenAndServeTLS(certFile, keyFile string) error { return err } - return self.ServeTLS(listener, certFile, keyFile) + return s.ServeTLS(listener, certFile, keyFile) } -func (self *Server) ServeTLS(listener net.Listener, certFile, keyFile string) error { +func (s *Server) ServeTLS(listener net.Listener, certFile, keyFile string) error { var config *tls.Config - if self.TLSConfig != nil { - config = self.TLSConfig.Clone() + if s.TLSConfig != nil { + config = s.TLSConfig.Clone() } else { config = &tls.Config{} } @@ -142,24 +142,24 @@ func (self *Server) ServeTLS(listener net.Listener, certFile, keyFile string) er listener = tls.NewListener(listener, config) - return self.Serve(listener) + return s.Serve(listener) } -func (self *Server) Serve(listener net.Listener) error { - self.doneChan = make(chan struct{}) +func (s *Server) Serve(listener net.Listener) error { + s.doneChan = make(chan struct{}) - self.listener = listener - defer self.listener.Close() + s.listener = listener + defer s.listener.Close() if Debug { - fmt.Println("rtmp: server: listening on", self.listener.Addr().String()) + fmt.Println("rtmp: server: listening on", s.listener.Addr().String()) } for { - netconn, err := self.listener.Accept() + netconn, err := s.listener.Accept() if err != nil { select { - case <-self.doneChan: + case <-s.doneChan: return ErrServerClosed default: } @@ -174,7 +174,7 @@ func (self *Server) Serve(listener net.Listener) error { conn := NewConn(netconn) conn.isserver = true go func() { - err := self.handleConn(conn) + err := s.handleConn(conn) if Debug { fmt.Println("rtmp: server: client closed err:", err) } @@ -183,15 +183,15 @@ func (self *Server) Serve(listener net.Listener) error { } } -func (self *Server) Close() { - if self.listener == nil { +func (s *Server) Close() { + if s.listener == nil { return } - close(self.doneChan) + close(s.doneChan) - self.listener.Close() - self.listener = nil + s.listener.Close() + s.listener = nil } const ( @@ -212,9 +212,6 @@ type Conn struct { prober *flv.Prober streams []av.CodecData - txbytes uint64 - rxbytes uint64 - bufr *bufio.Reader bufw *bufio.Writer ackn uint32 @@ -261,15 +258,15 @@ type txrxcount struct { rxbytes uint64 } -func (self *txrxcount) Read(p []byte) (int, error) { - n, err := self.ReadWriter.Read(p) - self.rxbytes += uint64(n) +func (t *txrxcount) Read(p []byte) (int, error) { + n, err := t.ReadWriter.Read(p) + t.rxbytes += uint64(n) return n, err } -func (self *txrxcount) Write(p []byte) (int, error) { - n, err := self.ReadWriter.Write(p) - self.txbytes += uint64(n) +func (t *txrxcount) Write(p []byte) (int, error) { + n, err := t.ReadWriter.Write(p) + t.txbytes += uint64(n) return n, err } @@ -300,9 +297,9 @@ type chunkStream struct { msgdata []byte } -func (self *chunkStream) Start() { - self.msgdataleft = self.msgdatalen - self.msgdata = make([]byte, self.msgdatalen) +func (cs *chunkStream) Start() { + cs.msgdataleft = cs.msgdatalen + cs.msgdata = make([]byte, cs.msgdatalen) } const ( @@ -327,56 +324,56 @@ const ( eventtypePingResponse = 7 ) -func (self *Conn) NetConn() net.Conn { - return self.netconn +func (conn *Conn) NetConn() net.Conn { + return conn.netconn } -func (self *Conn) TxBytes() uint64 { - return self.txrxcount.txbytes +func (conn *Conn) TxBytes() uint64 { + return conn.txrxcount.txbytes } -func (self *Conn) RxBytes() uint64 { - return self.txrxcount.rxbytes +func (conn *Conn) RxBytes() uint64 { + return conn.txrxcount.rxbytes } -func (self *Conn) Close() (err error) { - return self.netconn.Close() +func (conn *Conn) Close() (err error) { + return conn.netconn.Close() } -func (self *Conn) pollCommand() (err error) { +func (conn *Conn) pollCommand() (err error) { for { - if err = self.pollMsg(); err != nil { + if err = conn.pollMsg(); err != nil { return } - if self.gotcommand { + if conn.gotcommand { return } } } -func (self *Conn) pollAVTag() (tag flvio.Tag, err error) { +func (conn *Conn) pollAVTag() (tag flvio.Tag, err error) { for { - if err = self.pollMsg(); err != nil { + if err = conn.pollMsg(); err != nil { return } - switch self.msgtypeid { + switch conn.msgtypeid { case msgtypeidVideoMsg, msgtypeidAudioMsg: - tag = self.avtag + tag = conn.avtag return } } } -func (self *Conn) pollMsg() (err error) { - self.gotmsg = false - self.gotcommand = false - self.datamsgvals = nil - self.avtag = flvio.Tag{} +func (conn *Conn) pollMsg() (err error) { + conn.gotmsg = false + conn.gotcommand = false + conn.datamsgvals = nil + conn.avtag = flvio.Tag{} for { - if err = self.readChunk(); err != nil { + if err = conn.readChunk(); err != nil { return } - if self.gotmsg { + if conn.gotmsg { return } } @@ -431,34 +428,34 @@ func createURL(tcurl, app, play string) (*url.URL, error) { var CodecTypes = flv.CodecTypes -func (self *Conn) writeBasicConf() (err error) { +func (conn *Conn) writeBasicConf() (err error) { // > SetChunkSize - if err = self.writeSetChunkSize(1024 * 1024 * 1); err != nil { + if err = conn.writeSetChunkSize(1024 * 1024 * 1); err != nil { return } // > WindowAckSize - if err = self.writeWindowAckSize(1024 * 1024 * 3); err != nil { + if err = conn.writeWindowAckSize(1024 * 1024 * 3); err != nil { return } // > SetPeerBandwidth - if err = self.writeSetPeerBandwidth(1024*1024*3, 0); err != nil { + if err = conn.writeSetPeerBandwidth(1024*1024*3, 0); err != nil { return } return } -func (self *Conn) readConnect() (err error) { +func (conn *Conn) readConnect() (err error) { var connectpath string // < connect("app") - if err = self.pollCommand(); err != nil { + if err = conn.pollCommand(); err != nil { return } - if self.commandname != "connect" { + if conn.commandname != "connect" { err = fmt.Errorf("first command is not connect") return } - if self.commandobj == nil { + if conn.commandobj == nil { err = fmt.Errorf("connect command params invalid") return } @@ -467,28 +464,28 @@ func (self *Conn) readConnect() (err error) { var ok bool var _app, _tcurl interface{} - if _app, ok = self.commandobj["app"]; !ok { + if _app, ok = conn.commandobj["app"]; !ok { err = fmt.Errorf("the `connect` params missing `app`") return } connectpath, _ = _app.(string) var tcurl string - if _tcurl, ok = self.commandobj["tcUrl"]; !ok { - _tcurl, ok = self.commandobj["tcurl"] + if _tcurl, ok = conn.commandobj["tcUrl"]; !ok { + _tcurl, ok = conn.commandobj["tcurl"] } if ok { tcurl, _ = _tcurl.(string) } - connectparams := self.commandobj + connectparams := conn.commandobj - if err = self.writeBasicConf(); err != nil { + if err = conn.writeBasicConf(); err != nil { return } // > _result("NetConnection.Connect.Success") - if err = self.writeCommandMsg(3, 0, "_result", self.commandtransid, + if err = conn.writeCommandMsg(3, 0, "_result", conn.commandtransid, flvio.AMFMap{ "fmtVer": "FMS/3,0,1,123", "capabilities": 31, @@ -503,25 +500,25 @@ func (self *Conn) readConnect() (err error) { return } - if err = self.flushWrite(); err != nil { + if err = conn.flushWrite(); err != nil { return } for { - if err = self.pollMsg(); err != nil { + if err = conn.pollMsg(); err != nil { return } - if self.gotcommand { - switch self.commandname { + if conn.gotcommand { + switch conn.commandname { // < createStream case "createStream": - self.avmsgsid = uint32(1) + conn.avmsgsid = uint32(1) // > _result(streamid) - if err = self.writeCommandMsg(3, 0, "_result", self.commandtransid, nil, self.avmsgsid); err != nil { + if err = conn.writeCommandMsg(3, 0, "_result", conn.commandtransid, nil, conn.avmsgsid); err != nil { return } - if err = self.flushWrite(); err != nil { + if err = conn.flushWrite(); err != nil { return } @@ -531,20 +528,20 @@ func (self *Conn) readConnect() (err error) { fmt.Println("rtmp: < publish") } - if len(self.commandparams) < 1 { + if len(conn.commandparams) < 1 { err = fmt.Errorf("publish params invalid") return } - publishpath, _ := self.commandparams[0].(string) + publishpath, _ := conn.commandparams[0].(string) var cberr error - if self.OnPlayOrPublish != nil { - cberr = self.OnPlayOrPublish(self.commandname, connectparams) + if conn.OnPlayOrPublish != nil { + cberr = conn.OnPlayOrPublish(conn.commandname, connectparams) } // > onStatus() - if err = self.writeCommandMsg(5, self.avmsgsid, - "onStatus", self.commandtransid, nil, + if err = conn.writeCommandMsg(5, conn.avmsgsid, + "onStatus", conn.commandtransid, nil, flvio.AMFMap{ "level": "status", "code": "NetStream.Publish.Start", @@ -553,7 +550,7 @@ func (self *Conn) readConnect() (err error) { ); err != nil { return } - if err = self.flushWrite(); err != nil { + if err = conn.flushWrite(); err != nil { return } @@ -568,10 +565,10 @@ func (self *Conn) readConnect() (err error) { return } - self.URL = u - self.publishing = true - self.reading = true - self.stage++ + conn.URL = u + conn.publishing = true + conn.reading = true + conn.stage++ return // < play("path") @@ -580,20 +577,20 @@ func (self *Conn) readConnect() (err error) { fmt.Println("rtmp: < play") } - if len(self.commandparams) < 1 { + if len(conn.commandparams) < 1 { err = fmt.Errorf("command play params invalid") return } - playpath, _ := self.commandparams[0].(string) + playpath, _ := conn.commandparams[0].(string) // > streamBegin(streamid) - if err = self.writeStreamBegin(self.avmsgsid); err != nil { + if err = conn.writeStreamBegin(conn.avmsgsid); err != nil { return } // > onStatus() - if err = self.writeCommandMsg(5, self.avmsgsid, - "onStatus", self.commandtransid, nil, + if err = conn.writeCommandMsg(5, conn.avmsgsid, + "onStatus", conn.commandtransid, nil, flvio.AMFMap{ "level": "status", "code": "NetStream.Play.Start", @@ -610,7 +607,7 @@ func (self *Conn) readConnect() (err error) { // return //} - if err = self.flushWrite(); err != nil { + if err = conn.flushWrite(); err != nil { return } @@ -620,10 +617,10 @@ func (self *Conn) readConnect() (err error) { return } - self.URL = u - self.playing = true - self.writing = true - self.stage++ + conn.URL = u + conn.playing = true + conn.writing = true + conn.stage++ return } @@ -631,19 +628,19 @@ func (self *Conn) readConnect() (err error) { } } -func (self *Conn) checkConnectResult() (ok bool, errmsg string) { - if len(self.commandparams) < 1 { +func (conn *Conn) checkConnectResult() (ok bool, errmsg string) { + if len(conn.commandparams) < 1 { errmsg = "params length < 1" return } - obj, _ := self.commandparams[0].(flvio.AMFMap) + obj, _ := conn.commandparams[0].(flvio.AMFMap) if obj == nil { errmsg = "params[0] not object" return } - _code, _ := obj["code"] + _code := obj["code"] if _code == nil { errmsg = "code invalid" return @@ -659,37 +656,37 @@ func (self *Conn) checkConnectResult() (ok bool, errmsg string) { return } -func (self *Conn) checkCreateStreamResult() (ok bool, avmsgsid uint32) { - if len(self.commandparams) < 1 { +func (conn *Conn) checkCreateStreamResult() (ok bool, avmsgsid uint32) { + if len(conn.commandparams) < 1 { return } ok = true - _avmsgsid, _ := self.commandparams[0].(float64) + _avmsgsid, _ := conn.commandparams[0].(float64) avmsgsid = uint32(_avmsgsid) return } -func (self *Conn) probe() (err error) { - for !self.prober.Probed() { +func (conn *Conn) probe() (err error) { + for !conn.prober.Probed() { var tag flvio.Tag - if tag, err = self.pollAVTag(); err != nil { + if tag, err = conn.pollAVTag(); err != nil { return } - if err = self.prober.PushTag(tag, int32(self.timestamp)); err != nil { + if err = conn.prober.PushTag(tag, int32(conn.timestamp)); err != nil { if Debug { fmt.Printf("rtmp: error probing tag: %s\n", err.Error()) } } } - self.streams = self.prober.Streams - self.stage++ + conn.streams = conn.prober.Streams + conn.stage++ return } -func (self *Conn) writeConnect(path string) (err error) { - if err = self.writeBasicConf(); err != nil { +func (conn *Conn) writeConnect(path string) (err error) { + if err = conn.writeBasicConf(); err != nil { return } @@ -697,13 +694,13 @@ func (self *Conn) writeConnect(path string) (err error) { // > connect("app") if Debug { - fmt.Printf("rtmp: > connect('%s') host=%s\n", path, self.URL.Host) + fmt.Printf("rtmp: > connect('%s') host=%s\n", path, conn.URL.Host) } - if err = self.writeCommandMsg(3, 0, "connect", 1, + if err = conn.writeCommandMsg(3, 0, "connect", 1, flvio.AMFMap{ "app": path, "flashVer": "MAC 22,0,0,192", - "tcUrl": getTcUrl(self.URL), + "tcUrl": getTcUrl(conn.URL), "fpad": false, "capabilities": 15, "audioCodecs": 4071, @@ -715,20 +712,20 @@ func (self *Conn) writeConnect(path string) (err error) { return } - if err = self.flushWrite(); err != nil { + if err = conn.flushWrite(); err != nil { return } for { - if err = self.pollMsg(); err != nil { + if err = conn.pollMsg(); err != nil { return } - if self.gotcommand { + if conn.gotcommand { // < _result("NetConnection.Connect.Success") - if self.commandname == "_result" { + if conn.commandname == "_result" { var ok bool var errmsg string - if ok, errmsg = self.checkConnectResult(); !ok { + if ok, errmsg = conn.checkConnectResult(); !ok { err = fmt.Errorf("command connect failed: %s", errmsg) return } @@ -738,9 +735,9 @@ func (self *Conn) writeConnect(path string) (err error) { break } } else { - if self.msgtypeid == msgtypeidWindowAckSize { - if len(self.msgdata) == 4 { - self.readAckSize = pio.U32BE(self.msgdata) + if conn.msgtypeid == msgtypeidWindowAckSize { + if len(conn.msgdata) == 4 { + conn.readAckSize = pio.U32BE(conn.msgdata) } //if err = self.writeWindowAckSize(0xffffffff); err != nil { // return @@ -752,10 +749,10 @@ func (self *Conn) writeConnect(path string) (err error) { return } -func (self *Conn) connectPublish() (err error) { - connectpath, publishpath := SplitPath(self.URL) +func (conn *Conn) connectPublish() (err error) { + connectpath, publishpath := SplitPath(conn.URL) - if err = self.writeConnect(connectpath); err != nil { + if err = conn.writeConnect(connectpath); err != nil { return } @@ -765,24 +762,24 @@ func (self *Conn) connectPublish() (err error) { if Debug { fmt.Printf("rtmp: > createStream()\n") } - if err = self.writeCommandMsg(3, 0, "createStream", transid, nil); err != nil { + if err = conn.writeCommandMsg(3, 0, "createStream", transid, nil); err != nil { return } transid++ - if err = self.flushWrite(); err != nil { + if err = conn.flushWrite(); err != nil { return } for { - if err = self.pollMsg(); err != nil { + if err = conn.pollMsg(); err != nil { return } - if self.gotcommand { + if conn.gotcommand { // < _result(avmsgsid) of createStream - if self.commandname == "_result" { + if conn.commandname == "_result" { var ok bool - if ok, self.avmsgsid = self.checkCreateStreamResult(); !ok { + if ok, conn.avmsgsid = conn.checkCreateStreamResult(); !ok { err = fmt.Errorf("createStream command failed") return } @@ -795,25 +792,25 @@ func (self *Conn) connectPublish() (err error) { if Debug { fmt.Printf("rtmp: > publish('%s')\n", publishpath) } - if err = self.writeCommandMsg(8, self.avmsgsid, "publish", transid, nil, publishpath); err != nil { + if err = conn.writeCommandMsg(8, conn.avmsgsid, "publish", transid, nil, publishpath); err != nil { return } transid++ - if err = self.flushWrite(); err != nil { + if err = conn.flushWrite(); err != nil { return } - self.writing = true - self.publishing = true - self.stage++ + conn.writing = true + conn.publishing = true + conn.stage++ return } -func (self *Conn) connectPlay() (err error) { - connectpath, playpath := SplitPath(self.URL) +func (conn *Conn) connectPlay() (err error) { + connectpath, playpath := SplitPath(conn.URL) - if err = self.writeConnect(connectpath); err != nil { + if err = conn.writeConnect(connectpath); err != nil { return } @@ -821,28 +818,28 @@ func (self *Conn) connectPlay() (err error) { if Debug { fmt.Printf("rtmp: > createStream()\n") } - if err = self.writeCommandMsg(3, 0, "createStream", 2, nil); err != nil { + if err = conn.writeCommandMsg(3, 0, "createStream", 2, nil); err != nil { return } // > SetBufferLength 0,100ms - if err = self.writeSetBufferLength(0, 100); err != nil { + if err = conn.writeSetBufferLength(0, 100); err != nil { return } - if err = self.flushWrite(); err != nil { + if err = conn.flushWrite(); err != nil { return } for { - if err = self.pollMsg(); err != nil { + if err = conn.pollMsg(); err != nil { return } - if self.gotcommand { + if conn.gotcommand { // < _result(avmsgsid) of createStream - if self.commandname == "_result" { + if conn.commandname == "_result" { var ok bool - if ok, self.avmsgsid = self.checkCreateStreamResult(); !ok { + if ok, conn.avmsgsid = conn.checkCreateStreamResult(); !ok { err = fmt.Errorf("createStream command failed") return } @@ -855,72 +852,72 @@ func (self *Conn) connectPlay() (err error) { if Debug { fmt.Printf("rtmp: > play('%s')\n", playpath) } - if err = self.writeCommandMsg(8, self.avmsgsid, "play", 0, nil, playpath); err != nil { + if err = conn.writeCommandMsg(8, conn.avmsgsid, "play", 0, nil, playpath); err != nil { return } - if err = self.flushWrite(); err != nil { + if err = conn.flushWrite(); err != nil { return } - self.reading = true - self.playing = true - self.stage++ + conn.reading = true + conn.playing = true + conn.stage++ return } -func (self *Conn) ReadPacket() (pkt av.Packet, err error) { - if err = self.prepare(stageCodecDataDone, prepareReading); err != nil { +func (conn *Conn) ReadPacket() (pkt av.Packet, err error) { + if err = conn.prepare(stageCodecDataDone, prepareReading); err != nil { return } - if !self.prober.Empty() { - pkt = self.prober.PopPacket() + if !conn.prober.Empty() { + pkt = conn.prober.PopPacket() return } for { var tag flvio.Tag - if tag, err = self.pollAVTag(); err != nil { + if tag, err = conn.pollAVTag(); err != nil { return } var ok bool - if pkt, ok = self.prober.TagToPacket(tag, int32(self.timestamp)); ok { + if pkt, ok = conn.prober.TagToPacket(tag, int32(conn.timestamp)); ok { return pkt, nil } } } -func (self *Conn) Prepare() (err error) { - return self.prepare(stageCommandDone, 0) +func (conn *Conn) Prepare() (err error) { + return conn.prepare(stageCommandDone, 0) } -func (self *Conn) prepare(stage int, flags int) (err error) { - for self.stage < stage { - switch self.stage { +func (conn *Conn) prepare(stage int, flags int) (err error) { + for conn.stage < stage { + switch conn.stage { case 0: - if self.isserver { - if err = self.handshakeServer(); err != nil { + if conn.isserver { + if err = conn.handshakeServer(); err != nil { return } } else { - if err = self.handshakeClient(); err != nil { + if err = conn.handshakeClient(); err != nil { return } } case stageHandshakeDone: - if self.isserver { - if err = self.readConnect(); err != nil { + if conn.isserver { + if err = conn.readConnect(); err != nil { return } } else { if flags == prepareReading { - if err = self.connectPlay(); err != nil { + if err = conn.connectPlay(); err != nil { return } } else { - if err = self.connectPublish(); err != nil { + if err = conn.connectPublish(); err != nil { return } } @@ -928,7 +925,7 @@ func (self *Conn) prepare(stage int, flags int) (err error) { case stageCommandDone: if flags == prepareReading { - if err = self.probe(); err != nil { + if err = conn.probe(); err != nil { return } } else { @@ -940,50 +937,50 @@ func (self *Conn) prepare(stage int, flags int) (err error) { return } -func (self *Conn) Streams() (streams []av.CodecData, err error) { - if err = self.prepare(stageCodecDataDone, prepareReading); err != nil { +func (conn *Conn) Streams() (streams []av.CodecData, err error) { + if err = conn.prepare(stageCodecDataDone, prepareReading); err != nil { return } - streams = self.streams + streams = conn.streams return } -func (self *Conn) WritePacket(pkt av.Packet) (err error) { - if err = self.prepare(stageCodecDataDone, prepareWriting); err != nil { +func (conn *Conn) WritePacket(pkt av.Packet) (err error) { + if err = conn.prepare(stageCodecDataDone, prepareWriting); err != nil { return } - stream := self.streams[pkt.Idx] + stream := conn.streams[pkt.Idx] tag, timestamp := flv.PacketToTag(pkt, stream) if Debug { fmt.Println("rtmp: WritePacket", pkt.Idx, pkt.Time, pkt.CompositionTime) } - if err = self.writeAVTag(tag, int32(timestamp)); err != nil { + if err = conn.writeAVTag(tag, int32(timestamp)); err != nil { return } return } -func (self *Conn) WriteTrailer() (err error) { - if err = self.flushWrite(); err != nil { +func (conn *Conn) WriteTrailer() (err error) { + if err = conn.flushWrite(); err != nil { return } return } -func (self *Conn) SetMetaData(data flvio.AMFMap) { - self.metadata = data +func (conn *Conn) SetMetaData(data flvio.AMFMap) { + conn.metadata = data } -func (self *Conn) GetMetaData() flvio.AMFMap { - return self.metadata +func (conn *Conn) GetMetaData() flvio.AMFMap { + return conn.metadata } -func (self *Conn) WriteHeader(streams []av.CodecData) (err error) { - if err = self.prepare(stageCommandDone, prepareWriting); err != nil { +func (conn *Conn) WriteHeader(streams []av.CodecData) (err error) { + if err = conn.prepare(stageCommandDone, prepareWriting); err != nil { return } @@ -998,7 +995,7 @@ func (self *Conn) WriteHeader(streams []av.CodecData) (err error) { } // > onMetaData() - if err = self.writeDataMsg(5, self.avmsgsid, "onMetaData", metadata); err != nil { + if err = conn.writeDataMsg(5, conn.avmsgsid, "onMetaData", metadata); err != nil { return } @@ -1011,88 +1008,88 @@ func (self *Conn) WriteHeader(streams []av.CodecData) (err error) { return } if ok { - if err = self.writeAVTag(tag, 0); err != nil { + if err = conn.writeAVTag(tag, 0); err != nil { return } } } - self.streams = streams - self.stage++ + conn.streams = streams + conn.stage++ return } -func (self *Conn) tmpwbuf(n int) []byte { - if len(self.writebuf) < n { - self.writebuf = make([]byte, n) +func (conn *Conn) tmpwbuf(n int) []byte { + if len(conn.writebuf) < n { + conn.writebuf = make([]byte, n) } - return self.writebuf + return conn.writebuf } -func (self *Conn) writeSetChunkSize(size int) (err error) { - self.writeMaxChunkSize = size - b := self.tmpwbuf(chunkHeaderLength + 4) - n := self.fillChunkHeader(b, 2, 0, msgtypeidSetChunkSize, 0, 4) +func (conn *Conn) writeSetChunkSize(size int) (err error) { + conn.writeMaxChunkSize = size + b := conn.tmpwbuf(chunkHeaderLength + 4) + n := conn.fillChunkHeader(b, 2, 0, msgtypeidSetChunkSize, 0, 4) pio.PutU32BE(b[n:], uint32(size)) n += 4 - _, err = self.bufw.Write(b[:n]) + _, err = conn.bufw.Write(b[:n]) return } -func (self *Conn) writeAck(seqnum uint32) (err error) { - b := self.tmpwbuf(chunkHeaderLength + 4) - n := self.fillChunkHeader(b, 2, 0, msgtypeidAck, 0, 4) +func (conn *Conn) writeAck(seqnum uint32) (err error) { + b := conn.tmpwbuf(chunkHeaderLength + 4) + n := conn.fillChunkHeader(b, 2, 0, msgtypeidAck, 0, 4) pio.PutU32BE(b[n:], seqnum) n += 4 - _, err = self.bufw.Write(b[:n]) + _, err = conn.bufw.Write(b[:n]) return } -func (self *Conn) writeWindowAckSize(size uint32) (err error) { - b := self.tmpwbuf(chunkHeaderLength + 4) - n := self.fillChunkHeader(b, 2, 0, msgtypeidWindowAckSize, 0, 4) +func (conn *Conn) writeWindowAckSize(size uint32) (err error) { + b := conn.tmpwbuf(chunkHeaderLength + 4) + n := conn.fillChunkHeader(b, 2, 0, msgtypeidWindowAckSize, 0, 4) pio.PutU32BE(b[n:], size) n += 4 - _, err = self.bufw.Write(b[:n]) + _, err = conn.bufw.Write(b[:n]) return } -func (self *Conn) writeSetPeerBandwidth(acksize uint32, limittype uint8) (err error) { - b := self.tmpwbuf(chunkHeaderLength + 5) - n := self.fillChunkHeader(b, 2, 0, msgtypeidSetPeerBandwidth, 0, 5) +func (conn *Conn) writeSetPeerBandwidth(acksize uint32, limittype uint8) (err error) { + b := conn.tmpwbuf(chunkHeaderLength + 5) + n := conn.fillChunkHeader(b, 2, 0, msgtypeidSetPeerBandwidth, 0, 5) pio.PutU32BE(b[n:], acksize) n += 4 b[n] = limittype n++ - _, err = self.bufw.Write(b[:n]) + _, err = conn.bufw.Write(b[:n]) return } -func (self *Conn) writeCommandMsg(csid, msgsid uint32, args ...interface{}) (err error) { - return self.writeAMF0Msg(msgtypeidCommandMsgAMF0, csid, msgsid, args...) +func (conn *Conn) writeCommandMsg(csid, msgsid uint32, args ...interface{}) (err error) { + return conn.writeAMF0Msg(msgtypeidCommandMsgAMF0, csid, msgsid, args...) } -func (self *Conn) writeDataMsg(csid, msgsid uint32, args ...interface{}) (err error) { - return self.writeAMF0Msg(msgtypeidDataMsgAMF0, csid, msgsid, args...) +func (conn *Conn) writeDataMsg(csid, msgsid uint32, args ...interface{}) (err error) { + return conn.writeAMF0Msg(msgtypeidDataMsgAMF0, csid, msgsid, args...) } -func (self *Conn) writeAMF0Msg(msgtypeid uint8, csid, msgsid uint32, args ...interface{}) (err error) { +func (conn *Conn) writeAMF0Msg(msgtypeid uint8, csid, msgsid uint32, args ...interface{}) (err error) { size := 0 for _, arg := range args { size += flvio.LenAMF0Val(arg) } - b := self.tmpwbuf(chunkHeaderLength + size) - n := self.fillChunkHeader(b, csid, 0, msgtypeid, msgsid, size) + b := conn.tmpwbuf(chunkHeaderLength + size) + n := conn.fillChunkHeader(b, csid, 0, msgtypeid, msgsid, size) for _, arg := range args { n += flvio.FillAMF0Val(b[n:], arg) } - _, err = self.bufw.Write(b[:n]) + _, err = conn.bufw.Write(b[:n]) return } -func (self *Conn) writeAVTag(tag flvio.Tag, ts int32) (err error) { +func (conn *Conn) writeAVTag(tag flvio.Tag, ts int32) (err error) { var msgtypeid uint8 var csid uint32 var data []byte @@ -1114,64 +1111,69 @@ func (self *Conn) writeAVTag(tag flvio.Tag, ts int32) (err error) { actualChunkHeaderLength += 4 } - b := self.tmpwbuf(actualChunkHeaderLength + flvio.MaxTagSubHeaderLength) + b := conn.tmpwbuf(actualChunkHeaderLength + flvio.MaxTagSubHeaderLength) hdrlen := tag.FillHeader(b[actualChunkHeaderLength:]) - self.fillChunkHeader(b, csid, ts, msgtypeid, self.avmsgsid, hdrlen+len(data)) + conn.fillChunkHeader(b, csid, ts, msgtypeid, conn.avmsgsid, hdrlen+len(data)) n := hdrlen + actualChunkHeaderLength - if n+len(data) > self.writeMaxChunkSize { - if err = self.writeSetChunkSize(n + len(data)); err != nil { + if n+len(data) > conn.writeMaxChunkSize { + if err = conn.writeSetChunkSize(n + len(data)); err != nil { return } } - if _, err = self.bufw.Write(b[:n]); err != nil { + if _, err = conn.bufw.Write(b[:n]); err != nil { return } - _, err = self.bufw.Write(data) - err = self.bufw.Flush() + + if _, err = conn.bufw.Write(data); err != nil { + return + } + + err = conn.bufw.Flush() + return } -func (self *Conn) writeStreamBegin(msgsid uint32) (err error) { - b := self.tmpwbuf(chunkHeaderLength + 6) - n := self.fillChunkHeader(b, 2, 0, msgtypeidUserControl, 0, 6) +func (conn *Conn) writeStreamBegin(msgsid uint32) (err error) { + b := conn.tmpwbuf(chunkHeaderLength + 6) + n := conn.fillChunkHeader(b, 2, 0, msgtypeidUserControl, 0, 6) pio.PutU16BE(b[n:], eventtypeStreamBegin) n += 2 pio.PutU32BE(b[n:], msgsid) n += 4 - _, err = self.bufw.Write(b[:n]) + _, err = conn.bufw.Write(b[:n]) return } -func (self *Conn) writeSetBufferLength(msgsid uint32, timestamp uint32) (err error) { - b := self.tmpwbuf(chunkHeaderLength + 10) - n := self.fillChunkHeader(b, 2, 0, msgtypeidUserControl, 0, 10) +func (conn *Conn) writeSetBufferLength(msgsid uint32, timestamp uint32) (err error) { + b := conn.tmpwbuf(chunkHeaderLength + 10) + n := conn.fillChunkHeader(b, 2, 0, msgtypeidUserControl, 0, 10) pio.PutU16BE(b[n:], eventtypeSetBufferLength) n += 2 pio.PutU32BE(b[n:], msgsid) n += 4 pio.PutU32BE(b[n:], timestamp) n += 4 - _, err = self.bufw.Write(b[:n]) + _, err = conn.bufw.Write(b[:n]) return } -func (self *Conn) writePingResponse(timestamp uint32) (err error) { - b := self.tmpwbuf(chunkHeaderLength + 10) - n := self.fillChunkHeader(b, 2, 0, msgtypeidUserControl, 0, 6) +func (conn *Conn) writePingResponse(timestamp uint32) (err error) { + b := conn.tmpwbuf(chunkHeaderLength + 10) + n := conn.fillChunkHeader(b, 2, 0, msgtypeidUserControl, 0, 6) pio.PutU16BE(b[n:], eventtypePingResponse) n += 2 pio.PutU32BE(b[n:], timestamp) n += 4 - _, err = self.bufw.Write(b[:n]) + _, err = conn.bufw.Write(b[:n]) return } const chunkHeaderLength = 12 const FlvTimestampMax = 0xFFFFFF -func (self *Conn) fillChunkHeader(b []byte, csid uint32, timestamp int32, msgtypeid uint8, msgsid uint32, msgdatalen int) (n int) { +func (conn *Conn) fillChunkHeader(b []byte, csid uint32, timestamp int32, msgtypeid uint8, msgsid uint32, msgdatalen int) (n int) { // 0 1 2 3 // 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ @@ -1211,17 +1213,17 @@ func (self *Conn) fillChunkHeader(b []byte, csid uint32, timestamp int32, msgtyp return } -func (self *Conn) flushWrite() (err error) { - if err = self.bufw.Flush(); err != nil { +func (conn *Conn) flushWrite() (err error) { + if err = conn.bufw.Flush(); err != nil { return } return } -func (self *Conn) readChunk() (err error) { - b := self.readbuf +func (conn *Conn) readChunk() (err error) { + b := conn.readbuf n := 0 - if _, err = io.ReadFull(self.bufr, b[:1]); err != nil { + if _, err = io.ReadFull(conn.bufr, b[:1]); err != nil { return } header := b[0] @@ -1236,23 +1238,23 @@ func (self *Conn) readChunk() (err error) { switch csid { default: // Chunk basic header 1 case 0: // Chunk basic header 2 - if _, err = io.ReadFull(self.bufr, b[:1]); err != nil { + if _, err = io.ReadFull(conn.bufr, b[:1]); err != nil { return fmt.Errorf("chunk basic header 2: %w", err) } n += 1 csid = uint32(b[0]) + 64 case 1: // Chunk basic header 3 - if _, err = io.ReadFull(self.bufr, b[:2]); err != nil { + if _, err = io.ReadFull(conn.bufr, b[:2]); err != nil { return fmt.Errorf("chunk basic header 3: %w", err) } n += 2 csid = uint32(pio.U16BE(b)) + 64 } - cs := self.readcsmap[csid] + cs := conn.readcsmap[csid] if cs == nil { cs = &chunkStream{} - self.readcsmap[csid] = cs + conn.readcsmap[csid] = cs } var timestamp uint32 @@ -1275,7 +1277,7 @@ func (self *Conn) readChunk() (err error) { return } h := b[:11] - if _, err = io.ReadFull(self.bufr, h); err != nil { + if _, err = io.ReadFull(conn.bufr, h); err != nil { return } n += len(h) @@ -1285,7 +1287,7 @@ func (self *Conn) readChunk() (err error) { cs.msgtypeid = h[6] cs.msgsid = pio.U32LE(h[7:11]) if timestamp == 0xffffff { - if _, err = io.ReadFull(self.bufr, b[:4]); err != nil { + if _, err = io.ReadFull(conn.bufr, b[:4]); err != nil { return } n += 4 @@ -1312,7 +1314,7 @@ func (self *Conn) readChunk() (err error) { return } h := b[:7] - if _, err = io.ReadFull(self.bufr, h); err != nil { + if _, err = io.ReadFull(conn.bufr, h); err != nil { return } n += len(h) @@ -1321,7 +1323,7 @@ func (self *Conn) readChunk() (err error) { cs.msgdatalen = pio.U24BE(h[3:6]) cs.msgtypeid = h[6] if timestamp == 0xffffff { - if _, err = io.ReadFull(self.bufr, b[:4]); err != nil { + if _, err = io.ReadFull(conn.bufr, b[:4]); err != nil { return } n += 4 @@ -1347,14 +1349,14 @@ func (self *Conn) readChunk() (err error) { return } h := b[:3] - if _, err = io.ReadFull(self.bufr, h); err != nil { + if _, err = io.ReadFull(conn.bufr, h); err != nil { return } n += len(h) cs.msghdrtype = msghdrtype timestamp = pio.U24BE(h[0:3]) if timestamp == 0xffffff { - if _, err = io.ReadFull(self.bufr, b[:4]); err != nil { + if _, err = io.ReadFull(conn.bufr, b[:4]); err != nil { return } n += 4 @@ -1372,7 +1374,7 @@ func (self *Conn) readChunk() (err error) { switch cs.msghdrtype { case 0: if cs.hastimeext { - if _, err = io.ReadFull(self.bufr, b[:4]); err != nil { + if _, err = io.ReadFull(conn.bufr, b[:4]); err != nil { return } n += 4 @@ -1381,7 +1383,7 @@ func (self *Conn) readChunk() (err error) { } case 1, 2: if cs.hastimeext { - if _, err = io.ReadFull(self.bufr, b[:4]); err != nil { + if _, err = io.ReadFull(conn.bufr, b[:4]); err != nil { return } n += 4 @@ -1400,12 +1402,12 @@ func (self *Conn) readChunk() (err error) { } size := int(cs.msgdataleft) - if size > self.readMaxChunkSize { - size = self.readMaxChunkSize + if size > conn.readMaxChunkSize { + size = conn.readMaxChunkSize } off := cs.msgdatalen - cs.msgdataleft buf := cs.msgdata[off : int(off)+size] - if _, err = io.ReadFull(self.bufr, buf); err != nil { + if _, err = io.ReadFull(conn.bufr, buf); err != nil { return } n += len(buf) @@ -1422,25 +1424,25 @@ func (self *Conn) readChunk() (err error) { fmt.Print(hex.Dump(cs.msgdata)) } - if err = self.handleMsg(cs.timenow, cs.msgsid, cs.msgtypeid, cs.msgdata); err != nil { + if err = conn.handleMsg(cs.timenow, cs.msgsid, cs.msgtypeid, cs.msgdata); err != nil { return fmt.Errorf("handleMsg: %w", err) } } - self.ackn += uint32(n) + conn.ackn += uint32(n) - if self.readAckSize != 0 && self.ackn > self.readAckSize { - if err = self.writeAck(self.ackn); err != nil { + if conn.readAckSize != 0 && conn.ackn > conn.readAckSize { + if err = conn.writeAck(conn.ackn); err != nil { return fmt.Errorf("writeACK: %w", err) } - self.flushWrite() - self.ackn = 0 + conn.flushWrite() + conn.ackn = 0 } return } -func (self *Conn) handleCommandMsgAMF0(b []byte) (n int, err error) { +func (conn *Conn) handleCommandMsgAMF0(b []byte) (n int, err error) { var name, transid, obj interface{} var size int @@ -1458,38 +1460,38 @@ func (self *Conn) handleCommandMsgAMF0(b []byte) (n int, err error) { n += size var ok bool - if self.commandname, ok = name.(string); !ok { + if conn.commandname, ok = name.(string); !ok { err = fmt.Errorf("CommandMsgAMF0 command is not string") return } - self.commandtransid, _ = transid.(float64) - self.commandobj, _ = obj.(flvio.AMFMap) - self.commandparams = []interface{}{} + conn.commandtransid, _ = transid.(float64) + conn.commandobj, _ = obj.(flvio.AMFMap) + conn.commandparams = []interface{}{} for n < len(b) { if obj, size, err = flvio.ParseAMF0Val(b[n:]); err != nil { return } n += size - self.commandparams = append(self.commandparams, obj) + conn.commandparams = append(conn.commandparams, obj) } if n < len(b) { err = fmt.Errorf("CommandMsgAMF0 left bytes=%d", len(b)-n) return } - self.gotcommand = true + conn.gotcommand = true return } -func (self *Conn) handleMsg(timestamp uint32, msgsid uint32, msgtypeid uint8, msgdata []byte) (err error) { - self.msgdata = msgdata - self.msgtypeid = msgtypeid - self.timestamp = timestamp +func (conn *Conn) handleMsg(timestamp uint32, msgsid uint32, msgtypeid uint8, msgdata []byte) (err error) { + conn.msgdata = msgdata + conn.msgtypeid = msgtypeid + conn.timestamp = timestamp switch msgtypeid { case msgtypeidCommandMsgAMF0: - if _, err = self.handleCommandMsgAMF0(msgdata); err != nil { + if _, err = conn.handleCommandMsgAMF0(msgdata); err != nil { return } @@ -1499,7 +1501,7 @@ func (self *Conn) handleMsg(timestamp uint32, msgsid uint32, msgtypeid uint8, ms return } // skip first byte - if _, err = self.handleCommandMsgAMF0(msgdata[1:]); err != nil { + if _, err = conn.handleCommandMsgAMF0(msgdata[1:]); err != nil { return } @@ -1508,16 +1510,16 @@ func (self *Conn) handleMsg(timestamp uint32, msgsid uint32, msgtypeid uint8, ms err = fmt.Errorf("short packet of UserControl") return } - self.eventtype = pio.U16BE(msgdata) + conn.eventtype = pio.U16BE(msgdata) - if self.eventtype == eventtypePingRequest { + if conn.eventtype == eventtypePingRequest { if len(msgdata) != 6 { err = fmt.Errorf("wrong length for UserControl.PingRequest") return } pingtimestamp := pio.U32BE(msgdata[2:]) - self.writePingResponse(pingtimestamp) - self.flushWrite() + conn.writePingResponse(pingtimestamp) + conn.flushWrite() } case msgtypeidDataMsgAMF0: @@ -1530,7 +1532,7 @@ func (self *Conn) handleMsg(timestamp uint32, msgsid uint32, msgtypeid uint8, ms return } n += size - self.datamsgvals = append(self.datamsgvals, obj) + conn.datamsgvals = append(conn.datamsgvals, obj) } if n < len(b) { err = fmt.Errorf("DataMsgAMF0 left bytes=%d", len(b)-n) @@ -1541,17 +1543,17 @@ func (self *Conn) handleMsg(timestamp uint32, msgsid uint32, msgtypeid uint8, ms metaindex := -1 - for i, x := range self.datamsgvals { - switch x.(type) { + for i, x := range conn.datamsgvals { + switch x := x.(type) { case string: - if x.(string) == "onMetaData" { + if x == "onMetaData" { metaindex = i + 1 } } } - if metaindex != -1 && metaindex < len(self.datamsgvals) { - self.metadata = self.datamsgvals[metaindex].(flvio.AMFMap) + if metaindex != -1 && metaindex < len(conn.datamsgvals) { + conn.metadata = conn.datamsgvals[metaindex].(flvio.AMFMap) //fmt.Printf("onMetadata: %+v\n", self.metadata) //fmt.Printf("videocodecid: %#08x (%f)\n", int64(self.metadata["videocodecid"].(float64)), self.metadata["videocodecid"].(float64)) } @@ -1571,7 +1573,7 @@ func (self *Conn) handleMsg(timestamp uint32, msgsid uint32, msgtypeid uint8, ms return } tag.Data = msgdata[n:] - self.avtag = tag + conn.avtag = tag case msgtypeidAudioMsg: if len(msgdata) == 0 { @@ -1583,20 +1585,20 @@ func (self *Conn) handleMsg(timestamp uint32, msgsid uint32, msgtypeid uint8, ms return } tag.Data = msgdata[n:] - self.avtag = tag + conn.avtag = tag case msgtypeidSetChunkSize: if len(msgdata) < 4 { err = fmt.Errorf("short packet of SetChunkSize") return } - self.readMaxChunkSize = int(pio.U32BE(msgdata)) + conn.readMaxChunkSize = int(pio.U32BE(msgdata)) return case msgtypeidWindowAckSize: - if len(self.msgdata) != 4 { + if len(conn.msgdata) != 4 { return fmt.Errorf("invalid packet of WindowAckSize") } - self.readAckSize = pio.U32BE(self.msgdata) + conn.readAckSize = pio.U32BE(conn.msgdata) return default: if Debug { @@ -1604,7 +1606,7 @@ func (self *Conn) handleMsg(timestamp uint32, msgsid uint32, msgtypeid uint8, ms } } - self.gotmsg = true + conn.gotmsg = true return } @@ -1652,7 +1654,7 @@ func hsCalcDigestPos(p []byte, base int) (pos int) { func hsFindDigest(p []byte, key []byte, base int) int { gap := hsCalcDigestPos(p, base) digest := hsMakeDigest(key, p, gap) - if bytes.Compare(p[gap:gap+32], digest) != 0 { + if !bytes.Equal(p[gap:gap+32], digest) { return -1 } return gap @@ -1688,14 +1690,14 @@ func hsCreate2(p []byte, key []byte) { copy(p[gap:], digest) } -func (self *Conn) handshakeClient() (err error) { +func (conn *Conn) handshakeClient() (err error) { var random [(1 + 1536*2) * 2]byte C0C1C2 := random[:1536*2+1] C0 := C0C1C2[:1] //C1 := C0C1C2[1:1536+1] C0C1 := C0C1C2[:1536+1] - C2 := C0C1C2[1536+1:] + var C2 []byte S0S1S2 := random[1536*2+1:] //S0 := S0S1S2[:1] @@ -1707,15 +1709,15 @@ func (self *Conn) handshakeClient() (err error) { //hsCreate01(C0C1, hsClientFullKey) // > C0C1 - if _, err = self.bufw.Write(C0C1); err != nil { + if _, err = conn.bufw.Write(C0C1); err != nil { return } - if err = self.bufw.Flush(); err != nil { + if err = conn.bufw.Flush(); err != nil { return } // < S0S1S2 - if _, err = io.ReadFull(self.bufr, S0S1S2); err != nil { + if _, err = io.ReadFull(conn.bufr, S0S1S2); err != nil { return } @@ -1730,15 +1732,15 @@ func (self *Conn) handshakeClient() (err error) { } // > C2 - if _, err = self.bufw.Write(C2); err != nil { + if _, err = conn.bufw.Write(C2); err != nil { return } - self.stage++ + conn.stage++ return } -func (self *Conn) handshakeServer() (err error) { +func (conn *Conn) handshakeServer() (err error) { var random [(1 + 1536*2) * 2]byte C0C1C2 := random[:1536*2+1] @@ -1754,7 +1756,7 @@ func (self *Conn) handshakeServer() (err error) { S2 := S0S1S2[1536+1:] // < C0C1 - if _, err = io.ReadFull(self.bufr, C0C1); err != nil { + if _, err = io.ReadFull(conn.bufr, C0C1); err != nil { return } if C0[0] != 3 { @@ -1784,19 +1786,19 @@ func (self *Conn) handshakeServer() (err error) { } // > S0S1S2 - if _, err = self.bufw.Write(S0S1S2); err != nil { + if _, err = conn.bufw.Write(S0S1S2); err != nil { return } - if err = self.bufw.Flush(); err != nil { + if err = conn.bufw.Flush(); err != nil { return } // < C2 - if _, err = io.ReadFull(self.bufr, C2); err != nil { + if _, err = io.ReadFull(conn.bufr, C2); err != nil { return } - self.stage++ + conn.stage++ return } @@ -1805,8 +1807,8 @@ type closeConn struct { waitclose chan bool } -func (self closeConn) Close() error { - self.waitclose <- true +func (cc closeConn) Close() error { + cc.waitclose <- true return nil }