From 68d2f4ed427a8ca9769736abf5209dae6cd6a79c Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Tue, 12 Jan 2021 17:30:00 +0100 Subject: [PATCH] Allow to close a server --- format/rtmp/rtmp.go | 51 +++++++++++++++++++++++++++++++++++---------- 1 file changed, 40 insertions(+), 11 deletions(-) diff --git a/format/rtmp/rtmp.go b/format/rtmp/rtmp.go index 41b4620..e4745af 100644 --- a/format/rtmp/rtmp.go +++ b/format/rtmp/rtmp.go @@ -7,6 +7,7 @@ import ( "crypto/rand" "crypto/sha256" "encoding/hex" + "errors" "fmt" "github.com/datarhei/joy4/utils/bits/pio" "github.com/datarhei/joy4/av" @@ -53,11 +54,16 @@ func DialTimeout(uri string, timeout time.Duration) (conn *Conn, err error) { return } +var ErrServerClosed = errors.New("rtmp: Server closed") + type Server struct { Addr string HandlePublish func(*Conn) HandlePlay func(*Conn) HandleConn func(*Conn) + + listener *net.TCPListener + doneChan chan struct{} } func (self *Server) handleConn(conn *Conn) (err error) { @@ -82,30 +88,41 @@ func (self *Server) handleConn(conn *Conn) (err error) { return } -func (self *Server) ListenAndServe() (err error) { +func (self *Server) ListenAndServe() error { addr := self.Addr if addr == "" { addr = ":1935" } - var tcpaddr *net.TCPAddr - if tcpaddr, err = net.ResolveTCPAddr("tcp", addr); err != nil { - err = fmt.Errorf("rtmp: ListenAndServe: %s", err) - return + + tcpaddr, err := net.ResolveTCPAddr("tcp", addr) + if err != nil { + return fmt.Errorf("rtmp: %w", err) } - var listener *net.TCPListener - if listener, err = net.ListenTCP("tcp", tcpaddr); err != nil { - return + listener, err := net.ListenTCP("tcp", tcpaddr) + if err != nil { + return fmt.Errorf("rtmp: %w", err) } + self.doneChan = make(chan struct{}) + + self.listener = listener + defer func() { self.listener = nil }() + if Debug { fmt.Println("rtmp: server: listening on", addr) } for { - var netconn net.Conn - if netconn, err = listener.Accept(); err != nil { - return + netconn, err := self.listener.Accept() + if err != nil { + select { + case <-self.doneChan: + return ErrServerClosed + default: + } + + return err } if Debug { @@ -121,6 +138,18 @@ func (self *Server) ListenAndServe() (err error) { } }() } + + return nil +} + +func (self *Server) Close() { + if self.listener == nil { + return + } + + close(self.doneChan) + + self.listener.Close() } const (