This commit is contained in:
cfanfrank 2013-03-17 23:41:45 +08:00
commit fc3c9d7af3
8 changed files with 1150 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
.*.swp

22
README.md Normal file
View File

@ -0,0 +1,22 @@
Rtmp
Golang rtmp server
Run a simple server
package main
import "github.com/go-av/rtmp"
func main() {
rtmp.SimpleServer()
}
Use avconv to publish stream
avconv -re -i a.mp4 -c:a copy -c:v copy -f flv rtmp://localhost/myapp/1
Use avplay to play stream
avplay rtmp://localhost/myapp/1

100
amf.go Normal file
View File

@ -0,0 +1,100 @@
package rtmp
import (
"io"
"encoding/binary"
)
var (
AMF_NUMBER = 0x00
AMF_BOOLEAN = 0x01
AMF_STRING = 0x02
AMF_OBJECT = 0x03
AMF_NULL = 0x05
AMF_ARRAY_NULL = 0x06
AMF_MIXED_ARRAY = 0x08
AMF_END = 0x09
AMF_ARRAY = 0x0a
AMF_INT8 = 0x0100
AMF_INT16 = 0x0101
AMF_INT32 = 0x0102
AMF_VARIANT_ = 0x0103
)
type AMFObj struct {
atype int
str string
i int
buf []byte
obj map[string]AMFObj
f64 float64
}
func ReadAMF(r io.Reader) (a AMFObj) {
a.atype = ReadInt(r, 1)
switch (a.atype) {
case AMF_STRING:
n := ReadInt(r, 2)
b := ReadBuf(r, n)
a.str = string(b)
case AMF_NUMBER:
binary.Read(r, binary.BigEndian, &a.f64)
case AMF_BOOLEAN:
a.i = ReadInt(r, 1)
case AMF_MIXED_ARRAY:
ReadInt(r, 4)
fallthrough
case AMF_OBJECT:
a.obj = map[string]AMFObj{}
for {
n := ReadInt(r, 2)
if n == 0 {
break
}
name := string(ReadBuf(r, n))
a.obj[name] = ReadAMF(r)
}
case AMF_ARRAY, AMF_VARIANT_:
panic("amf: read: unsupported array or variant")
case AMF_INT8:
a.i = ReadInt(r, 1)
case AMF_INT16:
a.i = ReadInt(r, 2)
case AMF_INT32:
a.i = ReadInt(r, 4)
}
return
}
func WriteAMF(r io.Writer, a AMFObj) {
WriteInt(r, a.atype, 1)
switch (a.atype) {
case AMF_STRING:
WriteInt(r, len(a.str), 2)
r.Write([]byte(a.str))
case AMF_NUMBER:
binary.Write(r, binary.BigEndian, a.f64)
case AMF_BOOLEAN:
WriteInt(r, a.i, 1)
case AMF_MIXED_ARRAY:
r.Write(a.buf[:4])
case AMF_OBJECT:
for name, val := range a.obj {
WriteInt(r, len(name), 2)
r.Write([]byte(name))
WriteAMF(r, val)
}
WriteInt(r, 9, 3)
case AMF_ARRAY, AMF_VARIANT_:
panic("amf: write unsupported array, var")
case AMF_INT8:
WriteInt(r, a.i, 1)
case AMF_INT16:
WriteInt(r, a.i, 2)
case AMF_INT32:
WriteInt(r, a.i, 4)
}
}

21
amf_test.go Normal file
View File

@ -0,0 +1,21 @@
package rtmp
import (
"testing"
"encoding/base64"
"bytes"
"fmt"
)
var (
data = `AgAHY29ubmVjdAA/8AAAAAAAAAMAA2FwcAIABW15YXBwAAhmbGFzaFZlcgIAEE1BQyAxMSw1LDUwMiwxNDkABnN3ZlVybAIAJmh0dHA6Ly9sb2NhbGhvc3Q6ODA4MS9zd2YvandwbGF5ZXIuc3dmAAV0Y1VybAIAFnJ0bXA6Ly9sb2NhbGhvc3QvbXlhcHAABGZwYWQBAAAMY2FwYWJpbGl0aWVzAEBt4AAAAAAAAAthdWRpb0NvZGVjcwBAq+4AAAAAAAALdmlkZW9Db2RlY3MAQG+AAAAAAAAADXZpZGVvRnVuY3Rpb24AP/AAAAAAAAAAB3BhZ2VVcmwCABpodHRwOi8vbG9jYWxob3N0OjgwODEvc3dmLwAOb2JqZWN0RW5jb2RpbmcAAAAAAAAAAAAAAAk=`
)
func TestHal(t *testing.T) {
dec := base64.NewDecoder(base64.StdEncoding, bytes.NewBufferString(data))
r := NewAMFReader(dec)
obj := r.ReadAMF()
fmt.Printf("%v\n", obj)
}

