diff --git a/examples/http_flv_and_rtmp_server/main.go b/examples/http_flv_and_rtmp_server/main.go index ac358d4..3a2786c 100644 --- a/examples/http_flv_and_rtmp_server/main.go +++ b/examples/http_flv_and_rtmp_server/main.go @@ -1,14 +1,14 @@ package main import ( - "sync" - "io" - "net/http" - "github.com/datarhei/joy4/format" "github.com/datarhei/joy4/av/avutil" "github.com/datarhei/joy4/av/pubsub" - "github.com/datarhei/joy4/format/rtmp" + "github.com/datarhei/joy4/format" "github.com/datarhei/joy4/format/flv" + "github.com/datarhei/joy4/format/rtmp" + "io" + "net/http" + "sync" ) func init() { diff --git a/examples/open_probe_file/main.go b/examples/open_probe_file/main.go index 9912017..ea9b240 100644 --- a/examples/open_probe_file/main.go +++ b/examples/open_probe_file/main.go @@ -36,4 +36,3 @@ func main() { file.Close() } - diff --git a/examples/rtmp_publish/main.go b/examples/rtmp_publish/main.go index 7d695e6..0ba1da5 100644 --- a/examples/rtmp_publish/main.go +++ b/examples/rtmp_publish/main.go @@ -1,9 +1,9 @@ package main import ( + "github.com/datarhei/joy4/av/avutil" "github.com/datarhei/joy4/av/pktque" "github.com/datarhei/joy4/format" - "github.com/datarhei/joy4/av/avutil" "github.com/datarhei/joy4/format/rtmp" ) @@ -24,4 +24,3 @@ func main() { file.Close() conn.Close() } - diff --git a/examples/rtmp_publish_sink/.gitignore b/examples/rtmp_publish_sink/.gitignore new file mode 100644 index 0000000..f2b2fd6 --- /dev/null +++ b/examples/rtmp_publish_sink/.gitignore @@ -0,0 +1 @@ +rtmp_publish_sink diff --git a/examples/rtmp_publish_sink/main.go b/examples/rtmp_publish_sink/main.go new file mode 100644 index 0000000..895ae2d --- /dev/null +++ b/examples/rtmp_publish_sink/main.go @@ -0,0 +1,135 @@ +package main + +import ( + "flag" + "fmt" + "io" + "net" + + "github.com/datarhei/joy4/format" + "github.com/datarhei/joy4/format/rtmp" +) + +func init() { + format.RegisterAll() +} + +type Config struct { + Addr string + App string + Token string +} + +type Server interface { + ListenAndServe() error + ListenAndServeTLS(certFile, keyFile string) error +} + +type server struct { + app string + token string + + server *rtmp.Server +} + +var _ Server = &server{} + +func New(config Config) (Server, error) { + if len(config.App) == 0 { + config.App = "/" + } + + s := &server{ + app: config.App, + token: config.Token, + } + + s.server = &rtmp.Server{ + Addr: config.Addr, + } + + s.server.HandlePlay = s.handlePlay + s.server.HandlePublish = s.handlePublish + + rtmp.Debug = false + + return s, nil +} + +func (s *server) ListenAndServe() error { + return s.server.ListenAndServe() +} + +func (s *server) ListenAndServeTLS(certFile, keyFile string) error { + return s.server.ListenAndServeTLS(certFile, keyFile) +} + +func (s *server) log(who, action, path, message string, client net.Addr) { + fmt.Printf("%-7s %10s %s (%s) %s\n", who, action, path, client, message) +} + +func (s *server) handlePlay(conn *rtmp.Conn) { + conn.Close() +} + +func (s *server) handlePublish(conn *rtmp.Conn) { + client := conn.NetConn().RemoteAddr() + + streams, _ := conn.Streams() + + if len(streams) == 0 { + s.log("PUBLISH", "INVALID", conn.URL.Path, "no streams available", client) + conn.Close() + return + } + + s.log("PUBLISH", "START", conn.URL.Path, "", client) + + for _, stream := range streams { + s.log("PUBLISH", "STREAM", conn.URL.Path, stream.Type().String(), client) + } + + for { + if _, err := conn.ReadPacket(); err != nil { + if err != io.EOF { + s.log("PUBLISH", "ERROR", conn.URL.Path, err.Error(), client) + } + + break + } + } + + s.log("PUBLISH", "STOP", conn.URL.Path, "", client) + + return +} + +func main() { + var cert string + var key string + var help bool + + flag.StringVar(&cert, "cert", "", "Path to the certifacate file") + flag.StringVar(&key, "key", "", "Path to the key file") + flag.BoolVar(&help, "h", false, "Show options") + + flag.Parse() + + config := Config{ + Addr: ":1935", + } + + server, _ := New(config) + + var err error + + if len(cert) == 0 && len(key) == 0 { + fmt.Printf("Started RTMP server. Listening on %s\n", config.Addr) + err = server.ListenAndServe() + } else { + fmt.Printf("Started RTMPS server. Listening on %s\n", config.Addr) + err = server.ListenAndServeTLS(cert, key) + } + + fmt.Printf("%s\n", err) +} diff --git a/examples/rtmp_server/main.go b/examples/rtmp_server/main.go index 82278c0..a337446 100644 --- a/examples/rtmp_server/main.go +++ b/examples/rtmp_server/main.go @@ -1,8 +1,8 @@ package main import ( - "fmt" "flag" + "fmt" "net" "strings" "sync" @@ -11,8 +11,8 @@ import ( "github.com/datarhei/joy4/av/pktque" "github.com/datarhei/joy4/av/pubsub" "github.com/datarhei/joy4/format" - "github.com/datarhei/joy4/format/rtmp" "github.com/datarhei/joy4/format/flv/flvio" + "github.com/datarhei/joy4/format/rtmp" ) func init() { @@ -27,9 +27,9 @@ type channel struct { } type Config struct { - Addr string - App string - Token string + Addr string + App string + Token string } type Server interface { @@ -39,8 +39,8 @@ type Server interface { } type server struct { - app string - token string + app string + token string server *rtmp.Server @@ -56,8 +56,8 @@ func New(config Config) (Server, error) { } s := &server{ - app: config.App, - token: config.Token, + app: config.App, + token: config.Token, } s.server = &rtmp.Server{ @@ -209,7 +209,10 @@ func (s *server) handlePublish(conn *rtmp.Conn) { s.log("PUBLISH", "STREAM", conn.URL.Path, stream.Type().String(), client) } - avutil.CopyPackets(ch.que, conn) + err := avutil.CopyPackets(ch.que, conn) + if err != nil { + s.log("PUBLISH", "ERROR", conn.URL.Path, err.Error(), client) + } s.lock.Lock() delete(s.channels, conn.URL.Path) diff --git a/examples/rtmp_server_proxy/main.go b/examples/rtmp_server_proxy/main.go index 2911d91..d054aee 100644 --- a/examples/rtmp_server_proxy/main.go +++ b/examples/rtmp_server_proxy/main.go @@ -2,10 +2,10 @@ package main import ( "fmt" - "strings" - "github.com/datarhei/joy4/format" "github.com/datarhei/joy4/av/avutil" + "github.com/datarhei/joy4/format" "github.com/datarhei/joy4/format/rtmp" + "strings" ) func init() {