add ServerDemuxer / ServerMuxer

This commit is contained in:
nareix 2016-07-12 16:44:48 +08:00
parent 79f3f92350
commit 6f32e25e03
3 changed files with 142 additions and 29 deletions

View File

@ -11,22 +11,6 @@ import (
var Debug bool
func Open(filename string) (demuxer av.DemuxCloser, err error) {
if demuxer, err = avutil.Open(filename); err != nil {
err = fmt.Errorf("avconv: open input `%s` failed: %s", filename, err)
return
}
return
}
func Create(filename string) (muxer av.MuxCloser, err error) {
if muxer, err = avutil.Create(filename); err != nil {
err = fmt.Errorf("avconv: create output `%s` failed: %s", filename, err)
return
}
return
}
type Option struct {
Transcode bool
Args []string
@ -178,12 +162,12 @@ func ConvertCmdline(args []string) (err error) {
var demuxer av.DemuxCloser
var muxer av.MuxCloser
if demuxer, err = Open(input); err != nil {
if demuxer, err = avutil.Open(input); err != nil {
return
}
defer demuxer.Close()
if muxer, err = Create(output); err != nil {
if muxer, err = avutil.Create(output); err != nil {
return
}
defer muxer.Close()

View File

@ -2,6 +2,7 @@ package avutil
import (
"io"
"strings"
"fmt"
"bytes"
"github.com/nareix/joy4/av"
@ -52,6 +53,8 @@ type RegisterHandler struct {
Probe func([]byte)bool
AudioEncoder func(av.CodecType)(av.AudioEncoder,error)
AudioDecoder func(av.AudioCodecData)(av.AudioDecoder,error)
ServerDemuxer func(string)(bool,av.DemuxCloser,error)
ServerMuxer func(string)(bool,av.MuxCloser,error)
}
type Handlers struct {
@ -111,7 +114,21 @@ func (self *Handlers) NewAudioDecoder(codec av.AudioCodecData) (dec av.AudioDeco
}
func (self *Handlers) Open(uri string) (demuxer av.DemuxCloser, err error) {
listen := false
if strings.HasPrefix(uri, "listen:") {
uri = uri[len("listen:"):]
listen = true
}
for _, handler := range self.handlers {
if listen {
if handler.ServerDemuxer != nil {
var ok bool
if ok, demuxer, err = handler.ServerDemuxer(uri); ok {
return
}
}
} else {
if handler.UrlDemuxer != nil {
var ok bool
if ok, demuxer, err = handler.UrlDemuxer(uri); ok {
@ -119,6 +136,7 @@ func (self *Handlers) Open(uri string) (demuxer av.DemuxCloser, err error) {
}
}
}
}
var r io.ReadCloser
var ext string
@ -170,6 +188,23 @@ func (self *Handlers) Open(uri string) (demuxer av.DemuxCloser, err error) {
}
func (self *Handlers) Create(uri string) (muxer av.MuxCloser, err error) {
listen := false
if strings.HasPrefix(uri, "listen:") {
uri = uri[len("listen:"):]
listen = true
}
for _, handler := range self.handlers {
if listen {
if handler.ServerMuxer != nil {
var ok bool
if ok, muxer, err = handler.ServerMuxer(uri); ok {
return
}
}
}
}
var ext string
var u *url.URL
if u, _ = url.Parse(uri); u != nil && u.Scheme != "" {

View File

@ -25,7 +25,13 @@ import (
var MaxProbePacketCount = 6
func ParseURL(uri string) (u *url.URL) {
func ParseURL(uri string) (u *url.URL, err error) {
if u, err = url.Parse(uri); err != nil {
return
}
if _, _, serr := net.SplitHostPort(u.Host); serr != nil {
u.Host += ":1935"
}
return
}
@ -35,13 +41,9 @@ func Dial(uri string) (conn *Conn, err error) {
func DialTimeout(uri string, timeout time.Duration) (conn *Conn, err error) {
var u *url.URL
if u, err = url.Parse(uri); err != nil {
if u, err = ParseURL(uri); err != nil {
return
}
if _, _, serr := net.SplitHostPort(u.Host); serr != nil {
u.Host += ":1935"
}
dailer := net.Dialer{Timeout: timeout}
var netconn net.Conn
@ -301,7 +303,7 @@ func createURL(tcurl, app, play string) (u *url.URL) {
out = append(out, s)
}
}
path := strings.Join(out, "/")
path := "/"+strings.Join(out, "/")
u, _ = url.ParseRequestURI(path)
if tcurl != "" {
@ -1740,6 +1742,16 @@ func (self *Conn) handshakeServer() (err error) {
return
}
type closeConn struct {
*Conn
waitclose chan bool
}
func (self closeConn) Close() error {
self.waitclose <- true
return nil
}
func Handler(h *avutil.RegisterHandler) {
h.UrlDemuxer = func(uri string) (ok bool, demuxer av.DemuxCloser, err error) {
if !strings.HasPrefix(uri, "rtmp://") {
@ -1749,5 +1761,87 @@ func Handler(h *avutil.RegisterHandler) {
demuxer, err = Dial(uri)
return
}
h.ServerMuxer = func(uri string) (ok bool, muxer av.MuxCloser, err error) {
if !strings.HasPrefix(uri, "rtmp://") {
return
}
ok = true
var u *url.URL
if u, err = ParseURL(uri); err != nil {
return
}
server := &Server{
Addr: u.Host,
}
waitstart := make(chan error)
waitconn := make(chan *Conn)
waitclose := make(chan bool)
server.HandlePlay = func(conn *Conn) {
waitconn <- conn
<-waitclose
}
go func() {
waitstart <- server.ListenAndServe()
}()
select {
case err = <-waitstart:
if err != nil {
return
}
case conn := <-waitconn:
muxer = closeConn{Conn: conn, waitclose: waitclose}
return
}
return
}
h.ServerDemuxer = func(uri string) (ok bool, demuxer av.DemuxCloser, err error) {
if !strings.HasPrefix(uri, "rtmp://") {
return
}
ok = true
var u *url.URL
if u, err = ParseURL(uri); err != nil {
return
}
server := &Server{
Addr: u.Host,
}
waitstart := make(chan error)
waitconn := make(chan *Conn)
waitclose := make(chan bool)
server.HandlePublish = func(conn *Conn) {
waitconn <- conn
<-waitclose
}
go func() {
waitstart <- server.ListenAndServe()
}()
select {
case err = <-waitstart:
if err != nil {
return
}
case conn := <-waitconn:
demuxer = closeConn{Conn: conn, waitclose: waitclose}
return
}
return
}
}