145
handshake.go Normal file
View File

@ -0,0 +1,145 @@
package rtmp
import (
"io"
"crypto/hmac"
"crypto/sha256"
"bytes"
"math/rand"
)
var (
clientKey = []byte{
'G', 'e', 'n', 'u', 'i', 'n', 'e', ' ', 'A', 'd', 'o', 'b', 'e', ' ',
'F', 'l', 'a', 's', 'h', ' ', 'P', 'l', 'a', 'y', 'e', 'r', ' ',
'0', '0', '1',
0xF0, 0xEE, 0xC2, 0x4A, 0x80, 0x68, 0xBE, 0xE8, 0x2E, 0x00, 0xD0, 0xD1,
0x02, 0x9E, 0x7E, 0x57, 0x6E, 0xEC, 0x5D, 0x2D, 0x29, 0x80, 0x6F, 0xAB,
0x93, 0xB8, 0xE6, 0x36, 0xCF, 0xEB, 0x31, 0xAE,
}
serverKey = []byte{
'G', 'e', 'n', 'u', 'i', 'n', 'e', ' ', 'A', 'd', 'o', 'b', 'e', ' ',
'F', 'l', 'a', 's', 'h', ' ', 'M', 'e', 'd', 'i', 'a', ' ',
'S', 'e', 'r', 'v', 'e', 'r', ' ',
'0', '0', '1',
0xF0, 0xEE, 0xC2, 0x4A, 0x80, 0x68, 0xBE, 0xE8, 0x2E, 0x00, 0xD0, 0xD1,
0x02, 0x9E, 0x7E, 0x57, 0x6E, 0xEC, 0x5D, 0x2D, 0x29, 0x80, 0x6F, 0xAB,
0x93, 0xB8, 0xE6, 0x36, 0xCF, 0xEB, 0x31, 0xAE,
}
clientKey2 = clientKey[:30]
serverKey2 = serverKey[:36]
serverVersion = []byte{
0x0D, 0x0E, 0x0A, 0x0D,
}
)
func makeDigest(key []byte, src []byte, skip int) (dst []byte) {
h := hmac.New(sha256.New, key)
if skip >= 0 && skip < len(src) {
if skip != 0 {
h.Write(src[:skip])
}
if len(src) != skip + 32 {
h.Write(src[skip+32:])
}
} else {
h.Write(src)
}
return h.Sum(nil)
}
func findDigest(b []byte, key []byte, base int) (int) {
offs := 0
for n := 0; n < 4; n++ {
offs += int(b[base + n])
}
offs = (offs % 728) + base + 4
// fmt.Printf("offs %v\n", offs)
dig := makeDigest(key, b, offs)
// fmt.Printf("digest %v\n", digest)
// fmt.Printf("p %v\n", b[offs:offs+32])
if bytes.Compare(b[offs:offs+32], dig) != 0 {
offs = -1
}
return offs
}
func writeDigest(b []byte, key []byte, base int) {
offs := 0
for n := 8; n < 12; n++ {
offs += int(b[base + n])
}
offs = (offs % 728) + base + 12
dig := makeDigest(key, b, offs)
copy(b[offs:], dig)
}
func createChal(b []byte, ver []byte, key []byte) {
b[0] = 3
copy(b[5:9], ver)
for i := 9; i < 1537; i++ {
b[i] = byte(rand.Int() % 256)
}
writeDigest(b[1:], key, 0)
}
func createResp(b []byte, key []byte) {
for i := 0; i < 1536; i++ {
b[i] = byte(rand.Int() % 256)
}
dig := makeDigest(key, b, 1536-32)
copy(b[1536-32:], dig)
}
func parseChal(b []byte, peerKey []byte, key []byte) (dig []byte, err int) {
if b[0] != 0x3 {
l.Printf("handshake: invalid rtmp version\n")
err = 1
return
}
epoch := b[1:5]
ver := b[5:9]
l.Printf("handshake: epoch %v ver %v\n", epoch, ver)
var offs int
if offs = findDigest(b[1:], peerKey, 772); offs == -1 {
if offs = findDigest(b[1:], peerKey, 8); offs == -1 {
l.Printf("handshake: digest not found\n")
err = 1
return
}
}
l.Printf("handshake: offs = %v\n", offs)
dig = makeDigest(key, b[1+offs:1+offs+32], -1)
return
}
func handShake(rw io.ReadWriter) {
b := ReadBuf(rw, 1537)
l.Printf("handshake: got client chal\n")
dig, err := parseChal(b, clientKey2, serverKey)
if err != 0 {
return
}
createChal(b, serverVersion, serverKey2)
l.Printf("handshake: send server chal\n")
rw.Write(b)
b = make([]byte, 1536)
createResp(b, dig)
l.Printf("handshake: send server resp\n")
rw.Write(b)
b = ReadBuf(rw, 1536)
l.Printf("handshake: got client resp\n")
}

