diff --git a/handshake.go b/handshake.go index d6ae30c..b7645ef 100644 --- a/handshake.go +++ b/handshake.go @@ -97,25 +97,25 @@ func createResp(b []byte, key []byte) { func parseChal(b []byte, peerKey []byte, key []byte) (dig []byte, err int) { if b[0] != 0x3 { - l.Printf("handshake: invalid rtmp version\n") + l.Printf("handshake: invalid rtmp version") err = 1 return } epoch := b[1:5] ver := b[5:9] - l.Printf("handshake: epoch %v ver %v\n", epoch, ver) + l.Printf("handshake: epoch %v ver %v", epoch, ver) var offs int if offs = findDigest(b[1:], peerKey, 772); offs == -1 { if offs = findDigest(b[1:], peerKey, 8); offs == -1 { - l.Printf("handshake: digest not found\n") + l.Printf("handshake: digest not found") err = 1 return } } - l.Printf("handshake: offs = %v\n", offs) + l.Printf("handshake: offs = %v", offs) dig = makeDigest(key, b[1+offs:1+offs+32], -1) return @@ -124,22 +124,22 @@ func parseChal(b []byte, peerKey []byte, key []byte) (dig []byte, err int) { func handShake(rw io.ReadWriter) { b := ReadBuf(rw, 1537) - l.Printf("handshake: got client chal\n") + l.Printf("handshake: got client chal") dig, err := parseChal(b, clientKey2, serverKey) if err != 0 { return } createChal(b, serverVersion, serverKey2) - l.Printf("handshake: send server chal\n") + l.Printf("handshake: send server chal") rw.Write(b) b = make([]byte, 1536) createResp(b, dig) - l.Printf("handshake: send server resp\n") + l.Printf("handshake: send server resp") rw.Write(b) b = ReadBuf(rw, 1536) - l.Printf("handshake: got client resp\n") + l.Printf("handshake: got client resp") } diff --git a/msg.go b/msg.go index 5e8a457..011bdc6 100644 --- a/msg.go +++ b/msg.go @@ -111,6 +111,7 @@ type MsgStream struct { Msg map[int]*Msg vts, ats int + meta AMFObj id string role int stat int diff --git a/server.go b/server.go index f77c645..ad503d0 100644 --- a/server.go +++ b/server.go @@ -12,6 +12,7 @@ import ( "log" "time" "strings" + _ "runtime/debug" ) var ( @@ -36,7 +37,12 @@ func (e eventS) String() string { case E_PLAY: return "play" case E_DATA: - return fmt.Sprintf("data %d bytes ts %d", e.m.data.Len(), e.m.curts) + switch e.m.typeid { + case MSG_VIDEO: + return fmt.Sprintf("ts %d video %d bytes key %t", e.m.curts, e.m.data.Len(), e.m.key) + case MSG_AUDIO: + return fmt.Sprintf("ts %d audio %d bytes", e.m.curts, e.m.data.Len()) + } case E_CLOSE: return "close" } @@ -63,7 +69,6 @@ const ( E_CLOSE ) - func handleConnect(mr *MsgStream, trans float64, app string) { l.Printf("stream %v: connect: %s", mr, app) @@ -96,6 +101,7 @@ func handleConnect(mr *MsgStream, trans float64, app string) { func handleMeta(mr *MsgStream, obj AMFObj) { + mr.meta = obj mr.W = int(obj.obj["width"].f64) mr.H = int(obj.obj["height"].f64) @@ -195,71 +201,118 @@ func handlePlay(mr *MsgStream, strid int) { l.Printf("stream %v: test video %dx%d", mr, mr.W, mr.H) } - var b bytes.Buffer - WriteInt(&b, 0, 2) - WriteInt(&b, strid, 4) - mr.WriteMsg(0, 2, MSG_USER, 0, 0, b.Bytes()) // stream begin 1 + begin := func () { - mr.WriteAMFCmd(5, strid, []AMFObj { - AMFObj { atype : AMF_STRING, str : "onStatus", }, - AMFObj { atype : AMF_NUMBER, f64 : 0, }, - AMFObj { atype : AMF_NULL, }, - AMFObj { atype : AMF_OBJECT, - obj : map[string] AMFObj { - "level" : AMFObj { atype : AMF_STRING, str : "status", }, - "code" : AMFObj { atype : AMF_STRING, str : "NetStream.Play.Start", }, - "description" : AMFObj { atype : AMF_STRING, str : "Start live.", }, + var b bytes.Buffer + WriteInt(&b, 0, 2) + WriteInt(&b, strid, 4) + mr.WriteMsg(0, 2, MSG_USER, 0, 0, b.Bytes()) // stream begin 1 + + mr.WriteAMFCmd(5, strid, []AMFObj { + AMFObj { atype : AMF_STRING, str : "onStatus", }, + AMFObj { atype : AMF_NUMBER, f64 : 0, }, + AMFObj { atype : AMF_NULL, }, + AMFObj { atype : AMF_OBJECT, + obj : map[string] AMFObj { + "level" : AMFObj { atype : AMF_STRING, str : "status", }, + "code" : AMFObj { atype : AMF_STRING, str : "NetStream.Play.Start", }, + "description" : AMFObj { atype : AMF_STRING, str : "Start live.", }, + }, }, - }, - }) + }) - l.Printf("stream %v: video size %dx%d", mr, mr.W, mr.H) + l.Printf("stream %v: begin: video %dx%d", mr, mr.W, mr.H) - mr.WriteAMFMeta(5, strid, []AMFObj { - AMFObj { atype : AMF_STRING, str : "|RtmpSampleAccess", }, - AMFObj { atype : AMF_BOOLEAN, i: 1, }, - AMFObj { atype : AMF_BOOLEAN, i: 1, }, - }) + mr.WriteAMFMeta(5, strid, []AMFObj { + AMFObj { atype : AMF_STRING, str : "|RtmpSampleAccess", }, + AMFObj { atype : AMF_BOOLEAN, i: 1, }, + AMFObj { atype : AMF_BOOLEAN, i: 1, }, + }) - mr.WriteAMFMeta(5, strid, []AMFObj { - AMFObj { atype : AMF_STRING, str : "onMetaData", }, - AMFObj { atype : AMF_OBJECT, - obj : map[string] AMFObj { - "Server" : AMFObj { atype : AMF_STRING, str : "Golang Rtmp Server", }, - "width" : AMFObj { atype : AMF_NUMBER, f64 : float64(mr.W), }, - "height" : AMFObj { atype : AMF_NUMBER, f64 : float64(mr.H), }, - "displayWidth" : AMFObj { atype : AMF_NUMBER, f64 : float64(mr.W), }, - "displayHeight" : AMFObj { atype : AMF_NUMBER, f64 : float64(mr.H), }, - "duration" : AMFObj { atype : AMF_NUMBER, f64 : 0, }, - "framerate" : AMFObj { atype : AMF_NUMBER, f64 : 25000, }, - "videodatarate" : AMFObj { atype : AMF_NUMBER, f64 : 731, }, - "videocodecid" : AMFObj { atype : AMF_NUMBER, f64 : 7, }, - "audiodatarate" : AMFObj { atype : AMF_NUMBER, f64 : 122, }, - "audiocodecid" : AMFObj { atype : AMF_NUMBER, f64 : 10, }, + mr.meta.obj["Server"] = AMFObj { atype : AMF_STRING, str : "Golang Rtmp Server", } + mr.meta.atype = AMF_OBJECT + l.Printf("stream %v: %v", mr, mr.meta) + mr.WriteAMFMeta(5, strid, []AMFObj { + AMFObj { atype : AMF_STRING, str : "onMetaData", }, + mr.meta, + /* + AMFObj { atype : AMF_OBJECT, + obj : map[string] AMFObj { + "Server" : AMFObj { atype : AMF_STRING, str : "Golang Rtmp Server", }, + "width" : AMFObj { atype : AMF_NUMBER, f64 : float64(mr.W), }, + "height" : AMFObj { atype : AMF_NUMBER, f64 : float64(mr.H), }, + "displayWidth" : AMFObj { atype : AMF_NUMBER, f64 : float64(mr.W), }, + "displayHeight" : AMFObj { atype : AMF_NUMBER, f64 : float64(mr.H), }, + "duration" : AMFObj { atype : AMF_NUMBER, f64 : 0, }, + "framerate" : AMFObj { atype : AMF_NUMBER, f64 : 25000, }, + "videodatarate" : AMFObj { atype : AMF_NUMBER, f64 : 731, }, + "videocodecid" : AMFObj { atype : AMF_NUMBER, f64 : 7, }, + "audiodatarate" : AMFObj { atype : AMF_NUMBER, f64 : 122, }, + "audiocodecid" : AMFObj { atype : AMF_NUMBER, f64 : 10, }, + }, }, - }, - }) + */ + }) + } + + end := func () { + + l.Printf("stream %v: end", mr) + + var b bytes.Buffer + WriteInt(&b, 1, 2) + WriteInt(&b, strid, 4) + mr.WriteMsg(0, 2, MSG_USER, 0, 0, b.Bytes()) // stream eof 1 + + mr.WriteAMFCmd(5, strid, []AMFObj { + AMFObj { atype : AMF_STRING, str : "onStatus", }, + AMFObj { atype : AMF_NUMBER, f64 : 0, }, + AMFObj { atype : AMF_NULL, }, + AMFObj { atype : AMF_OBJECT, + obj : map[string] AMFObj { + "level" : AMFObj { atype : AMF_STRING, str : "status", }, + "code" : AMFObj { atype : AMF_STRING, str : "NetStream.Play.Stop", }, + "description" : AMFObj { atype : AMF_STRING, str : "Stop live.", }, + }, + }, + }) + } if tsrc == nil { - l.Printf("stream %v: extra size %d %d", mr, len(mr.extraA), len(mr.extraV)) - - mr.WriteAAC(strid, 0, mr.extraA[2:]) - mr.WritePPS(strid, 0, mr.extraV[5:]) - - l.Printf("stream %v: player wait data", mr) for { - m := <-mr.que - l.Printf("data %v: got %v", mr, m) - switch m.typeid { - case MSG_AUDIO: - mr.WriteAudio(strid, m.curts, m.data.Bytes()[2:]) - case MSG_VIDEO: - mr.WriteVideo(strid, m.curts, m.key, m.data.Bytes()[5:]) + nr := 0 + + for { + m := <-mr.que + if m == nil { + break + } + //if nr == 0 && !m.key { + // continue + //} + if nr == 0 { + begin() + l.Printf("stream %v: extra size %d %d", mr, len(mr.extraA), len(mr.extraV)) + mr.WriteAAC(strid, 0, mr.extraA[2:]) + mr.WritePPS(strid, 0, mr.extraV[5:]) + } + l.Printf("data %v: got %v curts %v", mr, m, m.curts) + switch m.typeid { + case MSG_AUDIO: + mr.WriteAudio(strid, m.curts, m.data.Bytes()[2:]) + case MSG_VIDEO: + mr.WriteVideo(strid, m.curts, m.key, m.data.Bytes()[5:]) + } + nr++ } + end() } + } else { + begin() + lf, _ := os.Create("/tmp/rtmp.log") ll := log.New(lf, "", 0) @@ -304,6 +357,9 @@ func serve(mr *MsgStream) { event <- eventS{id:E_CLOSE, mr:mr} <-eventDone l.Printf("stream %v: closed %v", mr, err) + //if err != "EOF" { + // l.Printf("stream %v: %v", mr, string(debug.Stack())) + //} } }() @@ -377,21 +433,27 @@ func listenEvent() { pubmap[e.mr.app] = e.mr } case e.id == E_PLAY: - src, ok := pubmap[e.mr.app] - if !ok || src.stat != WAIT_DATA { - l.Printf("event %v: cannot find publisher with app %s", e.mr, e.mr.app) - e.mr.Close() - } else { - e.mr.W = src.W - e.mr.H = src.H - e.mr.role = PLAYER - e.mr.extraA = src.extraA - e.mr.extraV = src.extraV - e.mr.que = make(chan *Msg, 16) + e.mr.role = PLAYER + e.mr.que = make(chan *Msg, 16) + for _, mr := range idmap { + if mr.role == PUBLISHER && mr.app == e.mr.app && mr.stat == WAIT_DATA { + e.mr.W = mr.W + e.mr.H = mr.H + e.mr.extraA = mr.extraA + e.mr.extraV = mr.extraV + e.mr.meta = mr.meta + } } case e.id == E_CLOSE: if e.mr.role == PUBLISHER { delete(pubmap, e.mr.app) + for _, mr := range idmap { + if mr.role == PLAYER && mr.app == e.mr.app { + ch := reflect.ValueOf(mr.que) + var m *Msg = nil + ch.TrySend(reflect.ValueOf(m)) + } + } } delete(idmap, e.mr.id) case e.id == E_DATA && e.mr.stat == WAIT_EXTRA: @@ -406,6 +468,15 @@ func listenEvent() { if len(e.mr.extraA) > 0 && len(e.mr.extraV) > 0 { l.Printf("event %v: got all extra", e.mr) e.mr.stat = WAIT_DATA + for _, mr := range idmap { + if mr.role == PLAYER && mr.app == e.mr.app { + mr.W = e.mr.W + mr.H = e.mr.H + mr.extraA = e.mr.extraA + mr.extraV = e.mr.extraV + mr.meta = e.mr.meta + } + } } case e.id == E_DATA && e.mr.stat == WAIT_DATA: for _, mr := range idmap { diff --git a/util.go b/util.go index 0787fca..d22e617 100644 --- a/util.go +++ b/util.go @@ -9,29 +9,25 @@ import ( "strings" ) -type dummyWriter struct { -} - -func (m *dummyWriter) Write(p []byte) (n int, err error) { - return 0, nil -} - type logger int func (l logger) Printf(format string, v ...interface{}) { str := fmt.Sprintf(format, v...) switch { - case strings.HasPrefix(str, "server") && l >= 0, - strings.HasPrefix(str, "stream") && l >= 0, - strings.HasPrefix(str, "event") && l >= 0, - strings.HasPrefix(str, "data") && l >= 0, - strings.HasPrefix(str, "msg") && l >= 1: + case strings.HasPrefix(str, "server") && l >= 1, + strings.HasPrefix(str, "stream") && l >= 1, + strings.HasPrefix(str, "event") && l >= 1, + strings.HasPrefix(str, "data") && l >= 1, + strings.HasPrefix(str, "msg") && l >= 2: l2.Println(str) + default: + if l >= 1 { + l2.Println(str) + } } } var ( - dummyW = &dummyWriter{} l = logger(0) l2 *log.Logger ) @@ -41,6 +37,10 @@ func init() { l2.SetFlags(log.Lmicroseconds) } +func LogLevel(i int) { + l = logger(i) +} + type stream struct { r io.ReadWriteCloser }