2020-04-17 16:00:36 +02:00

224 lines
3.9 KiB
Go

package main
import (
"fmt"
"net"
"strings"
"sync"
"github.com/datarhei/joy4/av/avutil"
"github.com/datarhei/joy4/av/pktque"
"github.com/datarhei/joy4/av/pubsub"
"github.com/datarhei/joy4/format"
"github.com/datarhei/joy4/format/rtmp"
"github.com/datarhei/joy4/format/flv/flvio"
)
func init() {
format.RegisterAll()
}
type channel struct {
que *pubsub.Queue
metadata flvio.AMFMap
hasAudio bool
hasVideo bool
}
type Config struct {
Addr string
App string
Token string
}
type Server interface {
ListenAndServe()
Channels() []string
}
type server struct {
app string
token string
server *rtmp.Server
channels map[string]*channel
lock sync.RWMutex
}
var _ Server = &server{}
func New(config Config) (Server, error) {
if len(config.App) == 0 {
config.App = "/"
}
s := &server{
app: config.App,
token: config.Token,
}
s.server = &rtmp.Server{
Addr: config.Addr,
}
s.channels = make(map[string]*channel)
s.server.HandlePlay = s.handlePlay
s.server.HandlePublish = s.handlePublish
rtmp.Debug = false
return s, nil
}
func (s *server) ListenAndServe() {
s.server.ListenAndServe()
}
func (s *server) Channels() []string {
channels := []string{}
s.lock.RLock()
defer s.lock.RUnlock()
for key := range s.channels {
channels = append(channels, key)
}
return channels
}
func (s *server) log(who, action, path, message string, client net.Addr) {
fmt.Printf("%-7s %10s %s (%s) %s\n", who, action, path, client, message)
}
func (s *server) handlePlay(conn *rtmp.Conn) {
client := conn.NetConn().RemoteAddr()
q := conn.URL.Query()
token := q.Get("token")
if len(s.token) != 0 && s.token != token {
s.log("PLAY", "FORBIDDEN", conn.URL.Path, "invalid token", client)
conn.Close()
return
}
s.lock.RLock()
ch := s.channels[conn.URL.Path]
s.lock.RUnlock()
if ch != nil {
conn.SetMetaData(ch.metadata)
s.log("PLAY", "START", conn.URL.Path, "", client)
cursor := ch.que.Oldest()
filters := pktque.Filters{}
if ch.hasVideo == true {
filters = append(filters, &pktque.WaitKeyFrame{})
}
filters = append(filters, &pktque.FixTime{StartFromZero: true, MakeIncrement: true})
demuxer := &pktque.FilterDemuxer{
Filter: filters,
Demuxer: cursor,
}
avutil.CopyFile(conn, demuxer)
s.log("PLAY", "STOP", conn.URL.Path, "", client)
} else {
s.log("PLAY", "NOTFOUND", conn.URL.Path, "", client)
}
return
}
func (s *server) handlePublish(conn *rtmp.Conn) {
client := conn.NetConn().RemoteAddr()
q := conn.URL.Query()
token := q.Get("token")
if len(s.token) != 0 && s.token != token {
s.log("PUBLISH", "FORBIDDEN", conn.URL.Path, "invalid token", client)
conn.Close()
return
}
if !strings.HasPrefix(conn.URL.Path, s.app) {
s.log("PUBLISH", "FORBIDDEN", conn.URL.Path, "invalid app", client)
conn.Close()
return
}
streams, _ := conn.Streams()
if len(streams) == 0 {
s.log("PUBLISH", "INVALID", conn.URL.Path, "no streams available", client)
conn.Close()
return
}
s.lock.Lock()
ch := s.channels[conn.URL.Path]
if ch == nil {
ch = &channel{}
ch.metadata = conn.GetMetaData()
ch.que = pubsub.NewQueue()
ch.que.WriteHeader(streams)
for _, stream := range streams {
typ := stream.Type()
switch {
case typ.IsAudio():
ch.hasAudio = true
case typ.IsVideo():
ch.hasVideo = true
}
}
s.channels[conn.URL.Path] = ch
} else {
ch = nil
}
s.lock.Unlock()
if ch == nil {
s.log("PUBLISH", "CONFLICT", conn.URL.Path, "already publishing", client)
conn.Close()
return
}
s.log("PUBLISH", "START", conn.URL.Path, "", client)
for _, stream := range streams {
s.log("PUBLISH", "STREAM", conn.URL.Path, stream.Type().String(), client)
}
avutil.CopyPackets(ch.que, conn)
s.lock.Lock()
delete(s.channels, conn.URL.Path)
s.lock.Unlock()
ch.que.Close()
s.log("PUBLISH", "STOP", conn.URL.Path, "", client)
return
}
func main() {
server, _ := New(Config{
Addr: ":1935",
})
server.ListenAndServe()
}