299
msg.go Normal file
View File

@ -0,0 +1,299 @@
package rtmp
import (
"io"
"bytes"
"fmt"
"log"
)
var (
MSG_CHUNK_SIZE = 1
MSG_ABORT = 2
MSG_ACK = 3
MSG_USER = 4
MSG_ACK_SIZE = 5
MSG_BANDWIDTH = 6
MSG_EDGE = 7
MSG_AUDIO = 8
MSG_VIDEO = 9
MSG_AMF3_META = 15
MSG_AMF3_SHARED = 16
MSG_AMF3_CMD = 17
MSG_AMF_META = 18
MSG_AMF_SHARED = 19
MSG_AMF_CMD = 20
MSG_AGGREGATE = 22
MSG_MAX = 22
)
var (
MsgTypeStr = []string {
"?",
"CHUNK_SIZE", "ABORT", "ACK",
"USER", "ACK_SIZE", "BANDWIDTH", "EDGE",
"AUDIO", "VIDEO",
"AMF3_META", "AMF3_SHARED", "AFM3_CMD",
"AMF_META", "AMF_SHARED", "AMF_CMD",
"AGGREGATE",
}
)
type chunkHeader struct {
typeid int
mlen int
csid int
cfmt int
ts int
tsdelta int
strid int
}
func readChunkHeader (r io.Reader) (m chunkHeader) {
i := ReadInt(r, 1)
m.cfmt = (i>>6)&3;
m.csid = i&0x3f;
if m.csid == 0 {
j := ReadInt(r, 1)
m.csid = j + 64
}
if m.csid == 0x3f {
j := ReadInt(r, 2)
m.csid = j + 64
}
if m.cfmt == 0 {
m.ts = ReadInt(r, 3)
m.mlen = ReadInt(r, 3)
m.typeid = ReadInt(r, 1)
m.strid = ReadIntLE(r, 4)
}
if m.cfmt == 1 {
m.tsdelta = ReadInt(r, 3)
m.mlen = ReadInt(r, 3)
m.typeid = ReadInt(r, 1)
}
if m.cfmt == 2 {
m.tsdelta = ReadInt(r, 3)
}
if m.ts == 0xffffff {
m.ts = ReadInt(r, 4)
}
if m.tsdelta == 0xffffff {
m.tsdelta = ReadInt(r, 4)
}
//l.Printf("chunk: %v", m)
return
}
const (
UNKNOWN = 0
PLAYER = 1
PUBLISHER = 2
)
const (
WAIT_EXTRA = 0
WAIT_DATA = 1
)
type MsgStream struct {
r stream
Msg map[int]*Msg
vts, ats int
id string
role int
stat int
app string
W,H int
strid int
extraA, extraV []byte
que chan *Msg
l *log.Logger
}
type Msg struct {
chunkHeader
data *bytes.Buffer
key bool
curts int
}
func (m *Msg) String() string {
var typestr string
if m.typeid < len(MsgTypeStr) {
typestr = MsgTypeStr[m.typeid]
} else {
typestr = "?"
}
return fmt.Sprintf("%s %d %v", typestr, m.mlen, m.chunkHeader)
}
var (
mrseq = 0
)
func NewMsgStream(r io.ReadWriteCloser) *MsgStream {
mrseq++
return &MsgStream{
r:stream{r},
Msg:map[int]*Msg{},
id:fmt.Sprintf("#%d", mrseq),
}
}
func (mr *MsgStream) String() string {
return mr.id
}
func (mr *MsgStream) Close() {
mr.r.Close()
}
func (r *MsgStream) WriteMsg(cfmt, csid, typeid, strid, ts int, data []byte) {
var b bytes.Buffer
start := 0
for i := 0; start < len(data); i++ {
if i == 0 {
if cfmt == 0 {
WriteInt(&b, csid, 1) // fmt=0 csid
WriteInt(&b, ts, 3) // ts
WriteInt(&b, len(data), 3) // message length
WriteInt(&b, typeid, 1) // message type id
WriteIntLE(&b, strid, 4) // message stream id
} else {
WriteInt(&b, 0x1<<6 + csid, 1) // fmt=1 csid
WriteInt(&b, ts, 3) // tsdelta
WriteInt(&b, len(data), 3) // message length
WriteInt(&b, typeid, 1) // message type id
}
} else {
WriteBuf(&b, []byte{0x3<<6 + byte(csid)}) // fmt=3, csid
}
size := 128
if len(data) - start < size {
size = len(data) - start
}
WriteBuf(&b, data[start:start+size])
WriteBuf(r.r, b.Bytes())
b.Reset()
start += size
}
l.Printf("Msg: csid %d ts %d paylen %d", csid, ts, len(data))
}
func (r *MsgStream) WriteAudio(strid, ts int, data []byte) {
d := append([]byte{0xaf, 1}, data...)
tsdelta := ts - r.ats
r.ats = ts
r.WriteMsg(1, 7, MSG_AUDIO, strid, tsdelta, d)
}
func (r *MsgStream) WriteAAC(strid, ts int, data []byte) {
d := append([]byte{0xaf, 0}, data...)
r.ats = ts
r.WriteMsg(0, 7, MSG_AUDIO, strid, ts, d)
}
func (r *MsgStream) WriteVideo(strid,ts int, key bool, data []byte) {
var b int
if key {
b = 0x17
} else {
b = 0x27
}
d := append([]byte{byte(b), 1, 0, 0, 0x50}, data...)
tsdelta := ts - r.vts
r.vts = ts
r.WriteMsg(1, 6, MSG_VIDEO, strid, tsdelta, d)
}
func (r *MsgStream) WritePPS(strid, ts int, data []byte) {
d := append([]byte{0x17, 0, 0, 0, 0}, data...)
r.vts = ts
r.WriteMsg(0, 6, MSG_VIDEO, strid, ts, d)
}
func (r *MsgStream) WriteAMFMeta(csid, strid int, a []AMFObj) {
var b bytes.Buffer
for _, v := range a {
WriteAMF(&b, v)
}
r.WriteMsg(0, csid, MSG_AMF_META, strid, 0, b.Bytes())
}
func (r *MsgStream) WriteAMFCmd(csid, strid int, a []AMFObj) {
var b bytes.Buffer
for _, v := range a {
WriteAMF(&b, v)
}
r.WriteMsg(0, csid, MSG_AMF_CMD, strid, 0, b.Bytes())
}
func (r *MsgStream) WriteMsg32(csid, typeid, strid, v int) {
var b bytes.Buffer
WriteInt(&b, v, 4)
r.WriteMsg(0, csid, typeid, strid, 0, b.Bytes())
}
func (r *MsgStream) ReadMsg() *Msg {
ch := readChunkHeader(r.r)
m, ok := r.Msg[ch.csid]
if !ok {
//l.Printf("chunk: new")
m = &Msg{ch, &bytes.Buffer{}, false, 0}
r.Msg[ch.csid] = m
}
switch ch.cfmt {
case 0:
m.ts = ch.ts
m.mlen = ch.mlen
m.typeid = ch.typeid
m.curts = m.ts
case 1:
m.tsdelta = ch.tsdelta
m.mlen = ch.mlen
m.typeid = ch.typeid
m.curts += m.tsdelta
case 2:
m.tsdelta = ch.tsdelta
}
left := m.mlen - m.data.Len()
size := 128
if size > left {
size = left
}
//l.Printf("chunk: %v", m)
if size > 0 {
io.CopyN(m.data, r.r, int64(size))
}
if size == left {
rm := new(Msg)
*rm = *m
l.Printf("event: fmt%d %v curts %d pre %v", ch.cfmt, m, m.curts, m.data.Bytes()[:9])
if m.typeid == MSG_VIDEO && int(m.data.Bytes()[0]) == 0x17 {
rm.key = true
} else {
rm.key = false
}
m.data = &bytes.Buffer{}
return rm
}
return nil
}

