go fmt, add rtmp_publish_sink example

This commit is contained in:
Ingo Oppermann 2022-07-28 18:32:17 +02:00
parent 01b5cd703f
commit a305ac55f9
7 changed files with 157 additions and 20 deletions

View File

@ -1,14 +1,14 @@
package main package main
import ( import (
"sync"
"io"
"net/http"
"github.com/datarhei/joy4/format"
"github.com/datarhei/joy4/av/avutil" "github.com/datarhei/joy4/av/avutil"
"github.com/datarhei/joy4/av/pubsub" "github.com/datarhei/joy4/av/pubsub"
"github.com/datarhei/joy4/format/rtmp" "github.com/datarhei/joy4/format"
"github.com/datarhei/joy4/format/flv" "github.com/datarhei/joy4/format/flv"
"github.com/datarhei/joy4/format/rtmp"
"io"
"net/http"
"sync"
) )
func init() { func init() {

View File

@ -36,4 +36,3 @@ func main() {
file.Close() file.Close()
} }

View File

@ -1,9 +1,9 @@
package main package main
import ( import (
"github.com/datarhei/joy4/av/avutil"
"github.com/datarhei/joy4/av/pktque" "github.com/datarhei/joy4/av/pktque"
"github.com/datarhei/joy4/format" "github.com/datarhei/joy4/format"
"github.com/datarhei/joy4/av/avutil"
"github.com/datarhei/joy4/format/rtmp" "github.com/datarhei/joy4/format/rtmp"
) )
@ -24,4 +24,3 @@ func main() {
file.Close() file.Close()
conn.Close() conn.Close()
} }

1
examples/rtmp_publish_sink/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
rtmp_publish_sink

View File

@ -0,0 +1,135 @@
package main
import (
"flag"
"fmt"
"io"
"net"
"github.com/datarhei/joy4/format"
"github.com/datarhei/joy4/format/rtmp"
)
func init() {
format.RegisterAll()
}
type Config struct {
Addr string
App string
Token string
}
type Server interface {
ListenAndServe() error
ListenAndServeTLS(certFile, keyFile string) error
}
type server struct {
app string
token string
server *rtmp.Server
}
var _ Server = &server{}
func New(config Config) (Server, error) {
if len(config.App) == 0 {
config.App = "/"
}
s := &server{
app: config.App,
token: config.Token,
}
s.server = &rtmp.Server{
Addr: config.Addr,
}
s.server.HandlePlay = s.handlePlay
s.server.HandlePublish = s.handlePublish
rtmp.Debug = false
return s, nil
}
func (s *server) ListenAndServe() error {
return s.server.ListenAndServe()
}
func (s *server) ListenAndServeTLS(certFile, keyFile string) error {
return s.server.ListenAndServeTLS(certFile, keyFile)
}
func (s *server) log(who, action, path, message string, client net.Addr) {
fmt.Printf("%-7s %10s %s (%s) %s\n", who, action, path, client, message)
}
func (s *server) handlePlay(conn *rtmp.Conn) {
conn.Close()
}
func (s *server) handlePublish(conn *rtmp.Conn) {
client := conn.NetConn().RemoteAddr()
streams, _ := conn.Streams()
if len(streams) == 0 {
s.log("PUBLISH", "INVALID", conn.URL.Path, "no streams available", client)
conn.Close()
return
}
s.log("PUBLISH", "START", conn.URL.Path, "", client)
for _, stream := range streams {
s.log("PUBLISH", "STREAM", conn.URL.Path, stream.Type().String(), client)
}
for {
if _, err := conn.ReadPacket(); err != nil {
if err != io.EOF {
s.log("PUBLISH", "ERROR", conn.URL.Path, err.Error(), client)
}
break
}
}
s.log("PUBLISH", "STOP", conn.URL.Path, "", client)
return
}
func main() {
var cert string
var key string
var help bool
flag.StringVar(&cert, "cert", "", "Path to the certifacate file")
flag.StringVar(&key, "key", "", "Path to the key file")
flag.BoolVar(&help, "h", false, "Show options")
flag.Parse()
config := Config{
Addr: ":1935",
}
server, _ := New(config)
var err error
if len(cert) == 0 && len(key) == 0 {
fmt.Printf("Started RTMP server. Listening on %s\n", config.Addr)
err = server.ListenAndServe()
} else {
fmt.Printf("Started RTMPS server. Listening on %s\n", config.Addr)
err = server.ListenAndServeTLS(cert, key)
}
fmt.Printf("%s\n", err)
}

View File

@ -1,8 +1,8 @@
package main package main
import ( import (
"fmt"
"flag" "flag"
"fmt"
"net" "net"
"strings" "strings"
"sync" "sync"
@ -11,8 +11,8 @@ import (
"github.com/datarhei/joy4/av/pktque" "github.com/datarhei/joy4/av/pktque"
"github.com/datarhei/joy4/av/pubsub" "github.com/datarhei/joy4/av/pubsub"
"github.com/datarhei/joy4/format" "github.com/datarhei/joy4/format"
"github.com/datarhei/joy4/format/rtmp"
"github.com/datarhei/joy4/format/flv/flvio" "github.com/datarhei/joy4/format/flv/flvio"
"github.com/datarhei/joy4/format/rtmp"
) )
func init() { func init() {
@ -27,9 +27,9 @@ type channel struct {
} }
type Config struct { type Config struct {
Addr string Addr string
App string App string
Token string Token string
} }
type Server interface { type Server interface {
@ -39,8 +39,8 @@ type Server interface {
} }
type server struct { type server struct {
app string app string
token string token string
server *rtmp.Server server *rtmp.Server
@ -56,8 +56,8 @@ func New(config Config) (Server, error) {
} }
s := &server{ s := &server{
app: config.App, app: config.App,
token: config.Token, token: config.Token,
} }
s.server = &rtmp.Server{ s.server = &rtmp.Server{
@ -209,7 +209,10 @@ func (s *server) handlePublish(conn *rtmp.Conn) {
s.log("PUBLISH", "STREAM", conn.URL.Path, stream.Type().String(), client) s.log("PUBLISH", "STREAM", conn.URL.Path, stream.Type().String(), client)
} }
avutil.CopyPackets(ch.que, conn) err := avutil.CopyPackets(ch.que, conn)
if err != nil {
s.log("PUBLISH", "ERROR", conn.URL.Path, err.Error(), client)
}
s.lock.Lock() s.lock.Lock()
delete(s.channels, conn.URL.Path) delete(s.channels, conn.URL.Path)

View File

@ -2,10 +2,10 @@ package main
import ( import (
"fmt" "fmt"
"strings"
"github.com/datarhei/joy4/format"
"github.com/datarhei/joy4/av/avutil" "github.com/datarhei/joy4/av/avutil"
"github.com/datarhei/joy4/format"
"github.com/datarhei/joy4/format/rtmp" "github.com/datarhei/joy4/format/rtmp"
"strings"
) )
func init() { func init() {