From c44be377af51b9db0842c2ed91c33e198957fc7e Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Fri, 17 Apr 2020 16:00:36 +0200 Subject: [PATCH] Add example server --- examples/rtmp_server/main.go | 223 +++++++++++++++++++++++++++++++++++ 1 file changed, 223 insertions(+) create mode 100644 examples/rtmp_server/main.go diff --git a/examples/rtmp_server/main.go b/examples/rtmp_server/main.go new file mode 100644 index 0000000..8d089ef --- /dev/null +++ b/examples/rtmp_server/main.go @@ -0,0 +1,223 @@ +package main + +import ( + "fmt" + "net" + "strings" + "sync" + + "github.com/datarhei/joy4/av/avutil" + "github.com/datarhei/joy4/av/pktque" + "github.com/datarhei/joy4/av/pubsub" + "github.com/datarhei/joy4/format" + "github.com/datarhei/joy4/format/rtmp" + "github.com/datarhei/joy4/format/flv/flvio" +) + +func init() { + format.RegisterAll() +} + +type channel struct { + que *pubsub.Queue + metadata flvio.AMFMap + hasAudio bool + hasVideo bool +} + +type Config struct { + Addr string + App string + Token string +} + +type Server interface { + ListenAndServe() + Channels() []string +} + +type server struct { + app string + token string + + server *rtmp.Server + + channels map[string]*channel + lock sync.RWMutex +} + +var _ Server = &server{} + +func New(config Config) (Server, error) { + if len(config.App) == 0 { + config.App = "/" + } + + s := &server{ + app: config.App, + token: config.Token, + } + + s.server = &rtmp.Server{ + Addr: config.Addr, + } + + s.channels = make(map[string]*channel) + + s.server.HandlePlay = s.handlePlay + s.server.HandlePublish = s.handlePublish + + rtmp.Debug = false + + return s, nil +} + +func (s *server) ListenAndServe() { + s.server.ListenAndServe() +} + +func (s *server) Channels() []string { + channels := []string{} + + s.lock.RLock() + defer s.lock.RUnlock() + + for key := range s.channels { + channels = append(channels, key) + } + + return channels +} + +func (s *server) log(who, action, path, message string, client net.Addr) { + fmt.Printf("%-7s %10s %s (%s) %s\n", who, action, path, client, message) +} + +func (s *server) handlePlay(conn *rtmp.Conn) { + client := conn.NetConn().RemoteAddr() + + q := conn.URL.Query() + token := q.Get("token") + + if len(s.token) != 0 && s.token != token { + s.log("PLAY", "FORBIDDEN", conn.URL.Path, "invalid token", client) + conn.Close() + return + } + + s.lock.RLock() + ch := s.channels[conn.URL.Path] + s.lock.RUnlock() + + if ch != nil { + conn.SetMetaData(ch.metadata) + + s.log("PLAY", "START", conn.URL.Path, "", client) + cursor := ch.que.Oldest() + + filters := pktque.Filters{} + + if ch.hasVideo == true { + filters = append(filters, &pktque.WaitKeyFrame{}) + } + + filters = append(filters, &pktque.FixTime{StartFromZero: true, MakeIncrement: true}) + + demuxer := &pktque.FilterDemuxer{ + Filter: filters, + Demuxer: cursor, + } + + avutil.CopyFile(conn, demuxer) + s.log("PLAY", "STOP", conn.URL.Path, "", client) + } else { + s.log("PLAY", "NOTFOUND", conn.URL.Path, "", client) + } + + return +} + +func (s *server) handlePublish(conn *rtmp.Conn) { + client := conn.NetConn().RemoteAddr() + + q := conn.URL.Query() + token := q.Get("token") + + if len(s.token) != 0 && s.token != token { + s.log("PUBLISH", "FORBIDDEN", conn.URL.Path, "invalid token", client) + conn.Close() + return + } + + if !strings.HasPrefix(conn.URL.Path, s.app) { + s.log("PUBLISH", "FORBIDDEN", conn.URL.Path, "invalid app", client) + conn.Close() + return + } + + streams, _ := conn.Streams() + + if len(streams) == 0 { + s.log("PUBLISH", "INVALID", conn.URL.Path, "no streams available", client) + conn.Close() + return + } + + s.lock.Lock() + + ch := s.channels[conn.URL.Path] + if ch == nil { + ch = &channel{} + ch.metadata = conn.GetMetaData() + ch.que = pubsub.NewQueue() + ch.que.WriteHeader(streams) + for _, stream := range streams { + typ := stream.Type() + + switch { + case typ.IsAudio(): + ch.hasAudio = true + case typ.IsVideo(): + ch.hasVideo = true + } + } + + s.channels[conn.URL.Path] = ch + } else { + ch = nil + } + + s.lock.Unlock() + + if ch == nil { + s.log("PUBLISH", "CONFLICT", conn.URL.Path, "already publishing", client) + conn.Close() + return + } + + s.log("PUBLISH", "START", conn.URL.Path, "", client) + + for _, stream := range streams { + s.log("PUBLISH", "STREAM", conn.URL.Path, stream.Type().String(), client) + } + + avutil.CopyPackets(ch.que, conn) + + s.lock.Lock() + delete(s.channels, conn.URL.Path) + s.lock.Unlock() + + ch.que.Close() + + s.log("PUBLISH", "STOP", conn.URL.Path, "", client) + + return +} + +func main() { + server, _ := New(Config{ + Addr: ":1935", + }) + + server.ListenAndServe() +}