449
server.go Normal file
View File

@ -0,0 +1,449 @@
package rtmp
import (
"bytes"
"net"
"fmt"
"reflect"
"io/ioutil"
"os"
"bufio"
"log"
"time"
"strings"
)
var (
event = make(chan eventS, 0)
eventDone = make(chan int, 0)
)
type eventS struct {
id int
mr *MsgStream
m *Msg
}
type eventID int
func (e eventS) String() string {
switch e.id {
case E_NEW:
return "new"
case E_PUBLISH:
return "publish"
case E_PLAY:
return "play"
case E_DATA:
return fmt.Sprintf("data %d bytes ts %d", e.m.data.Len(), e.m.curts)
case E_CLOSE:
return "close"
}
return ""
}
/*
server:
connect
createStream
publish
client:
connect
createStream
getStreamLength
play
*/
const (
E_NEW = iota
E_PUBLISH
E_PLAY
E_DATA
E_CLOSE
)
func handleConnect(mr *MsgStream, trans float64, app string) {
l.Printf("stream %v: connect: %s", mr, app)
mr.app = app
mr.WriteMsg32(2, MSG_ACK_SIZE, 0, 5000000)
mr.WriteMsg32(2, MSG_BANDWIDTH, 0, 5000000)
mr.WriteMsg32(2, MSG_CHUNK_SIZE, 0, 128)
mr.WriteAMFCmd(3, 0, []AMFObj {
AMFObj { atype : AMF_STRING, str : "_result", },
AMFObj { atype : AMF_NUMBER, f64 : trans, },
AMFObj { atype : AMF_OBJECT,
obj : map[string] AMFObj {
"fmtVer" : AMFObj { atype : AMF_STRING, str : "FMS/3,0,1,123", },
"capabilities" : AMFObj { atype : AMF_NUMBER, f64 : 31, },
},
},
AMFObj { atype : AMF_OBJECT,
obj : map[string] AMFObj {
"level" : AMFObj { atype : AMF_STRING, str : "status", },
"code" : AMFObj { atype : AMF_STRING, str : "NetConnection.Connect.Success", },
"description" : AMFObj { atype : AMF_STRING, str : "Connection Success.", },
"objectEncoding" : AMFObj { atype : AMF_NUMBER, f64 : 0, },
},
},
})
}
func handleMeta(mr *MsgStream, obj AMFObj) {
mr.W = int(obj.obj["width"].f64)
mr.H = int(obj.obj["height"].f64)
l.Printf("stream %v: meta video %dx%d", mr, mr.W, mr.H)
}
func handleCreateStream(mr *MsgStream, trans float64) {
l.Printf("stream %v: createStream", mr)
mr.WriteAMFCmd(3, 0, []AMFObj {
AMFObj { atype : AMF_STRING, str : "_result", },
AMFObj { atype : AMF_NUMBER, f64 : trans, },
AMFObj { atype : AMF_NULL, },
AMFObj { atype : AMF_NUMBER, f64 : 1 },
})
}
func handleGetStreamLength(mr *MsgStream, trans float64) {
}
func handlePublish(mr *MsgStream) {
l.Printf("stream %v: publish", mr)
mr.WriteAMFCmd(3, 0, []AMFObj {
AMFObj { atype : AMF_STRING, str : "onStatus", },
AMFObj { atype : AMF_NUMBER, f64 : 0, },
AMFObj { atype : AMF_NULL, },
AMFObj { atype : AMF_OBJECT,
obj : map[string] AMFObj {
"level" : AMFObj { atype : AMF_STRING, str : "status", },
"code" : AMFObj { atype : AMF_STRING, str : "NetStream.Publish.Start", },
"description" : AMFObj { atype : AMF_STRING, str : "Start publising.", },
},
},
})
event <- eventS{id:E_PUBLISH, mr:mr}
<-eventDone
}
type testsrc struct {
r *bufio.Reader
dir string
w,h int
ts int
codec string
key bool
idx int
data []byte
}
func tsrcNew() (m *testsrc) {
m = &testsrc{}
m.dir = "/pixies/go/data/tmp"
fi, _ := os.Open(fmt.Sprintf("%s/index", m.dir))
m.r = bufio.NewReader(fi)
l, _ := m.r.ReadString('\n')
fmt.Sscanf(l, "%dx%d", &m.w, &m.h)
return
}
func (m *testsrc) fetch() (err error) {
l, err := m.r.ReadString('\n')
if err != nil {
return
}
a := strings.Split(l, ",")
fmt.Sscanf(a[0], "%d", &m.ts)
m.codec = a[1]
fmt.Sscanf(a[2], "%d", &m.idx)
switch m.codec {
case "h264":
fmt.Sscanf(a[3], "%t", &m.key)
m.data, err = ioutil.ReadFile(fmt.Sprintf("%s/h264/%d.264", m.dir, m.idx))
case "aac":
m.data, err = ioutil.ReadFile(fmt.Sprintf("%s/aac/%d.aac", m.dir, m.idx))
}
return
}
func handlePlay(mr *MsgStream, strid int) {
l.Printf("stream %v: play", mr)
var tsrc *testsrc
//tsrc = tsrcNew()
if tsrc == nil {
event <- eventS{id:E_PLAY, mr:mr}
<-eventDone
} else {
l.Printf("stream %v: test play data in %s", mr, tsrc.dir)
mr.W = tsrc.w
mr.H = tsrc.h
l.Printf("stream %v: test video %dx%d", mr, mr.W, mr.H)
}
var b bytes.Buffer
WriteInt(&b, 0, 2)
WriteInt(&b, strid, 4)
mr.WriteMsg(0, 2, MSG_USER, 0, 0, b.Bytes()) // stream begin 1
mr.WriteAMFCmd(5, strid, []AMFObj {
AMFObj { atype : AMF_STRING, str : "onStatus", },
AMFObj { atype : AMF_NUMBER, f64 : 0, },
AMFObj { atype : AMF_NULL, },
AMFObj { atype : AMF_OBJECT,
obj : map[string] AMFObj {
"level" : AMFObj { atype : AMF_STRING, str : "status", },
"code" : AMFObj { atype : AMF_STRING, str : "NetStream.Play.Start", },
"description" : AMFObj { atype : AMF_STRING, str : "Start live.", },
},
},
})
l.Printf("stream %v: video size %dx%d", mr, mr.W, mr.H)
mr.WriteAMFMeta(5, strid, []AMFObj {
AMFObj { atype : AMF_STRING, str : "|RtmpSampleAccess", },
AMFObj { atype : AMF_BOOLEAN, i: 1, },
AMFObj { atype : AMF_BOOLEAN, i: 1, },
})
mr.WriteAMFMeta(5, strid, []AMFObj {
AMFObj { atype : AMF_STRING, str : "onMetaData", },
AMFObj { atype : AMF_OBJECT,
obj : map[string] AMFObj {
"Server" : AMFObj { atype : AMF_STRING, str : "Golang Rtmp Server", },
"width" : AMFObj { atype : AMF_NUMBER, f64 : float64(mr.W), },
"height" : AMFObj { atype : AMF_NUMBER, f64 : float64(mr.H), },
"displayWidth" : AMFObj { atype : AMF_NUMBER, f64 : float64(mr.W), },
"displayHeight" : AMFObj { atype : AMF_NUMBER, f64 : float64(mr.H), },
"duration" : AMFObj { atype : AMF_NUMBER, f64 : 0, },
"framerate" : AMFObj { atype : AMF_NUMBER, f64 : 25000, },
"videodatarate" : AMFObj { atype : AMF_NUMBER, f64 : 731, },
"videocodecid" : AMFObj { atype : AMF_NUMBER, f64 : 7, },
"audiodatarate" : AMFObj { atype : AMF_NUMBER, f64 : 122, },
"audiocodecid" : AMFObj { atype : AMF_NUMBER, f64 : 10, },
},
},
})
if tsrc == nil {
l.Printf("stream %v: extra size %d %d", mr, len(mr.extraA), len(mr.extraV))
mr.WriteAAC(strid, 0, mr.extraA[2:])
mr.WritePPS(strid, 0, mr.extraV[5:])
l.Printf("stream %v: player wait data", mr)
for {
m := <-mr.que
l.Printf("data %v: got %v", mr, m)
switch m.typeid {
case MSG_AUDIO:
mr.WriteAudio(strid, m.curts, m.data.Bytes()[2:])
case MSG_VIDEO:
mr.WriteVideo(strid, m.curts, m.key, m.data.Bytes()[5:])
}
}
} else {
lf, _ := os.Create("/tmp/rtmp.log")
ll := log.New(lf, "", 0)
starttm := time.Now()
k := 0
for {
err := tsrc.fetch()
if err != nil {
panic(err)
}
switch tsrc.codec {
case "h264":
if tsrc.idx == 0 {
mr.WritePPS(strid, 0, tsrc.data)
} else {
mr.WriteVideo(strid, tsrc.ts, tsrc.key, tsrc.data)
}
case "aac":
if tsrc.idx == 0 {
mr.WriteAAC(strid, 0, tsrc.data)
} else {
mr.WriteAudio(strid, tsrc.ts, tsrc.data)
}
}
dur := time.Since(starttm).Nanoseconds()
diff := tsrc.ts - 1000 - int(dur/1000000)
if diff > 0 {
time.Sleep(time.Duration(diff)*time.Millisecond)
}
l.Printf("data %v: ts %v dur %v diff %v", mr, tsrc.ts, int(dur/1000000), diff)
ll.Printf("#%d %d,%s,%d %d", k, tsrc.ts, tsrc.codec, tsrc.idx, len(tsrc.data))
k++
}
}
}
func serve(mr *MsgStream) {
defer func() {
if err := recover(); err != nil {
event <- eventS{id:E_CLOSE, mr:mr}
<-eventDone
l.Printf("stream %v: closed %v", mr, err)
}
}()
handShake(mr.r)
// f, _ := os.Create("/tmp/pub.log")
// mr.l = log.New(f, "", 0)
for {
m := mr.ReadMsg()
if m == nil {
continue
}
//l.Printf("stream %v: msg %v", mr, m)
if m.typeid == MSG_AUDIO || m.typeid == MSG_VIDEO {
// mr.l.Printf("%d,%d", m.typeid, m.data.Len())
event <- eventS{id:E_DATA, mr:mr, m:m}
<-eventDone
}
if m.typeid == MSG_AMF_CMD || m.typeid == MSG_AMF_META {
a := ReadAMF(m.data)
//l.Printf("server: amfobj %v\n", a)
switch a.str {
case "connect":
a2 := ReadAMF(m.data)
a3 := ReadAMF(m.data)
if _, ok := a3.obj["app"]; !ok || a3.obj["app"].str == "" {
panic("connect: app not found")
}
handleConnect(mr, a2.f64, a3.obj["app"].str)
case "@setDataFrame":
ReadAMF(m.data)
a3 := ReadAMF(m.data)
handleMeta(mr, a3)
l.Printf("stream %v: setdataframe", mr)
case "createStream":
a2 := ReadAMF(m.data)
handleCreateStream(mr, a2.f64)
case "publish":
handlePublish(mr)
case "play":
handlePlay(mr, m.strid)
}
}
}
}
func listenEvent() {
idmap := map[string]*MsgStream{}
pubmap := map[string]*MsgStream{}
for {
e := <-event
if e.id == E_DATA {
l.Printf("data %v: %v", e.mr, e)
} else {
l.Printf("event %v: %v", e.mr, e)
}
switch {
case e.id == E_NEW:
idmap[e.mr.id] = e.mr
case e.id == E_PUBLISH:
if _, ok := pubmap[e.mr.app]; ok {
l.Printf("event %v: duplicated publish with %v app %s", e.mr, pubmap[e.mr.app], e.mr.app)
e.mr.Close()
} else {
e.mr.role = PUBLISHER
pubmap[e.mr.app] = e.mr
}
case e.id == E_PLAY:
src, ok := pubmap[e.mr.app]
if !ok || src.stat != WAIT_DATA {
l.Printf("event %v: cannot find publisher with app %s", e.mr, e.mr.app)
e.mr.Close()
} else {
e.mr.W = src.W
e.mr.H = src.H
e.mr.role = PLAYER
e.mr.extraA = src.extraA
e.mr.extraV = src.extraV
e.mr.que = make(chan *Msg, 16)
}
case e.id == E_CLOSE:
if e.mr.role == PUBLISHER {
delete(pubmap, e.mr.app)
}
delete(idmap, e.mr.id)
case e.id == E_DATA && e.mr.stat == WAIT_EXTRA:
if len(e.mr.extraA) == 0 && e.m.typeid == MSG_AUDIO {
l.Printf("event %v: got aac config", e.mr)
e.mr.extraA = e.m.data.Bytes()
}
if len(e.mr.extraV) == 0 && e.m.typeid == MSG_VIDEO {
l.Printf("event %v: got pps", e.mr)
e.mr.extraV = e.m.data.Bytes()
}
if len(e.mr.extraA) > 0 && len(e.mr.extraV) > 0 {
l.Printf("event %v: got all extra", e.mr)
e.mr.stat = WAIT_DATA
}
case e.id == E_DATA && e.mr.stat == WAIT_DATA:
for _, mr := range idmap {
if mr.role == PLAYER && mr.app == e.mr.app {
ch := reflect.ValueOf(mr.que)
ok := ch.TrySend(reflect.ValueOf(e.m))
if !ok {
l.Printf("event %v: send failed", e.mr)
} else {
l.Printf("event %v: send ok", e.mr)
}
}
}
}
eventDone <- 1
}
}
func SimpleServer() {
l.Printf("server: simple server starts")
ln, err := net.Listen("tcp", ":1935")
if err != nil {
l.Printf("server: error: listen 1935 %s\n", err)
return
}
go listenEvent()
for {
c, err := ln.Accept()
if err != nil {
l.Printf("server: error: sock accept %s\n", err)
break
}
go func (c net.Conn) {
mr := NewMsgStream(c)
event <- eventS{id:E_NEW, mr:mr}
<-eventDone
serve(mr)
} (c)
}
}

