Add DebugChunks option, add HandleNetConn method

This commit is contained in:
Ingo Oppermann 2024-05-27 10:31:22 +02:00
parent b621603cf9
commit 092447b2c2
2 changed files with 50 additions and 18 deletions

View File

@ -63,6 +63,9 @@ func New(config Config) (Server, error) {
s.server = &rtmp.Server{ s.server = &rtmp.Server{
Addr: config.Addr, Addr: config.Addr,
MaxProbePacketCount: 40, MaxProbePacketCount: 40,
DebugChunks: func(_ net.Conn) bool {
return true
},
} }
s.channels = make(map[string]*channel) s.channels = make(map[string]*channel)

View File

@ -67,11 +67,31 @@ type Server struct {
MaxProbePacketCount int MaxProbePacketCount int
SkipInvalidMessages bool SkipInvalidMessages bool
DebugChunks func(conn net.Conn) bool
listener net.Listener listener net.Listener
doneChan chan struct{} doneChan chan struct{}
} }
func (s *Server) HandleNetConn(netconn net.Conn) (err error) {
conn := NewConn(netconn)
conn.prober = flv.NewProber(s.MaxProbePacketCount)
conn.skipInvalidMessages = s.SkipInvalidMessages
if s.DebugChunks != nil {
conn.debugChunks = s.DebugChunks(netconn)
}
conn.isserver = true
err = s.handleConn(conn)
if Debug {
fmt.Println("rtmp: server: client closed err:", err)
}
conn.Close()
return
}
func (s *Server) handleConn(conn *Conn) (err error) { func (s *Server) handleConn(conn *Conn) (err error) {
if s.HandleConn != nil { if s.HandleConn != nil {
s.HandleConn(conn) s.HandleConn(conn)
@ -174,17 +194,12 @@ func (s *Server) Serve(listener net.Listener) error {
fmt.Println("rtmp: server: accepted") fmt.Println("rtmp: server: accepted")
} }
conn := NewConn(netconn) go func(conn net.Conn) {
conn.prober = flv.NewProber(s.MaxProbePacketCount) err := s.HandleNetConn(conn)
conn.skipInvalidMessages = s.SkipInvalidMessages
conn.isserver = true
go func() {
err := s.handleConn(conn)
if Debug { if Debug {
fmt.Println("rtmp: server: client closed err:", err) fmt.Println("rtmp: server: client closed err:", err)
} }
conn.Close() }(netconn)
}()
} }
} }
@ -259,6 +274,8 @@ type Conn struct {
start time.Time start time.Time
skipInvalidMessages bool skipInvalidMessages bool
debugChunks bool
} }
type txrxcount struct { type txrxcount struct {
@ -387,6 +404,15 @@ func (conn *Conn) pollMsg() (err error) {
if err = conn.readChunk(); err != nil { if err = conn.readChunk(); err != nil {
return return
} }
if conn.readAckSize != 0 && conn.ackn > conn.readAckSize {
if err = conn.writeAck(conn.ackn, false); err != nil {
return fmt.Errorf("writeACK: %w", err)
}
conn.flushWrite()
conn.ackn = 0
}
if conn.gotmsg { if conn.gotmsg {
return return
} }
@ -1518,8 +1544,19 @@ func (conn *Conn) readChunk() (err error) {
cs.msgdataleft -= uint32(size) cs.msgdataleft -= uint32(size)
if Debug { if Debug {
fmt.Printf("rtmp: chunk msgsid=%d msgtypeid=%d msghdrtype=%d len=%d left=%d\n", fmt.Printf("rtmp: chunk msgsid=%d msgtypeid=%d msghdrtype=%d len=%d left=%d max=%d",
cs.msgsid, cs.msgtypeid, cs.msghdrtype, cs.msgdatalen, cs.msgdataleft) cs.msgsid, cs.msgtypeid, cs.msghdrtype, cs.msgdatalen, cs.msgdataleft, conn.readMaxChunkSize)
}
if conn.debugChunks {
data := fmt.Sprintf("rtmp: chunk id=%d msgsid=%d msgtypeid=%d msghdrtype=%d timestamp=%d len=%d left=%d max=%d",
csid, cs.msgsid, cs.msgtypeid, cs.msghdrtype, cs.timenow, cs.msgdatalen, cs.msgdataleft, conn.readMaxChunkSize)
if cs.msgtypeid != msgtypeidVideoMsg && cs.msgtypeid != msgtypeidAudioMsg {
data += " data=" + hex.EncodeToString(cs.msgdata)
}
fmt.Printf("%s\n", data)
} }
if cs.msgdataleft == 0 { if cs.msgdataleft == 0 {
@ -1562,14 +1599,6 @@ func (conn *Conn) readChunk() (err error) {
conn.ackn += uint32(n) conn.ackn += uint32(n)
if conn.readAckSize != 0 && conn.ackn > conn.readAckSize {
if err = conn.writeAck(conn.ackn, false); err != nil {
return fmt.Errorf("writeACK: %w", err)
}
conn.flushWrite()
conn.ackn = 0
}
return return
} }