521 lines
12 KiB
Go
521 lines
12 KiB
Go
|
|
package rtmp
|
|
|
|
import (
|
|
"bytes"
|
|
"net"
|
|
"fmt"
|
|
"reflect"
|
|
"io/ioutil"
|
|
"os"
|
|
"bufio"
|
|
"log"
|
|
"time"
|
|
"strings"
|
|
_ "runtime/debug"
|
|
)
|
|
|
|
var (
|
|
event = make(chan eventS, 0)
|
|
eventDone = make(chan int, 0)
|
|
)
|
|
|
|
type eventS struct {
|
|
id int
|
|
mr *MsgStream
|
|
m *Msg
|
|
}
|
|
|
|
type eventID int
|
|
|
|
func (e eventS) String() string {
|
|
switch e.id {
|
|
case E_NEW:
|
|
return "new"
|
|
case E_PUBLISH:
|
|
return "publish"
|
|
case E_PLAY:
|
|
return "play"
|
|
case E_DATA:
|
|
switch e.m.typeid {
|
|
case MSG_VIDEO:
|
|
return fmt.Sprintf("ts %d video %d bytes key %t", e.m.curts, e.m.data.Len(), e.m.key)
|
|
case MSG_AUDIO:
|
|
return fmt.Sprintf("ts %d audio %d bytes", e.m.curts, e.m.data.Len())
|
|
}
|
|
case E_CLOSE:
|
|
return "close"
|
|
}
|
|
return ""
|
|
}
|
|
|
|
/*
|
|
server:
|
|
connect
|
|
createStream
|
|
publish
|
|
client:
|
|
connect
|
|
createStream
|
|
getStreamLength
|
|
play
|
|
*/
|
|
|
|
const (
|
|
E_NEW = iota
|
|
E_PUBLISH
|
|
E_PLAY
|
|
E_DATA
|
|
E_CLOSE
|
|
)
|
|
|
|
func handleConnect(mr *MsgStream, trans float64, app string) {
|
|
|
|
l.Printf("stream %v: connect: %s", mr, app)
|
|
|
|
mr.app = app
|
|
|
|
mr.WriteMsg32(2, MSG_ACK_SIZE, 0, 5000000)
|
|
mr.WriteMsg32(2, MSG_BANDWIDTH, 0, 5000000)
|
|
mr.WriteMsg32(2, MSG_CHUNK_SIZE, 0, 128)
|
|
|
|
mr.WriteAMFCmd(3, 0, []AMFObj {
|
|
AMFObj { atype : AMF_STRING, str : "_result", },
|
|
AMFObj { atype : AMF_NUMBER, f64 : trans, },
|
|
AMFObj { atype : AMF_OBJECT,
|
|
obj : map[string] AMFObj {
|
|
"fmtVer" : AMFObj { atype : AMF_STRING, str : "FMS/3,0,1,123", },
|
|
"capabilities" : AMFObj { atype : AMF_NUMBER, f64 : 31, },
|
|
},
|
|
},
|
|
AMFObj { atype : AMF_OBJECT,
|
|
obj : map[string] AMFObj {
|
|
"level" : AMFObj { atype : AMF_STRING, str : "status", },
|
|
"code" : AMFObj { atype : AMF_STRING, str : "NetConnection.Connect.Success", },
|
|
"description" : AMFObj { atype : AMF_STRING, str : "Connection Success.", },
|
|
"objectEncoding" : AMFObj { atype : AMF_NUMBER, f64 : 0, },
|
|
},
|
|
},
|
|
})
|
|
}
|
|
|
|
func handleMeta(mr *MsgStream, obj AMFObj) {
|
|
|
|
mr.meta = obj
|
|
mr.W = int(obj.obj["width"].f64)
|
|
mr.H = int(obj.obj["height"].f64)
|
|
|
|
l.Printf("stream %v: meta video %dx%d", mr, mr.W, mr.H)
|
|
}
|
|
|
|
func handleCreateStream(mr *MsgStream, trans float64) {
|
|
|
|
l.Printf("stream %v: createStream", mr)
|
|
|
|
mr.WriteAMFCmd(3, 0, []AMFObj {
|
|
AMFObj { atype : AMF_STRING, str : "_result", },
|
|
AMFObj { atype : AMF_NUMBER, f64 : trans, },
|
|
AMFObj { atype : AMF_NULL, },
|
|
AMFObj { atype : AMF_NUMBER, f64 : 1 },
|
|
})
|
|
}
|
|
|
|
func handleGetStreamLength(mr *MsgStream, trans float64) {
|
|
}
|
|
|
|
func handlePublish(mr *MsgStream) {
|
|
|
|
l.Printf("stream %v: publish", mr)
|
|
|
|
mr.WriteAMFCmd(3, 0, []AMFObj {
|
|
AMFObj { atype : AMF_STRING, str : "onStatus", },
|
|
AMFObj { atype : AMF_NUMBER, f64 : 0, },
|
|
AMFObj { atype : AMF_NULL, },
|
|
AMFObj { atype : AMF_OBJECT,
|
|
obj : map[string] AMFObj {
|
|
"level" : AMFObj { atype : AMF_STRING, str : "status", },
|
|
"code" : AMFObj { atype : AMF_STRING, str : "NetStream.Publish.Start", },
|
|
"description" : AMFObj { atype : AMF_STRING, str : "Start publising.", },
|
|
},
|
|
},
|
|
})
|
|
|
|
event <- eventS{id:E_PUBLISH, mr:mr}
|
|
<-eventDone
|
|
}
|
|
|
|
type testsrc struct {
|
|
r *bufio.Reader
|
|
dir string
|
|
w,h int
|
|
ts int
|
|
codec string
|
|
key bool
|
|
idx int
|
|
data []byte
|
|
}
|
|
|
|
func tsrcNew() (m *testsrc) {
|
|
m = &testsrc{}
|
|
m.dir = "/pixies/go/data/tmp"
|
|
fi, _ := os.Open(fmt.Sprintf("%s/index", m.dir))
|
|
m.r = bufio.NewReader(fi)
|
|
l, _ := m.r.ReadString('\n')
|
|
fmt.Sscanf(l, "%dx%d", &m.w, &m.h)
|
|
return
|
|
}
|
|
|
|
func (m *testsrc) fetch() (err error) {
|
|
l, err := m.r.ReadString('\n')
|
|
if err != nil {
|
|
return
|
|
}
|
|
a := strings.Split(l, ",")
|
|
fmt.Sscanf(a[0], "%d", &m.ts)
|
|
m.codec = a[1]
|
|
fmt.Sscanf(a[2], "%d", &m.idx)
|
|
switch m.codec {
|
|
case "h264":
|
|
fmt.Sscanf(a[3], "%t", &m.key)
|
|
m.data, err = ioutil.ReadFile(fmt.Sprintf("%s/h264/%d.264", m.dir, m.idx))
|
|
case "aac":
|
|
m.data, err = ioutil.ReadFile(fmt.Sprintf("%s/aac/%d.aac", m.dir, m.idx))
|
|
}
|
|
return
|
|
}
|
|
|
|
func handlePlay(mr *MsgStream, strid int) {
|
|
|
|
l.Printf("stream %v: play", mr)
|
|
|
|
var tsrc *testsrc
|
|
//tsrc = tsrcNew()
|
|
|
|
if tsrc == nil {
|
|
event <- eventS{id:E_PLAY, mr:mr}
|
|
<-eventDone
|
|
} else {
|
|
l.Printf("stream %v: test play data in %s", mr, tsrc.dir)
|
|
mr.W = tsrc.w
|
|
mr.H = tsrc.h
|
|
l.Printf("stream %v: test video %dx%d", mr, mr.W, mr.H)
|
|
}
|
|
|
|
begin := func () {
|
|
|
|
var b bytes.Buffer
|
|
WriteInt(&b, 0, 2)
|
|
WriteInt(&b, strid, 4)
|
|
mr.WriteMsg(0, 2, MSG_USER, 0, 0, b.Bytes()) // stream begin 1
|
|
|
|
mr.WriteAMFCmd(5, strid, []AMFObj {
|
|
AMFObj { atype : AMF_STRING, str : "onStatus", },
|
|
AMFObj { atype : AMF_NUMBER, f64 : 0, },
|
|
AMFObj { atype : AMF_NULL, },
|
|
AMFObj { atype : AMF_OBJECT,
|
|
obj : map[string] AMFObj {
|
|
"level" : AMFObj { atype : AMF_STRING, str : "status", },
|
|
"code" : AMFObj { atype : AMF_STRING, str : "NetStream.Play.Start", },
|
|
"description" : AMFObj { atype : AMF_STRING, str : "Start live.", },
|
|
},
|
|
},
|
|
})
|
|
|
|
l.Printf("stream %v: begin: video %dx%d", mr, mr.W, mr.H)
|
|
|
|
mr.WriteAMFMeta(5, strid, []AMFObj {
|
|
AMFObj { atype : AMF_STRING, str : "|RtmpSampleAccess", },
|
|
AMFObj { atype : AMF_BOOLEAN, i: 1, },
|
|
AMFObj { atype : AMF_BOOLEAN, i: 1, },
|
|
})
|
|
|
|
mr.meta.obj["Server"] = AMFObj { atype : AMF_STRING, str : "Golang Rtmp Server", }
|
|
mr.meta.atype = AMF_OBJECT
|
|
l.Printf("stream %v: %v", mr, mr.meta)
|
|
mr.WriteAMFMeta(5, strid, []AMFObj {
|
|
AMFObj { atype : AMF_STRING, str : "onMetaData", },
|
|
mr.meta,
|
|
/*
|
|
AMFObj { atype : AMF_OBJECT,
|
|
obj : map[string] AMFObj {
|
|
"Server" : AMFObj { atype : AMF_STRING, str : "Golang Rtmp Server", },
|
|
"width" : AMFObj { atype : AMF_NUMBER, f64 : float64(mr.W), },
|
|
"height" : AMFObj { atype : AMF_NUMBER, f64 : float64(mr.H), },
|
|
"displayWidth" : AMFObj { atype : AMF_NUMBER, f64 : float64(mr.W), },
|
|
"displayHeight" : AMFObj { atype : AMF_NUMBER, f64 : float64(mr.H), },
|
|
"duration" : AMFObj { atype : AMF_NUMBER, f64 : 0, },
|
|
"framerate" : AMFObj { atype : AMF_NUMBER, f64 : 25000, },
|
|
"videodatarate" : AMFObj { atype : AMF_NUMBER, f64 : 731, },
|
|
"videocodecid" : AMFObj { atype : AMF_NUMBER, f64 : 7, },
|
|
"audiodatarate" : AMFObj { atype : AMF_NUMBER, f64 : 122, },
|
|
"audiocodecid" : AMFObj { atype : AMF_NUMBER, f64 : 10, },
|
|
},
|
|
},
|
|
*/
|
|
})
|
|
}
|
|
|
|
end := func () {
|
|
|
|
l.Printf("stream %v: end", mr)
|
|
|
|
var b bytes.Buffer
|
|
WriteInt(&b, 1, 2)
|
|
WriteInt(&b, strid, 4)
|
|
mr.WriteMsg(0, 2, MSG_USER, 0, 0, b.Bytes()) // stream eof 1
|
|
|
|
mr.WriteAMFCmd(5, strid, []AMFObj {
|
|
AMFObj { atype : AMF_STRING, str : "onStatus", },
|
|
AMFObj { atype : AMF_NUMBER, f64 : 0, },
|
|
AMFObj { atype : AMF_NULL, },
|
|
AMFObj { atype : AMF_OBJECT,
|
|
obj : map[string] AMFObj {
|
|
"level" : AMFObj { atype : AMF_STRING, str : "status", },
|
|
"code" : AMFObj { atype : AMF_STRING, str : "NetStream.Play.Stop", },
|
|
"description" : AMFObj { atype : AMF_STRING, str : "Stop live.", },
|
|
},
|
|
},
|
|
})
|
|
}
|
|
|
|
if tsrc == nil {
|
|
|
|
for {
|
|
nr := 0
|
|
|
|
for {
|
|
m := <-mr.que
|
|
if m == nil {
|
|
break
|
|
}
|
|
//if nr == 0 && !m.key {
|
|
// continue
|
|
//}
|
|
if nr == 0 {
|
|
begin()
|
|
l.Printf("stream %v: extra size %d %d", mr, len(mr.extraA), len(mr.extraV))
|
|
mr.WriteAAC(strid, 0, mr.extraA[2:])
|
|
mr.WritePPS(strid, 0, mr.extraV[5:])
|
|
}
|
|
l.Printf("data %v: got %v curts %v", mr, m, m.curts)
|
|
switch m.typeid {
|
|
case MSG_AUDIO:
|
|
mr.WriteAudio(strid, m.curts, m.data.Bytes()[2:])
|
|
case MSG_VIDEO:
|
|
mr.WriteVideo(strid, m.curts, m.key, m.data.Bytes()[5:])
|
|
}
|
|
nr++
|
|
}
|
|
end()
|
|
}
|
|
|
|
} else {
|
|
|
|
begin()
|
|
|
|
lf, _ := os.Create("/tmp/rtmp.log")
|
|
ll := log.New(lf, "", 0)
|
|
|
|
starttm := time.Now()
|
|
k := 0
|
|
|
|
for {
|
|
err := tsrc.fetch()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
switch tsrc.codec {
|
|
case "h264":
|
|
if tsrc.idx == 0 {
|
|
mr.WritePPS(strid, 0, tsrc.data)
|
|
} else {
|
|
mr.WriteVideo(strid, tsrc.ts, tsrc.key, tsrc.data)
|
|
}
|
|
case "aac":
|
|
if tsrc.idx == 0 {
|
|
mr.WriteAAC(strid, 0, tsrc.data)
|
|
} else {
|
|
mr.WriteAudio(strid, tsrc.ts, tsrc.data)
|
|
}
|
|
}
|
|
dur := time.Since(starttm).Nanoseconds()
|
|
diff := tsrc.ts - 1000 - int(dur/1000000)
|
|
if diff > 0 {
|
|
time.Sleep(time.Duration(diff)*time.Millisecond)
|
|
}
|
|
l.Printf("data %v: ts %v dur %v diff %v", mr, tsrc.ts, int(dur/1000000), diff)
|
|
ll.Printf("#%d %d,%s,%d %d", k, tsrc.ts, tsrc.codec, tsrc.idx, len(tsrc.data))
|
|
k++
|
|
}
|
|
}
|
|
}
|
|
|
|
func serve(mr *MsgStream) {
|
|
|
|
defer func() {
|
|
if err := recover(); err != nil {
|
|
event <- eventS{id:E_CLOSE, mr:mr}
|
|
<-eventDone
|
|
l.Printf("stream %v: closed %v", mr, err)
|
|
//if err != "EOF" {
|
|
// l.Printf("stream %v: %v", mr, string(debug.Stack()))
|
|
//}
|
|
}
|
|
}()
|
|
|
|
handShake(mr.r)
|
|
|
|
// f, _ := os.Create("/tmp/pub.log")
|
|
// mr.l = log.New(f, "", 0)
|
|
|
|
for {
|
|
m := mr.ReadMsg()
|
|
if m == nil {
|
|
continue
|
|
}
|
|
|
|
//l.Printf("stream %v: msg %v", mr, m)
|
|
|
|
if m.typeid == MSG_AUDIO || m.typeid == MSG_VIDEO {
|
|
// mr.l.Printf("%d,%d", m.typeid, m.data.Len())
|
|
event <- eventS{id:E_DATA, mr:mr, m:m}
|
|
<-eventDone
|
|
}
|
|
|
|
if m.typeid == MSG_AMF_CMD || m.typeid == MSG_AMF_META {
|
|
a := ReadAMF(m.data)
|
|
//l.Printf("server: amfobj %v\n", a)
|
|
switch a.str {
|
|
case "connect":
|
|
a2 := ReadAMF(m.data)
|
|
a3 := ReadAMF(m.data)
|
|
if _, ok := a3.obj["app"]; !ok || a3.obj["app"].str == "" {
|
|
panic("connect: app not found")
|
|
}
|
|
handleConnect(mr, a2.f64, a3.obj["app"].str)
|
|
case "@setDataFrame":
|
|
ReadAMF(m.data)
|
|
a3 := ReadAMF(m.data)
|
|
handleMeta(mr, a3)
|
|
l.Printf("stream %v: setdataframe", mr)
|
|
case "createStream":
|
|
a2 := ReadAMF(m.data)
|
|
handleCreateStream(mr, a2.f64)
|
|
case "publish":
|
|
handlePublish(mr)
|
|
case "play":
|
|
handlePlay(mr, m.strid)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func listenEvent() {
|
|
idmap := map[string]*MsgStream{}
|
|
pubmap := map[string]*MsgStream{}
|
|
|
|
for {
|
|
e := <-event
|
|
if e.id == E_DATA {
|
|
l.Printf("data %v: %v", e.mr, e)
|
|
} else {
|
|
l.Printf("event %v: %v", e.mr, e)
|
|
}
|
|
switch {
|
|
case e.id == E_NEW:
|
|
idmap[e.mr.id] = e.mr
|
|
case e.id == E_PUBLISH:
|
|
if _, ok := pubmap[e.mr.app]; ok {
|
|
l.Printf("event %v: duplicated publish with %v app %s", e.mr, pubmap[e.mr.app], e.mr.app)
|
|
e.mr.Close()
|
|
} else {
|
|
e.mr.role = PUBLISHER
|
|
pubmap[e.mr.app] = e.mr
|
|
}
|
|
case e.id == E_PLAY:
|
|
e.mr.role = PLAYER
|
|
e.mr.que = make(chan *Msg, 16)
|
|
for _, mr := range idmap {
|
|
if mr.role == PUBLISHER && mr.app == e.mr.app && mr.stat == WAIT_DATA {
|
|
e.mr.W = mr.W
|
|
e.mr.H = mr.H
|
|
e.mr.extraA = mr.extraA
|
|
e.mr.extraV = mr.extraV
|
|
e.mr.meta = mr.meta
|
|
}
|
|
}
|
|
case e.id == E_CLOSE:
|
|
if e.mr.role == PUBLISHER {
|
|
delete(pubmap, e.mr.app)
|
|
for _, mr := range idmap {
|
|
if mr.role == PLAYER && mr.app == e.mr.app {
|
|
ch := reflect.ValueOf(mr.que)
|
|
var m *Msg = nil
|
|
ch.TrySend(reflect.ValueOf(m))
|
|
}
|
|
}
|
|
}
|
|
delete(idmap, e.mr.id)
|
|
case e.id == E_DATA && e.mr.stat == WAIT_EXTRA:
|
|
if len(e.mr.extraA) == 0 && e.m.typeid == MSG_AUDIO {
|
|
l.Printf("event %v: got aac config", e.mr)
|
|
e.mr.extraA = e.m.data.Bytes()
|
|
}
|
|
if len(e.mr.extraV) == 0 && e.m.typeid == MSG_VIDEO {
|
|
l.Printf("event %v: got pps", e.mr)
|
|
e.mr.extraV = e.m.data.Bytes()
|
|
}
|
|
if len(e.mr.extraA) > 0 && len(e.mr.extraV) > 0 {
|
|
l.Printf("event %v: got all extra", e.mr)
|
|
e.mr.stat = WAIT_DATA
|
|
for _, mr := range idmap {
|
|
if mr.role == PLAYER && mr.app == e.mr.app {
|
|
mr.W = e.mr.W
|
|
mr.H = e.mr.H
|
|
mr.extraA = e.mr.extraA
|
|
mr.extraV = e.mr.extraV
|
|
mr.meta = e.mr.meta
|
|
}
|
|
}
|
|
}
|
|
case e.id == E_DATA && e.mr.stat == WAIT_DATA:
|
|
for _, mr := range idmap {
|
|
if mr.role == PLAYER && mr.app == e.mr.app {
|
|
ch := reflect.ValueOf(mr.que)
|
|
ok := ch.TrySend(reflect.ValueOf(e.m))
|
|
if !ok {
|
|
l.Printf("event %v: send failed", e.mr)
|
|
} else {
|
|
l.Printf("event %v: send ok", e.mr)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
eventDone <- 1
|
|
}
|
|
}
|
|
|
|
func SimpleServer() {
|
|
l.Printf("server: simple server starts")
|
|
ln, err := net.Listen("tcp", ":1935")
|
|
if err != nil {
|
|
l.Printf("server: error: listen 1935 %s\n", err)
|
|
return
|
|
}
|
|
go listenEvent()
|
|
for {
|
|
c, err := ln.Accept()
|
|
if err != nil {
|
|
l.Printf("server: error: sock accept %s\n", err)
|
|
break
|
|
}
|
|
go func (c net.Conn) {
|
|
mr := NewMsgStream(c)
|
|
event <- eventS{id:E_NEW, mr:mr}
|
|
<-eventDone
|
|
serve(mr)
|
|
} (c)
|
|
}
|
|
}
|
|
|