113
util.go Normal file
View File

@ -0,0 +1,113 @@
package rtmp
import (
"io"
"log"
"os"
"fmt"
"strings"
)
type dummyWriter struct {
}
func (m *dummyWriter) Write(p []byte) (n int, err error) {
return 0, nil
}
type logger int
func (l logger) Printf(format string, v ...interface{}) {
str := fmt.Sprintf(format, v...)
switch {
case strings.HasPrefix(str, "server") && l >= 0,
strings.HasPrefix(str, "stream") && l >= 0,
strings.HasPrefix(str, "event") && l >= 0,
strings.HasPrefix(str, "data") && l >= 0,
strings.HasPrefix(str, "msg") && l >= 1:
l2.Println(str)
}
}
var (
dummyW = &dummyWriter{}
l = logger(0)
l2 *log.Logger
)
func init() {
l2 = log.New(os.Stderr, "", 0)
l2.SetFlags(log.Lmicroseconds)
}
type stream struct {
r io.ReadWriteCloser
}
func (s stream) Read(p []byte) (n int, err error) {
n, err = s.r.Read(p)
if err != nil {
panic(err)
}
return
}
func (s stream) Write(p []byte) (n int, err error) {
n, err = s.r.Write(p)
if err != nil {
panic(err)
}
return
}
func (s stream) Close() {
s.r.Close()
}
func ReadBuf(r io.Reader, n int) (b []byte) {
b = make([]byte, n)
r.Read(b)
return
}
func ReadInt(r io.Reader, n int) (ret int) {
b := ReadBuf(r, n)
for i := 0; i < n; i++ {
ret <<= 8
ret += int(b[i])
}
return
}
func ReadIntLE(r io.Reader, n int) (ret int) {
b := ReadBuf(r, n)
for i := 0; i < n; i++ {
ret <<= 8
ret += int(b[n-i-1])
}
return
}
func WriteBuf(w io.Writer, buf []byte) {
w.Write(buf)
}
func WriteInt(w io.Writer, v int, n int) {
b := make([]byte, n)
for i := 0; i < n; i++ {
b[n-i-1] = byte(v&0xff)
v >>= 8
}
WriteBuf(w, b)
}
func WriteIntLE(w io.Writer, v int, n int) {
b := make([]byte, n)
for i := 0; i < n; i++ {
b[i] = byte(v&0xff)
v >>= 8
}
WriteBuf(w, b)
}