commit fc3c9d7af380d06b2fccc8ea85c53ec074842df5 Author: cfanfrank Date: Sun Mar 17 23:41:45 2013 +0800 first diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a01ee28 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.*.swp diff --git a/README.md b/README.md new file mode 100644 index 0000000..3049b0c --- /dev/null +++ b/README.md @@ -0,0 +1,22 @@ +Rtmp + +Golang rtmp server + +Run a simple server + + package main + + import "github.com/go-av/rtmp" + + func main() { + rtmp.SimpleServer() + } + +Use avconv to publish stream + + avconv -re -i a.mp4 -c:a copy -c:v copy -f flv rtmp://localhost/myapp/1 + +Use avplay to play stream + + avplay rtmp://localhost/myapp/1 + diff --git a/amf.go b/amf.go new file mode 100644 index 0000000..cd465ba --- /dev/null +++ b/amf.go @@ -0,0 +1,100 @@ + +package rtmp + +import ( + "io" + "encoding/binary" +) + +var ( + AMF_NUMBER = 0x00 + AMF_BOOLEAN = 0x01 + AMF_STRING = 0x02 + AMF_OBJECT = 0x03 + AMF_NULL = 0x05 + AMF_ARRAY_NULL = 0x06 + AMF_MIXED_ARRAY = 0x08 + AMF_END = 0x09 + AMF_ARRAY = 0x0a + + AMF_INT8 = 0x0100 + AMF_INT16 = 0x0101 + AMF_INT32 = 0x0102 + AMF_VARIANT_ = 0x0103 +) + +type AMFObj struct { + atype int + str string + i int + buf []byte + obj map[string]AMFObj + f64 float64 +} + +func ReadAMF(r io.Reader) (a AMFObj) { + a.atype = ReadInt(r, 1) + switch (a.atype) { + case AMF_STRING: + n := ReadInt(r, 2) + b := ReadBuf(r, n) + a.str = string(b) + case AMF_NUMBER: + binary.Read(r, binary.BigEndian, &a.f64) + case AMF_BOOLEAN: + a.i = ReadInt(r, 1) + case AMF_MIXED_ARRAY: + ReadInt(r, 4) + fallthrough + case AMF_OBJECT: + a.obj = map[string]AMFObj{} + for { + n := ReadInt(r, 2) + if n == 0 { + break + } + name := string(ReadBuf(r, n)) + a.obj[name] = ReadAMF(r) + } + case AMF_ARRAY, AMF_VARIANT_: + panic("amf: read: unsupported array or variant") + case AMF_INT8: + a.i = ReadInt(r, 1) + case AMF_INT16: + a.i = ReadInt(r, 2) + case AMF_INT32: + a.i = ReadInt(r, 4) + } + return +} + +func WriteAMF(r io.Writer, a AMFObj) { + WriteInt(r, a.atype, 1) + switch (a.atype) { + case AMF_STRING: + WriteInt(r, len(a.str), 2) + r.Write([]byte(a.str)) + case AMF_NUMBER: + binary.Write(r, binary.BigEndian, a.f64) + case AMF_BOOLEAN: + WriteInt(r, a.i, 1) + case AMF_MIXED_ARRAY: + r.Write(a.buf[:4]) + case AMF_OBJECT: + for name, val := range a.obj { + WriteInt(r, len(name), 2) + r.Write([]byte(name)) + WriteAMF(r, val) + } + WriteInt(r, 9, 3) + case AMF_ARRAY, AMF_VARIANT_: + panic("amf: write unsupported array, var") + case AMF_INT8: + WriteInt(r, a.i, 1) + case AMF_INT16: + WriteInt(r, a.i, 2) + case AMF_INT32: + WriteInt(r, a.i, 4) + } +} + diff --git a/amf_test.go b/amf_test.go new file mode 100644 index 0000000..57b59d3 --- /dev/null +++ b/amf_test.go @@ -0,0 +1,21 @@ + +package rtmp + +import ( + "testing" + "encoding/base64" + "bytes" + "fmt" +) + +var ( + data = `AgAHY29ubmVjdAA/8AAAAAAAAAMAA2FwcAIABW15YXBwAAhmbGFzaFZlcgIAEE1BQyAxMSw1LDUwMiwxNDkABnN3ZlVybAIAJmh0dHA6Ly9sb2NhbGhvc3Q6ODA4MS9zd2YvandwbGF5ZXIuc3dmAAV0Y1VybAIAFnJ0bXA6Ly9sb2NhbGhvc3QvbXlhcHAABGZwYWQBAAAMY2FwYWJpbGl0aWVzAEBt4AAAAAAAAAthdWRpb0NvZGVjcwBAq+4AAAAAAAALdmlkZW9Db2RlY3MAQG+AAAAAAAAADXZpZGVvRnVuY3Rpb24AP/AAAAAAAAAAB3BhZ2VVcmwCABpodHRwOi8vbG9jYWxob3N0OjgwODEvc3dmLwAOb2JqZWN0RW5jb2RpbmcAAAAAAAAAAAAAAAk=` +) + +func TestHal(t *testing.T) { + dec := base64.NewDecoder(base64.StdEncoding, bytes.NewBufferString(data)) + r := NewAMFReader(dec) + obj := r.ReadAMF() + fmt.Printf("%v\n", obj) +} + diff --git a/handshake.go b/handshake.go new file mode 100644 index 0000000..d6ae30c --- /dev/null +++ b/handshake.go @@ -0,0 +1,145 @@ + +package rtmp + +import ( + "io" + "crypto/hmac" + "crypto/sha256" + "bytes" + "math/rand" +) + +var ( + clientKey = []byte{ + 'G', 'e', 'n', 'u', 'i', 'n', 'e', ' ', 'A', 'd', 'o', 'b', 'e', ' ', + 'F', 'l', 'a', 's', 'h', ' ', 'P', 'l', 'a', 'y', 'e', 'r', ' ', + '0', '0', '1', + + 0xF0, 0xEE, 0xC2, 0x4A, 0x80, 0x68, 0xBE, 0xE8, 0x2E, 0x00, 0xD0, 0xD1, + 0x02, 0x9E, 0x7E, 0x57, 0x6E, 0xEC, 0x5D, 0x2D, 0x29, 0x80, 0x6F, 0xAB, + 0x93, 0xB8, 0xE6, 0x36, 0xCF, 0xEB, 0x31, 0xAE, + } + serverKey = []byte{ + 'G', 'e', 'n', 'u', 'i', 'n', 'e', ' ', 'A', 'd', 'o', 'b', 'e', ' ', + 'F', 'l', 'a', 's', 'h', ' ', 'M', 'e', 'd', 'i', 'a', ' ', + 'S', 'e', 'r', 'v', 'e', 'r', ' ', + '0', '0', '1', + + 0xF0, 0xEE, 0xC2, 0x4A, 0x80, 0x68, 0xBE, 0xE8, 0x2E, 0x00, 0xD0, 0xD1, + 0x02, 0x9E, 0x7E, 0x57, 0x6E, 0xEC, 0x5D, 0x2D, 0x29, 0x80, 0x6F, 0xAB, + 0x93, 0xB8, 0xE6, 0x36, 0xCF, 0xEB, 0x31, 0xAE, + } + clientKey2 = clientKey[:30] + serverKey2 = serverKey[:36] + serverVersion = []byte{ + 0x0D, 0x0E, 0x0A, 0x0D, + } +) + +func makeDigest(key []byte, src []byte, skip int) (dst []byte) { + h := hmac.New(sha256.New, key) + if skip >= 0 && skip < len(src) { + if skip != 0 { + h.Write(src[:skip]) + } + if len(src) != skip + 32 { + h.Write(src[skip+32:]) + } + } else { + h.Write(src) + } + return h.Sum(nil) +} + +func findDigest(b []byte, key []byte, base int) (int) { + offs := 0 + for n := 0; n < 4; n++ { + offs += int(b[base + n]) + } + offs = (offs % 728) + base + 4 +// fmt.Printf("offs %v\n", offs) + dig := makeDigest(key, b, offs) +// fmt.Printf("digest %v\n", digest) +// fmt.Printf("p %v\n", b[offs:offs+32]) + if bytes.Compare(b[offs:offs+32], dig) != 0 { + offs = -1 + } + return offs +} + +func writeDigest(b []byte, key []byte, base int) { + offs := 0 + for n := 8; n < 12; n++ { + offs += int(b[base + n]) + } + offs = (offs % 728) + base + 12 + + dig := makeDigest(key, b, offs) + copy(b[offs:], dig) +} + +func createChal(b []byte, ver []byte, key []byte) { + b[0] = 3 + copy(b[5:9], ver) + for i := 9; i < 1537; i++ { + b[i] = byte(rand.Int() % 256) + } + writeDigest(b[1:], key, 0) +} + +func createResp(b []byte, key []byte) { + for i := 0; i < 1536; i++ { + b[i] = byte(rand.Int() % 256) + } + dig := makeDigest(key, b, 1536-32) + copy(b[1536-32:], dig) +} + +func parseChal(b []byte, peerKey []byte, key []byte) (dig []byte, err int) { + if b[0] != 0x3 { + l.Printf("handshake: invalid rtmp version\n") + err = 1 + return + } + + epoch := b[1:5] + ver := b[5:9] + l.Printf("handshake: epoch %v ver %v\n", epoch, ver) + + var offs int + if offs = findDigest(b[1:], peerKey, 772); offs == -1 { + if offs = findDigest(b[1:], peerKey, 8); offs == -1 { + l.Printf("handshake: digest not found\n") + err = 1 + return + } + } + + l.Printf("handshake: offs = %v\n", offs) + + dig = makeDigest(key, b[1+offs:1+offs+32], -1) + return +} + + +func handShake(rw io.ReadWriter) { + b := ReadBuf(rw, 1537) + l.Printf("handshake: got client chal\n") + dig, err := parseChal(b, clientKey2, serverKey) + if err != 0 { + return + } + + createChal(b, serverVersion, serverKey2) + l.Printf("handshake: send server chal\n") + rw.Write(b) + + b = make([]byte, 1536) + createResp(b, dig) + l.Printf("handshake: send server resp\n") + rw.Write(b) + + b = ReadBuf(rw, 1536) + l.Printf("handshake: got client resp\n") +} + diff --git a/msg.go b/msg.go new file mode 100644 index 0000000..5e8a457 --- /dev/null +++ b/msg.go @@ -0,0 +1,299 @@ + +package rtmp + +import ( + "io" + "bytes" + "fmt" + "log" +) + +var ( + MSG_CHUNK_SIZE = 1 + MSG_ABORT = 2 + MSG_ACK = 3 + MSG_USER = 4 + MSG_ACK_SIZE = 5 + MSG_BANDWIDTH = 6 + MSG_EDGE = 7 + MSG_AUDIO = 8 + MSG_VIDEO = 9 + MSG_AMF3_META = 15 + MSG_AMF3_SHARED = 16 + MSG_AMF3_CMD = 17 + MSG_AMF_META = 18 + MSG_AMF_SHARED = 19 + MSG_AMF_CMD = 20 + MSG_AGGREGATE = 22 + MSG_MAX = 22 +) + +var ( + MsgTypeStr = []string { + "?", + "CHUNK_SIZE", "ABORT", "ACK", + "USER", "ACK_SIZE", "BANDWIDTH", "EDGE", + "AUDIO", "VIDEO", + "AMF3_META", "AMF3_SHARED", "AFM3_CMD", + "AMF_META", "AMF_SHARED", "AMF_CMD", + "AGGREGATE", + } +) + +type chunkHeader struct { + typeid int + mlen int + csid int + cfmt int + ts int + tsdelta int + strid int +} + +func readChunkHeader (r io.Reader) (m chunkHeader) { + i := ReadInt(r, 1) + m.cfmt = (i>>6)&3; + m.csid = i&0x3f; + + if m.csid == 0 { + j := ReadInt(r, 1) + m.csid = j + 64 + } + + if m.csid == 0x3f { + j := ReadInt(r, 2) + m.csid = j + 64 + } + + if m.cfmt == 0 { + m.ts = ReadInt(r, 3) + m.mlen = ReadInt(r, 3) + m.typeid = ReadInt(r, 1) + m.strid = ReadIntLE(r, 4) + } + + if m.cfmt == 1 { + m.tsdelta = ReadInt(r, 3) + m.mlen = ReadInt(r, 3) + m.typeid = ReadInt(r, 1) + } + + if m.cfmt == 2 { + m.tsdelta = ReadInt(r, 3) + } + + if m.ts == 0xffffff { + m.ts = ReadInt(r, 4) + } + if m.tsdelta == 0xffffff { + m.tsdelta = ReadInt(r, 4) + } + + //l.Printf("chunk: %v", m) + + return +} + +const ( + UNKNOWN = 0 + PLAYER = 1 + PUBLISHER = 2 +) + +const ( + WAIT_EXTRA = 0 + WAIT_DATA = 1 +) + + +type MsgStream struct { + r stream + Msg map[int]*Msg + vts, ats int + + id string + role int + stat int + app string + W,H int + strid int + extraA, extraV []byte + que chan *Msg + l *log.Logger +} + +type Msg struct { + chunkHeader + data *bytes.Buffer + + key bool + curts int +} + +func (m *Msg) String() string { + var typestr string + if m.typeid < len(MsgTypeStr) { + typestr = MsgTypeStr[m.typeid] + } else { + typestr = "?" + } + return fmt.Sprintf("%s %d %v", typestr, m.mlen, m.chunkHeader) +} + +var ( + mrseq = 0 +) + +func NewMsgStream(r io.ReadWriteCloser) *MsgStream { + mrseq++ + return &MsgStream{ + r:stream{r}, + Msg:map[int]*Msg{}, + id:fmt.Sprintf("#%d", mrseq), + } +} + +func (mr *MsgStream) String() string { + return mr.id +} + +func (mr *MsgStream) Close() { + mr.r.Close() +} + +func (r *MsgStream) WriteMsg(cfmt, csid, typeid, strid, ts int, data []byte) { + var b bytes.Buffer + start := 0 + for i := 0; start < len(data); i++ { + if i == 0 { + if cfmt == 0 { + WriteInt(&b, csid, 1) // fmt=0 csid + WriteInt(&b, ts, 3) // ts + WriteInt(&b, len(data), 3) // message length + WriteInt(&b, typeid, 1) // message type id + WriteIntLE(&b, strid, 4) // message stream id + } else { + WriteInt(&b, 0x1<<6 + csid, 1) // fmt=1 csid + WriteInt(&b, ts, 3) // tsdelta + WriteInt(&b, len(data), 3) // message length + WriteInt(&b, typeid, 1) // message type id + } + } else { + WriteBuf(&b, []byte{0x3<<6 + byte(csid)}) // fmt=3, csid + } + size := 128 + if len(data) - start < size { + size = len(data) - start + } + WriteBuf(&b, data[start:start+size]) + WriteBuf(r.r, b.Bytes()) + b.Reset() + start += size + } + l.Printf("Msg: csid %d ts %d paylen %d", csid, ts, len(data)) +} + +func (r *MsgStream) WriteAudio(strid, ts int, data []byte) { + d := append([]byte{0xaf, 1}, data...) + tsdelta := ts - r.ats + r.ats = ts + r.WriteMsg(1, 7, MSG_AUDIO, strid, tsdelta, d) +} + +func (r *MsgStream) WriteAAC(strid, ts int, data []byte) { + d := append([]byte{0xaf, 0}, data...) + r.ats = ts + r.WriteMsg(0, 7, MSG_AUDIO, strid, ts, d) +} + +func (r *MsgStream) WriteVideo(strid,ts int, key bool, data []byte) { + var b int + if key { + b = 0x17 + } else { + b = 0x27 + } + d := append([]byte{byte(b), 1, 0, 0, 0x50}, data...) + tsdelta := ts - r.vts + r.vts = ts + r.WriteMsg(1, 6, MSG_VIDEO, strid, tsdelta, d) +} + +func (r *MsgStream) WritePPS(strid, ts int, data []byte) { + d := append([]byte{0x17, 0, 0, 0, 0}, data...) + r.vts = ts + r.WriteMsg(0, 6, MSG_VIDEO, strid, ts, d) +} + +func (r *MsgStream) WriteAMFMeta(csid, strid int, a []AMFObj) { + var b bytes.Buffer + for _, v := range a { + WriteAMF(&b, v) + } + r.WriteMsg(0, csid, MSG_AMF_META, strid, 0, b.Bytes()) +} + +func (r *MsgStream) WriteAMFCmd(csid, strid int, a []AMFObj) { + var b bytes.Buffer + for _, v := range a { + WriteAMF(&b, v) + } + r.WriteMsg(0, csid, MSG_AMF_CMD, strid, 0, b.Bytes()) +} + +func (r *MsgStream) WriteMsg32(csid, typeid, strid, v int) { + var b bytes.Buffer + WriteInt(&b, v, 4) + r.WriteMsg(0, csid, typeid, strid, 0, b.Bytes()) +} + +func (r *MsgStream) ReadMsg() *Msg { + ch := readChunkHeader(r.r) + m, ok := r.Msg[ch.csid] + if !ok { + //l.Printf("chunk: new") + m = &Msg{ch, &bytes.Buffer{}, false, 0} + r.Msg[ch.csid] = m + } + + switch ch.cfmt { + case 0: + m.ts = ch.ts + m.mlen = ch.mlen + m.typeid = ch.typeid + m.curts = m.ts + case 1: + m.tsdelta = ch.tsdelta + m.mlen = ch.mlen + m.typeid = ch.typeid + m.curts += m.tsdelta + case 2: + m.tsdelta = ch.tsdelta + } + + left := m.mlen - m.data.Len() + size := 128 + if size > left { + size = left + } + //l.Printf("chunk: %v", m) + if size > 0 { + io.CopyN(m.data, r.r, int64(size)) + } + + if size == left { + rm := new(Msg) + *rm = *m + l.Printf("event: fmt%d %v curts %d pre %v", ch.cfmt, m, m.curts, m.data.Bytes()[:9]) + if m.typeid == MSG_VIDEO && int(m.data.Bytes()[0]) == 0x17 { + rm.key = true + } else { + rm.key = false + } + m.data = &bytes.Buffer{} + return rm + } + + return nil +} + diff --git a/server.go b/server.go new file mode 100644 index 0000000..f77c645 --- /dev/null +++ b/server.go @@ -0,0 +1,449 @@ + +package rtmp + +import ( + "bytes" + "net" + "fmt" + "reflect" + "io/ioutil" + "os" + "bufio" + "log" + "time" + "strings" +) + +var ( + event = make(chan eventS, 0) + eventDone = make(chan int, 0) +) + +type eventS struct { + id int + mr *MsgStream + m *Msg +} + +type eventID int + +func (e eventS) String() string { + switch e.id { + case E_NEW: + return "new" + case E_PUBLISH: + return "publish" + case E_PLAY: + return "play" + case E_DATA: + return fmt.Sprintf("data %d bytes ts %d", e.m.data.Len(), e.m.curts) + case E_CLOSE: + return "close" + } + return "" +} + +/* +server: + connect + createStream + publish +client: + connect + createStream + getStreamLength + play +*/ + +const ( + E_NEW = iota + E_PUBLISH + E_PLAY + E_DATA + E_CLOSE +) + + +func handleConnect(mr *MsgStream, trans float64, app string) { + + l.Printf("stream %v: connect: %s", mr, app) + + mr.app = app + + mr.WriteMsg32(2, MSG_ACK_SIZE, 0, 5000000) + mr.WriteMsg32(2, MSG_BANDWIDTH, 0, 5000000) + mr.WriteMsg32(2, MSG_CHUNK_SIZE, 0, 128) + + mr.WriteAMFCmd(3, 0, []AMFObj { + AMFObj { atype : AMF_STRING, str : "_result", }, + AMFObj { atype : AMF_NUMBER, f64 : trans, }, + AMFObj { atype : AMF_OBJECT, + obj : map[string] AMFObj { + "fmtVer" : AMFObj { atype : AMF_STRING, str : "FMS/3,0,1,123", }, + "capabilities" : AMFObj { atype : AMF_NUMBER, f64 : 31, }, + }, + }, + AMFObj { atype : AMF_OBJECT, + obj : map[string] AMFObj { + "level" : AMFObj { atype : AMF_STRING, str : "status", }, + "code" : AMFObj { atype : AMF_STRING, str : "NetConnection.Connect.Success", }, + "description" : AMFObj { atype : AMF_STRING, str : "Connection Success.", }, + "objectEncoding" : AMFObj { atype : AMF_NUMBER, f64 : 0, }, + }, + }, + }) +} + +func handleMeta(mr *MsgStream, obj AMFObj) { + + mr.W = int(obj.obj["width"].f64) + mr.H = int(obj.obj["height"].f64) + + l.Printf("stream %v: meta video %dx%d", mr, mr.W, mr.H) +} + +func handleCreateStream(mr *MsgStream, trans float64) { + + l.Printf("stream %v: createStream", mr) + + mr.WriteAMFCmd(3, 0, []AMFObj { + AMFObj { atype : AMF_STRING, str : "_result", }, + AMFObj { atype : AMF_NUMBER, f64 : trans, }, + AMFObj { atype : AMF_NULL, }, + AMFObj { atype : AMF_NUMBER, f64 : 1 }, + }) +} + +func handleGetStreamLength(mr *MsgStream, trans float64) { +} + +func handlePublish(mr *MsgStream) { + + l.Printf("stream %v: publish", mr) + + mr.WriteAMFCmd(3, 0, []AMFObj { + AMFObj { atype : AMF_STRING, str : "onStatus", }, + AMFObj { atype : AMF_NUMBER, f64 : 0, }, + AMFObj { atype : AMF_NULL, }, + AMFObj { atype : AMF_OBJECT, + obj : map[string] AMFObj { + "level" : AMFObj { atype : AMF_STRING, str : "status", }, + "code" : AMFObj { atype : AMF_STRING, str : "NetStream.Publish.Start", }, + "description" : AMFObj { atype : AMF_STRING, str : "Start publising.", }, + }, + }, + }) + + event <- eventS{id:E_PUBLISH, mr:mr} + <-eventDone +} + +type testsrc struct { + r *bufio.Reader + dir string + w,h int + ts int + codec string + key bool + idx int + data []byte +} + +func tsrcNew() (m *testsrc) { + m = &testsrc{} + m.dir = "/pixies/go/data/tmp" + fi, _ := os.Open(fmt.Sprintf("%s/index", m.dir)) + m.r = bufio.NewReader(fi) + l, _ := m.r.ReadString('\n') + fmt.Sscanf(l, "%dx%d", &m.w, &m.h) + return +} + +func (m *testsrc) fetch() (err error) { + l, err := m.r.ReadString('\n') + if err != nil { + return + } + a := strings.Split(l, ",") + fmt.Sscanf(a[0], "%d", &m.ts) + m.codec = a[1] + fmt.Sscanf(a[2], "%d", &m.idx) + switch m.codec { + case "h264": + fmt.Sscanf(a[3], "%t", &m.key) + m.data, err = ioutil.ReadFile(fmt.Sprintf("%s/h264/%d.264", m.dir, m.idx)) + case "aac": + m.data, err = ioutil.ReadFile(fmt.Sprintf("%s/aac/%d.aac", m.dir, m.idx)) + } + return +} + +func handlePlay(mr *MsgStream, strid int) { + + l.Printf("stream %v: play", mr) + + var tsrc *testsrc + //tsrc = tsrcNew() + + if tsrc == nil { + event <- eventS{id:E_PLAY, mr:mr} + <-eventDone + } else { + l.Printf("stream %v: test play data in %s", mr, tsrc.dir) + mr.W = tsrc.w + mr.H = tsrc.h + l.Printf("stream %v: test video %dx%d", mr, mr.W, mr.H) + } + + var b bytes.Buffer + WriteInt(&b, 0, 2) + WriteInt(&b, strid, 4) + mr.WriteMsg(0, 2, MSG_USER, 0, 0, b.Bytes()) // stream begin 1 + + mr.WriteAMFCmd(5, strid, []AMFObj { + AMFObj { atype : AMF_STRING, str : "onStatus", }, + AMFObj { atype : AMF_NUMBER, f64 : 0, }, + AMFObj { atype : AMF_NULL, }, + AMFObj { atype : AMF_OBJECT, + obj : map[string] AMFObj { + "level" : AMFObj { atype : AMF_STRING, str : "status", }, + "code" : AMFObj { atype : AMF_STRING, str : "NetStream.Play.Start", }, + "description" : AMFObj { atype : AMF_STRING, str : "Start live.", }, + }, + }, + }) + + l.Printf("stream %v: video size %dx%d", mr, mr.W, mr.H) + + mr.WriteAMFMeta(5, strid, []AMFObj { + AMFObj { atype : AMF_STRING, str : "|RtmpSampleAccess", }, + AMFObj { atype : AMF_BOOLEAN, i: 1, }, + AMFObj { atype : AMF_BOOLEAN, i: 1, }, + }) + + mr.WriteAMFMeta(5, strid, []AMFObj { + AMFObj { atype : AMF_STRING, str : "onMetaData", }, + AMFObj { atype : AMF_OBJECT, + obj : map[string] AMFObj { + "Server" : AMFObj { atype : AMF_STRING, str : "Golang Rtmp Server", }, + "width" : AMFObj { atype : AMF_NUMBER, f64 : float64(mr.W), }, + "height" : AMFObj { atype : AMF_NUMBER, f64 : float64(mr.H), }, + "displayWidth" : AMFObj { atype : AMF_NUMBER, f64 : float64(mr.W), }, + "displayHeight" : AMFObj { atype : AMF_NUMBER, f64 : float64(mr.H), }, + "duration" : AMFObj { atype : AMF_NUMBER, f64 : 0, }, + "framerate" : AMFObj { atype : AMF_NUMBER, f64 : 25000, }, + "videodatarate" : AMFObj { atype : AMF_NUMBER, f64 : 731, }, + "videocodecid" : AMFObj { atype : AMF_NUMBER, f64 : 7, }, + "audiodatarate" : AMFObj { atype : AMF_NUMBER, f64 : 122, }, + "audiocodecid" : AMFObj { atype : AMF_NUMBER, f64 : 10, }, + }, + }, + }) + + if tsrc == nil { + l.Printf("stream %v: extra size %d %d", mr, len(mr.extraA), len(mr.extraV)) + + mr.WriteAAC(strid, 0, mr.extraA[2:]) + mr.WritePPS(strid, 0, mr.extraV[5:]) + + l.Printf("stream %v: player wait data", mr) + + for { + m := <-mr.que + l.Printf("data %v: got %v", mr, m) + switch m.typeid { + case MSG_AUDIO: + mr.WriteAudio(strid, m.curts, m.data.Bytes()[2:]) + case MSG_VIDEO: + mr.WriteVideo(strid, m.curts, m.key, m.data.Bytes()[5:]) + } + } + } else { + + lf, _ := os.Create("/tmp/rtmp.log") + ll := log.New(lf, "", 0) + + starttm := time.Now() + k := 0 + + for { + err := tsrc.fetch() + if err != nil { + panic(err) + } + switch tsrc.codec { + case "h264": + if tsrc.idx == 0 { + mr.WritePPS(strid, 0, tsrc.data) + } else { + mr.WriteVideo(strid, tsrc.ts, tsrc.key, tsrc.data) + } + case "aac": + if tsrc.idx == 0 { + mr.WriteAAC(strid, 0, tsrc.data) + } else { + mr.WriteAudio(strid, tsrc.ts, tsrc.data) + } + } + dur := time.Since(starttm).Nanoseconds() + diff := tsrc.ts - 1000 - int(dur/1000000) + if diff > 0 { + time.Sleep(time.Duration(diff)*time.Millisecond) + } + l.Printf("data %v: ts %v dur %v diff %v", mr, tsrc.ts, int(dur/1000000), diff) + ll.Printf("#%d %d,%s,%d %d", k, tsrc.ts, tsrc.codec, tsrc.idx, len(tsrc.data)) + k++ + } + } +} + +func serve(mr *MsgStream) { + + defer func() { + if err := recover(); err != nil { + event <- eventS{id:E_CLOSE, mr:mr} + <-eventDone + l.Printf("stream %v: closed %v", mr, err) + } + }() + + handShake(mr.r) + +// f, _ := os.Create("/tmp/pub.log") +// mr.l = log.New(f, "", 0) + + for { + m := mr.ReadMsg() + if m == nil { + continue + } + + //l.Printf("stream %v: msg %v", mr, m) + + if m.typeid == MSG_AUDIO || m.typeid == MSG_VIDEO { +// mr.l.Printf("%d,%d", m.typeid, m.data.Len()) + event <- eventS{id:E_DATA, mr:mr, m:m} + <-eventDone + } + + if m.typeid == MSG_AMF_CMD || m.typeid == MSG_AMF_META { + a := ReadAMF(m.data) + //l.Printf("server: amfobj %v\n", a) + switch a.str { + case "connect": + a2 := ReadAMF(m.data) + a3 := ReadAMF(m.data) + if _, ok := a3.obj["app"]; !ok || a3.obj["app"].str == "" { + panic("connect: app not found") + } + handleConnect(mr, a2.f64, a3.obj["app"].str) + case "@setDataFrame": + ReadAMF(m.data) + a3 := ReadAMF(m.data) + handleMeta(mr, a3) + l.Printf("stream %v: setdataframe", mr) + case "createStream": + a2 := ReadAMF(m.data) + handleCreateStream(mr, a2.f64) + case "publish": + handlePublish(mr) + case "play": + handlePlay(mr, m.strid) + } + } + } +} + +func listenEvent() { + idmap := map[string]*MsgStream{} + pubmap := map[string]*MsgStream{} + + for { + e := <-event + if e.id == E_DATA { + l.Printf("data %v: %v", e.mr, e) + } else { + l.Printf("event %v: %v", e.mr, e) + } + switch { + case e.id == E_NEW: + idmap[e.mr.id] = e.mr + case e.id == E_PUBLISH: + if _, ok := pubmap[e.mr.app]; ok { + l.Printf("event %v: duplicated publish with %v app %s", e.mr, pubmap[e.mr.app], e.mr.app) + e.mr.Close() + } else { + e.mr.role = PUBLISHER + pubmap[e.mr.app] = e.mr + } + case e.id == E_PLAY: + src, ok := pubmap[e.mr.app] + if !ok || src.stat != WAIT_DATA { + l.Printf("event %v: cannot find publisher with app %s", e.mr, e.mr.app) + e.mr.Close() + } else { + e.mr.W = src.W + e.mr.H = src.H + e.mr.role = PLAYER + e.mr.extraA = src.extraA + e.mr.extraV = src.extraV + e.mr.que = make(chan *Msg, 16) + } + case e.id == E_CLOSE: + if e.mr.role == PUBLISHER { + delete(pubmap, e.mr.app) + } + delete(idmap, e.mr.id) + case e.id == E_DATA && e.mr.stat == WAIT_EXTRA: + if len(e.mr.extraA) == 0 && e.m.typeid == MSG_AUDIO { + l.Printf("event %v: got aac config", e.mr) + e.mr.extraA = e.m.data.Bytes() + } + if len(e.mr.extraV) == 0 && e.m.typeid == MSG_VIDEO { + l.Printf("event %v: got pps", e.mr) + e.mr.extraV = e.m.data.Bytes() + } + if len(e.mr.extraA) > 0 && len(e.mr.extraV) > 0 { + l.Printf("event %v: got all extra", e.mr) + e.mr.stat = WAIT_DATA + } + case e.id == E_DATA && e.mr.stat == WAIT_DATA: + for _, mr := range idmap { + if mr.role == PLAYER && mr.app == e.mr.app { + ch := reflect.ValueOf(mr.que) + ok := ch.TrySend(reflect.ValueOf(e.m)) + if !ok { + l.Printf("event %v: send failed", e.mr) + } else { + l.Printf("event %v: send ok", e.mr) + } + } + } + } + eventDone <- 1 + } +} + +func SimpleServer() { + l.Printf("server: simple server starts") + ln, err := net.Listen("tcp", ":1935") + if err != nil { + l.Printf("server: error: listen 1935 %s\n", err) + return + } + go listenEvent() + for { + c, err := ln.Accept() + if err != nil { + l.Printf("server: error: sock accept %s\n", err) + break + } + go func (c net.Conn) { + mr := NewMsgStream(c) + event <- eventS{id:E_NEW, mr:mr} + <-eventDone + serve(mr) + } (c) + } +} + diff --git a/util.go b/util.go new file mode 100644 index 0000000..0787fca --- /dev/null +++ b/util.go @@ -0,0 +1,113 @@ + +package rtmp + +import ( + "io" + "log" + "os" + "fmt" + "strings" +) + +type dummyWriter struct { +} + +func (m *dummyWriter) Write(p []byte) (n int, err error) { + return 0, nil +} + +type logger int + +func (l logger) Printf(format string, v ...interface{}) { + str := fmt.Sprintf(format, v...) + switch { + case strings.HasPrefix(str, "server") && l >= 0, + strings.HasPrefix(str, "stream") && l >= 0, + strings.HasPrefix(str, "event") && l >= 0, + strings.HasPrefix(str, "data") && l >= 0, + strings.HasPrefix(str, "msg") && l >= 1: + l2.Println(str) + } +} + +var ( + dummyW = &dummyWriter{} + l = logger(0) + l2 *log.Logger +) + +func init() { + l2 = log.New(os.Stderr, "", 0) + l2.SetFlags(log.Lmicroseconds) +} + +type stream struct { + r io.ReadWriteCloser +} + +func (s stream) Read(p []byte) (n int, err error) { + n, err = s.r.Read(p) + if err != nil { + panic(err) + } + return +} + +func (s stream) Write(p []byte) (n int, err error) { + n, err = s.r.Write(p) + if err != nil { + panic(err) + } + return +} + +func (s stream) Close() { + s.r.Close() +} + +func ReadBuf(r io.Reader, n int) (b []byte) { + b = make([]byte, n) + r.Read(b) + return +} + +func ReadInt(r io.Reader, n int) (ret int) { + b := ReadBuf(r, n) + for i := 0; i < n; i++ { + ret <<= 8 + ret += int(b[i]) + } + return +} + +func ReadIntLE(r io.Reader, n int) (ret int) { + b := ReadBuf(r, n) + for i := 0; i < n; i++ { + ret <<= 8 + ret += int(b[n-i-1]) + } + return +} + +func WriteBuf(w io.Writer, buf []byte) { + w.Write(buf) +} + +func WriteInt(w io.Writer, v int, n int) { + b := make([]byte, n) + for i := 0; i < n; i++ { + b[n-i-1] = byte(v&0xff) + v >>= 8 + } + WriteBuf(w, b) +} + +func WriteIntLE(w io.Writer, v int, n int) { + b := make([]byte, n) + for i := 0; i < n; i++ { + b[i] = byte(v&0xff) + v >>= 8 + } + WriteBuf(w, b) +} +