From 092447b2c205b1fa660438384552071859106a82 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Mon, 27 May 2024 10:31:22 +0200 Subject: [PATCH] Add DebugChunks option, add HandleNetConn method --- examples/rtmp_server/main.go | 3 ++ format/rtmp/rtmp.go | 65 ++++++++++++++++++++++++++---------- 2 files changed, 50 insertions(+), 18 deletions(-) diff --git a/examples/rtmp_server/main.go b/examples/rtmp_server/main.go index 60ea422..6bceb39 100644 --- a/examples/rtmp_server/main.go +++ b/examples/rtmp_server/main.go @@ -63,6 +63,9 @@ func New(config Config) (Server, error) { s.server = &rtmp.Server{ Addr: config.Addr, MaxProbePacketCount: 40, + DebugChunks: func(_ net.Conn) bool { + return true + }, } s.channels = make(map[string]*channel) diff --git a/format/rtmp/rtmp.go b/format/rtmp/rtmp.go index 75d0b80..304ba52 100644 --- a/format/rtmp/rtmp.go +++ b/format/rtmp/rtmp.go @@ -67,11 +67,31 @@ type Server struct { MaxProbePacketCount int SkipInvalidMessages bool + DebugChunks func(conn net.Conn) bool listener net.Listener doneChan chan struct{} } +func (s *Server) HandleNetConn(netconn net.Conn) (err error) { + conn := NewConn(netconn) + conn.prober = flv.NewProber(s.MaxProbePacketCount) + conn.skipInvalidMessages = s.SkipInvalidMessages + if s.DebugChunks != nil { + conn.debugChunks = s.DebugChunks(netconn) + } + conn.isserver = true + + err = s.handleConn(conn) + if Debug { + fmt.Println("rtmp: server: client closed err:", err) + } + + conn.Close() + + return +} + func (s *Server) handleConn(conn *Conn) (err error) { if s.HandleConn != nil { s.HandleConn(conn) @@ -174,17 +194,12 @@ func (s *Server) Serve(listener net.Listener) error { fmt.Println("rtmp: server: accepted") } - conn := NewConn(netconn) - conn.prober = flv.NewProber(s.MaxProbePacketCount) - conn.skipInvalidMessages = s.SkipInvalidMessages - conn.isserver = true - go func() { - err := s.handleConn(conn) + go func(conn net.Conn) { + err := s.HandleNetConn(conn) if Debug { fmt.Println("rtmp: server: client closed err:", err) } - conn.Close() - }() + }(netconn) } } @@ -259,6 +274,8 @@ type Conn struct { start time.Time skipInvalidMessages bool + + debugChunks bool } type txrxcount struct { @@ -387,6 +404,15 @@ func (conn *Conn) pollMsg() (err error) { if err = conn.readChunk(); err != nil { return } + + if conn.readAckSize != 0 && conn.ackn > conn.readAckSize { + if err = conn.writeAck(conn.ackn, false); err != nil { + return fmt.Errorf("writeACK: %w", err) + } + conn.flushWrite() + conn.ackn = 0 + } + if conn.gotmsg { return } @@ -1518,8 +1544,19 @@ func (conn *Conn) readChunk() (err error) { cs.msgdataleft -= uint32(size) if Debug { - fmt.Printf("rtmp: chunk msgsid=%d msgtypeid=%d msghdrtype=%d len=%d left=%d\n", - cs.msgsid, cs.msgtypeid, cs.msghdrtype, cs.msgdatalen, cs.msgdataleft) + fmt.Printf("rtmp: chunk msgsid=%d msgtypeid=%d msghdrtype=%d len=%d left=%d max=%d", + cs.msgsid, cs.msgtypeid, cs.msghdrtype, cs.msgdatalen, cs.msgdataleft, conn.readMaxChunkSize) + } + + if conn.debugChunks { + data := fmt.Sprintf("rtmp: chunk id=%d msgsid=%d msgtypeid=%d msghdrtype=%d timestamp=%d len=%d left=%d max=%d", + csid, cs.msgsid, cs.msgtypeid, cs.msghdrtype, cs.timenow, cs.msgdatalen, cs.msgdataleft, conn.readMaxChunkSize) + + if cs.msgtypeid != msgtypeidVideoMsg && cs.msgtypeid != msgtypeidAudioMsg { + data += " data=" + hex.EncodeToString(cs.msgdata) + } + + fmt.Printf("%s\n", data) } if cs.msgdataleft == 0 { @@ -1562,14 +1599,6 @@ func (conn *Conn) readChunk() (err error) { conn.ackn += uint32(n) - if conn.readAckSize != 0 && conn.ackn > conn.readAckSize { - if err = conn.writeAck(conn.ackn, false); err != nil { - return fmt.Errorf("writeACK: %w", err) - } - conn.flushWrite() - conn.ackn = 0 - } - return }