diff --git a/av/avconv/avconv.go b/av/avconv/avconv.go index 6c954a1..a9b67ba 100644 --- a/av/avconv/avconv.go +++ b/av/avconv/avconv.go @@ -11,22 +11,6 @@ import ( var Debug bool -func Open(filename string) (demuxer av.DemuxCloser, err error) { - if demuxer, err = avutil.Open(filename); err != nil { - err = fmt.Errorf("avconv: open input `%s` failed: %s", filename, err) - return - } - return -} - -func Create(filename string) (muxer av.MuxCloser, err error) { - if muxer, err = avutil.Create(filename); err != nil { - err = fmt.Errorf("avconv: create output `%s` failed: %s", filename, err) - return - } - return -} - type Option struct { Transcode bool Args []string @@ -178,12 +162,12 @@ func ConvertCmdline(args []string) (err error) { var demuxer av.DemuxCloser var muxer av.MuxCloser - if demuxer, err = Open(input); err != nil { + if demuxer, err = avutil.Open(input); err != nil { return } defer demuxer.Close() - if muxer, err = Create(output); err != nil { + if muxer, err = avutil.Create(output); err != nil { return } defer muxer.Close() diff --git a/av/avutil/avutil.go b/av/avutil/avutil.go index 5087c27..48659ba 100644 --- a/av/avutil/avutil.go +++ b/av/avutil/avutil.go @@ -2,6 +2,7 @@ package avutil import ( "io" + "strings" "fmt" "bytes" "github.com/nareix/joy4/av" @@ -52,6 +53,8 @@ type RegisterHandler struct { Probe func([]byte)bool AudioEncoder func(av.CodecType)(av.AudioEncoder,error) AudioDecoder func(av.AudioCodecData)(av.AudioDecoder,error) + ServerDemuxer func(string)(bool,av.DemuxCloser,error) + ServerMuxer func(string)(bool,av.MuxCloser,error) } type Handlers struct { @@ -111,11 +114,26 @@ func (self *Handlers) NewAudioDecoder(codec av.AudioCodecData) (dec av.AudioDeco } func (self *Handlers) Open(uri string) (demuxer av.DemuxCloser, err error) { + listen := false + if strings.HasPrefix(uri, "listen:") { + uri = uri[len("listen:"):] + listen = true + } + for _, handler := range self.handlers { - if handler.UrlDemuxer != nil { - var ok bool - if ok, demuxer, err = handler.UrlDemuxer(uri); ok { - return + if listen { + if handler.ServerDemuxer != nil { + var ok bool + if ok, demuxer, err = handler.ServerDemuxer(uri); ok { + return + } + } + } else { + if handler.UrlDemuxer != nil { + var ok bool + if ok, demuxer, err = handler.UrlDemuxer(uri); ok { + return + } } } } @@ -170,6 +188,23 @@ func (self *Handlers) Open(uri string) (demuxer av.DemuxCloser, err error) { } func (self *Handlers) Create(uri string) (muxer av.MuxCloser, err error) { + listen := false + if strings.HasPrefix(uri, "listen:") { + uri = uri[len("listen:"):] + listen = true + } + + for _, handler := range self.handlers { + if listen { + if handler.ServerMuxer != nil { + var ok bool + if ok, muxer, err = handler.ServerMuxer(uri); ok { + return + } + } + } + } + var ext string var u *url.URL if u, _ = url.Parse(uri); u != nil && u.Scheme != "" { diff --git a/format/rtmp/rtmp.go b/format/rtmp/rtmp.go index b4928a0..3f66662 100644 --- a/format/rtmp/rtmp.go +++ b/format/rtmp/rtmp.go @@ -25,7 +25,13 @@ import ( var MaxProbePacketCount = 6 -func ParseURL(uri string) (u *url.URL) { +func ParseURL(uri string) (u *url.URL, err error) { + if u, err = url.Parse(uri); err != nil { + return + } + if _, _, serr := net.SplitHostPort(u.Host); serr != nil { + u.Host += ":1935" + } return } @@ -35,13 +41,9 @@ func Dial(uri string) (conn *Conn, err error) { func DialTimeout(uri string, timeout time.Duration) (conn *Conn, err error) { var u *url.URL - - if u, err = url.Parse(uri); err != nil { + if u, err = ParseURL(uri); err != nil { return } - if _, _, serr := net.SplitHostPort(u.Host); serr != nil { - u.Host += ":1935" - } dailer := net.Dialer{Timeout: timeout} var netconn net.Conn @@ -301,7 +303,7 @@ func createURL(tcurl, app, play string) (u *url.URL) { out = append(out, s) } } - path := strings.Join(out, "/") + path := "/"+strings.Join(out, "/") u, _ = url.ParseRequestURI(path) if tcurl != "" { @@ -1740,6 +1742,16 @@ func (self *Conn) handshakeServer() (err error) { return } +type closeConn struct { + *Conn + waitclose chan bool +} + +func (self closeConn) Close() error { + self.waitclose <- true + return nil +} + func Handler(h *avutil.RegisterHandler) { h.UrlDemuxer = func(uri string) (ok bool, demuxer av.DemuxCloser, err error) { if !strings.HasPrefix(uri, "rtmp://") { @@ -1749,5 +1761,87 @@ func Handler(h *avutil.RegisterHandler) { demuxer, err = Dial(uri) return } + + h.ServerMuxer = func(uri string) (ok bool, muxer av.MuxCloser, err error) { + if !strings.HasPrefix(uri, "rtmp://") { + return + } + ok = true + + var u *url.URL + if u, err = ParseURL(uri); err != nil { + return + } + server := &Server{ + Addr: u.Host, + } + + waitstart := make(chan error) + waitconn := make(chan *Conn) + waitclose := make(chan bool) + + server.HandlePlay = func(conn *Conn) { + waitconn <- conn + <-waitclose + } + + go func() { + waitstart <- server.ListenAndServe() + }() + + select { + case err = <-waitstart: + if err != nil { + return + } + + case conn := <-waitconn: + muxer = closeConn{Conn: conn, waitclose: waitclose} + return + } + + return + } + + h.ServerDemuxer = func(uri string) (ok bool, demuxer av.DemuxCloser, err error) { + if !strings.HasPrefix(uri, "rtmp://") { + return + } + ok = true + + var u *url.URL + if u, err = ParseURL(uri); err != nil { + return + } + server := &Server{ + Addr: u.Host, + } + + waitstart := make(chan error) + waitconn := make(chan *Conn) + waitclose := make(chan bool) + + server.HandlePublish = func(conn *Conn) { + waitconn <- conn + <-waitclose + } + + go func() { + waitstart <- server.ListenAndServe() + }() + + select { + case err = <-waitstart: + if err != nil { + return + } + + case conn := <-waitconn: + demuxer = closeConn{Conn: conn, waitclose: waitclose} + return + } + + return + } }