diff --git a/format/rtmp/rtmp.go b/format/rtmp/rtmp.go index 5370c30..beb4fed 100644 --- a/format/rtmp/rtmp.go +++ b/format/rtmp/rtmp.go @@ -430,19 +430,21 @@ var CodecTypes = flv.CodecTypes func (conn *Conn) writeBasicConf() (err error) { // > WindowAckSize - if err = conn.writeWindowAckSize(1024 * 1024 * 2); err != nil { + if err = conn.writeWindowAckSize(2500000, false); err != nil { return } // > SetPeerBandwidth - if err = conn.writeSetPeerBandwidth(1024*1024*2, 2); err != nil { + if err = conn.writeSetPeerBandwidth(2500000, 2, true); err != nil { return } - // > StreamBegin - if err = conn.writeStreamBegin(0); err != nil { - return + if conn.isserver { + // > StreamBegin + if err = conn.writeStreamBegin(0, true); err != nil { + return + } } // > SetChunkSize - if err = conn.writeSetChunkSize(1024 * 1024); err != nil { + if err = conn.writeSetChunkSize(1024*64, true); err != nil { return } return @@ -488,22 +490,32 @@ func (conn *Conn) readConnect() (err error) { return } + objectEncoding, ok := connectparams["objectEncoding"] + if !ok { + objectEncoding = 3 + } + // > _result("NetConnection.Connect.Success") - if err = conn.writeCommandMsg(3, 0, "_result", conn.commandtransid, + if err = conn.writeCommandMsg(false, 3, 0, "_result", conn.commandtransid, flvio.AMFMap{ - "fmtVer": "FMS/3,0,1,123", + "fmsVer": "FMS/3,0,1,123", "capabilities": 31, }, flvio.AMFMap{ "level": "status", "code": "NetConnection.Connect.Success", "description": "Connection succeeded.", - "objectEncoding": 3, + "objectEncoding": objectEncoding, }, ); err != nil { return } + // > onBWDone() + if err = conn.writeCommandMsg(true, 3, 0, "onBWDone", 0, nil, 8192); err != nil { + return + } + if err = conn.flushWrite(); err != nil { return } @@ -519,7 +531,7 @@ func (conn *Conn) readConnect() (err error) { case "createStream": conn.avmsgsid = uint32(1) // > _result(streamid) - if err = conn.writeCommandMsg(3, 0, "_result", conn.commandtransid, nil, conn.avmsgsid); err != nil { + if err = conn.writeCommandMsg(false, 3, 0, "_result", conn.commandtransid, nil, conn.avmsgsid); err != nil { return } if err = conn.flushWrite(); err != nil { @@ -544,7 +556,7 @@ func (conn *Conn) readConnect() (err error) { } // > onStatus() - if err = conn.writeCommandMsg(5, conn.avmsgsid, + if err = conn.writeCommandMsg(false, 5, conn.avmsgsid, "onStatus", conn.commandtransid, nil, flvio.AMFMap{ "level": "status", @@ -588,12 +600,12 @@ func (conn *Conn) readConnect() (err error) { playpath, _ := conn.commandparams[0].(string) // > streamBegin(streamid) - if err = conn.writeStreamBegin(conn.avmsgsid); err != nil { + if err = conn.writeStreamBegin(conn.avmsgsid, false); err != nil { return } // > onStatus() - if err = conn.writeCommandMsg(5, conn.avmsgsid, + if err = conn.writeCommandMsg(false, 5, conn.avmsgsid, "onStatus", conn.commandtransid, nil, flvio.AMFMap{ "level": "status", @@ -700,7 +712,7 @@ func (conn *Conn) writeConnect(path string) (err error) { if Debug { fmt.Printf("rtmp: > connect('%s') host=%s\n", path, conn.URL.Host) } - if err = conn.writeCommandMsg(3, 0, "connect", 1, + if err = conn.writeCommandMsg(false, 3, 0, "connect", 1, flvio.AMFMap{ "app": path, "flashVer": "MAC 22,0,0,192", @@ -766,7 +778,7 @@ func (conn *Conn) connectPublish() (err error) { if Debug { fmt.Printf("rtmp: > createStream()\n") } - if err = conn.writeCommandMsg(3, 0, "createStream", transid, nil); err != nil { + if err = conn.writeCommandMsg(false, 3, 0, "createStream", transid, nil); err != nil { return } transid++ @@ -796,7 +808,7 @@ func (conn *Conn) connectPublish() (err error) { if Debug { fmt.Printf("rtmp: > publish('%s')\n", publishpath) } - if err = conn.writeCommandMsg(8, conn.avmsgsid, "publish", transid, nil, publishpath); err != nil { + if err = conn.writeCommandMsg(false, 8, conn.avmsgsid, "publish", transid, nil, publishpath); err != nil { return } transid++ @@ -822,12 +834,12 @@ func (conn *Conn) connectPlay() (err error) { if Debug { fmt.Printf("rtmp: > createStream()\n") } - if err = conn.writeCommandMsg(3, 0, "createStream", 2, nil); err != nil { + if err = conn.writeCommandMsg(false, 3, 0, "createStream", 2, nil); err != nil { return } // > SetBufferLength 0,100ms - if err = conn.writeSetBufferLength(0, 100); err != nil { + if err = conn.writeSetBufferLength(0, 100, false); err != nil { return } @@ -856,7 +868,7 @@ func (conn *Conn) connectPlay() (err error) { if Debug { fmt.Printf("rtmp: > play('%s')\n", playpath) } - if err = conn.writeCommandMsg(8, conn.avmsgsid, "play", 0, nil, playpath); err != nil { + if err = conn.writeCommandMsg(false, 8, conn.avmsgsid, "play", 0, nil, playpath); err != nil { return } if err = conn.flushWrite(); err != nil { @@ -1030,37 +1042,49 @@ func (conn *Conn) tmpwbuf(n int) []byte { return conn.writebuf } -func (conn *Conn) writeSetChunkSize(size int) (err error) { +func (conn *Conn) writeSetChunkSize(size int, append bool) (err error) { conn.writeMaxChunkSize = size - b := conn.tmpwbuf(chunkHeaderLength + 4) - n := conn.fillChunkHeader(b, 2, 0, msgtypeidSetChunkSize, 0, 4) + var b []byte + var n int + + b = conn.tmpwbuf(chunkHeaderLength + 4) + n = conn.fillChunkHeader(append, b, 2, 0, msgtypeidSetChunkSize, 0, 4) pio.PutU32BE(b[n:], uint32(size)) n += 4 _, err = conn.bufw.Write(b[:n]) return } -func (conn *Conn) writeAck(seqnum uint32) (err error) { - b := conn.tmpwbuf(chunkHeaderLength + 4) - n := conn.fillChunkHeader(b, 2, 0, msgtypeidAck, 0, 4) +func (conn *Conn) writeAck(seqnum uint32, append bool) (err error) { + var b []byte + var n int + + b = conn.tmpwbuf(chunkHeaderLength + 4) + n = conn.fillChunkHeader(append, b, 2, 0, msgtypeidAck, 0, 4) pio.PutU32BE(b[n:], seqnum) n += 4 _, err = conn.bufw.Write(b[:n]) return } -func (conn *Conn) writeWindowAckSize(size uint32) (err error) { - b := conn.tmpwbuf(chunkHeaderLength + 4) - n := conn.fillChunkHeader(b, 2, 0, msgtypeidWindowAckSize, 0, 4) +func (conn *Conn) writeWindowAckSize(size uint32, append bool) (err error) { + var b []byte + var n int + + b = conn.tmpwbuf(chunkHeaderLength + 4) + n = conn.fillChunkHeader(append, b, 2, 0, msgtypeidWindowAckSize, 0, 4) pio.PutU32BE(b[n:], size) n += 4 _, err = conn.bufw.Write(b[:n]) return } -func (conn *Conn) writeSetPeerBandwidth(acksize uint32, limittype uint8) (err error) { - b := conn.tmpwbuf(chunkHeaderLength + 5) - n := conn.fillChunkHeader(b, 2, 0, msgtypeidSetPeerBandwidth, 0, 5) +func (conn *Conn) writeSetPeerBandwidth(acksize uint32, limittype uint8, append bool) (err error) { + var b []byte + var n int + + b = conn.tmpwbuf(chunkHeaderLength + 5) + n = conn.fillChunkHeader(append, b, 2, 0, msgtypeidSetPeerBandwidth, 0, 5) pio.PutU32BE(b[n:], acksize) n += 4 b[n] = limittype @@ -1069,22 +1093,22 @@ func (conn *Conn) writeSetPeerBandwidth(acksize uint32, limittype uint8) (err er return } -func (conn *Conn) writeCommandMsg(csid, msgsid uint32, args ...interface{}) (err error) { - return conn.writeAMF0Msg(msgtypeidCommandMsgAMF0, csid, msgsid, args...) +func (conn *Conn) writeCommandMsg(append bool, csid, msgsid uint32, args ...interface{}) (err error) { + return conn.writeAMF0Msg(append, msgtypeidCommandMsgAMF0, csid, msgsid, args...) } func (conn *Conn) writeDataMsg(csid, msgsid uint32, args ...interface{}) (err error) { - return conn.writeAMF0Msg(msgtypeidDataMsgAMF0, csid, msgsid, args...) + return conn.writeAMF0Msg(false, msgtypeidDataMsgAMF0, csid, msgsid, args...) } -func (conn *Conn) writeAMF0Msg(msgtypeid uint8, csid, msgsid uint32, args ...interface{}) (err error) { +func (conn *Conn) writeAMF0Msg(append bool, msgtypeid uint8, csid, msgsid uint32, args ...interface{}) (err error) { size := 0 for _, arg := range args { size += flvio.LenAMF0Val(arg) } b := conn.tmpwbuf(chunkHeaderLength + size) - n := conn.fillChunkHeader(b, csid, 0, msgtypeid, msgsid, size) + n := conn.fillChunkHeader(append, b, csid, 0, msgtypeid, msgsid, size) for _, arg := range args { n += flvio.FillAMF0Val(b[n:], arg) } @@ -1117,31 +1141,60 @@ func (conn *Conn) writeAVTag(tag flvio.Tag, ts int32) (err error) { b := conn.tmpwbuf(actualChunkHeaderLength + flvio.MaxTagSubHeaderLength) hdrlen := tag.FillHeader(b[actualChunkHeaderLength:]) - conn.fillChunkHeader(b, csid, ts, msgtypeid, conn.avmsgsid, hdrlen+len(data)) - n := hdrlen + actualChunkHeaderLength - - if n+len(data) > conn.writeMaxChunkSize { - if err = conn.writeSetChunkSize(n + len(data)); err != nil { - return - } - } + conn.fillChunkHeader(false, b, csid, ts, msgtypeid, conn.avmsgsid, hdrlen+len(data)) + n := actualChunkHeaderLength + hdrlen if _, err = conn.bufw.Write(b[:n]); err != nil { return } - if _, err = conn.bufw.Write(data); err != nil { + chunksize := conn.writeMaxChunkSize + msgdataleft := len(data) + + n = msgdataleft + if msgdataleft > chunksize-hdrlen { + n = chunksize - hdrlen + } + + if _, err = conn.bufw.Write(data[:n]); err != nil { return } + data = data[n:] + msgdataleft -= n + + for { + if msgdataleft == 0 { + break + } + + n = conn.fillChunkHeader3(b, csid, ts) + + if _, err = conn.bufw.Write(b[:n]); err != nil { + return + } + + n = msgdataleft + if msgdataleft > chunksize { + n = chunksize + } + + if _, err = conn.bufw.Write(data[:n]); err != nil { + return + } + + data = data[n:] + msgdataleft -= n + } + err = conn.bufw.Flush() return } -func (conn *Conn) writeStreamBegin(msgsid uint32) (err error) { +func (conn *Conn) writeStreamBegin(msgsid uint32, append bool) (err error) { b := conn.tmpwbuf(chunkHeaderLength + 6) - n := conn.fillChunkHeader(b, 2, 0, msgtypeidUserControl, 0, 6) + n := conn.fillChunkHeader(append, b, 2, 0, msgtypeidUserControl, 0, 6) pio.PutU16BE(b[n:], eventtypeStreamBegin) n += 2 pio.PutU32BE(b[n:], msgsid) @@ -1150,9 +1203,9 @@ func (conn *Conn) writeStreamBegin(msgsid uint32) (err error) { return } -func (conn *Conn) writeSetBufferLength(msgsid uint32, timestamp uint32) (err error) { +func (conn *Conn) writeSetBufferLength(msgsid uint32, timestamp uint32, append bool) (err error) { b := conn.tmpwbuf(chunkHeaderLength + 10) - n := conn.fillChunkHeader(b, 2, 0, msgtypeidUserControl, 0, 10) + n := conn.fillChunkHeader(append, b, 2, 0, msgtypeidUserControl, 0, 10) pio.PutU16BE(b[n:], eventtypeSetBufferLength) n += 2 pio.PutU32BE(b[n:], msgsid) @@ -1163,9 +1216,9 @@ func (conn *Conn) writeSetBufferLength(msgsid uint32, timestamp uint32) (err err return } -func (conn *Conn) writePingResponse(timestamp uint32) (err error) { +func (conn *Conn) writePingResponse(timestamp uint32, append bool) (err error) { b := conn.tmpwbuf(chunkHeaderLength + 10) - n := conn.fillChunkHeader(b, 2, 0, msgtypeidUserControl, 0, 6) + n := conn.fillChunkHeader(append, b, 2, 0, msgtypeidUserControl, 0, 6) pio.PutU16BE(b[n:], eventtypePingResponse) n += 2 pio.PutU32BE(b[n:], timestamp) @@ -1177,41 +1230,74 @@ func (conn *Conn) writePingResponse(timestamp uint32) (err error) { const chunkHeaderLength = 12 const FlvTimestampMax = 0xFFFFFF -func (conn *Conn) fillChunkHeader(b []byte, csid uint32, timestamp int32, msgtypeid uint8, msgsid uint32, msgdatalen int) (n int) { - // 0 1 2 3 - // 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 - // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - // | timestamp |message length | - // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - // | message length (cont) |message type id| msg stream id | - // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - // | message stream id (cont) | - // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - // - // Figure 9 Chunk Message Header – Type 0 +func (conn *Conn) fillChunkHeader(append bool, b []byte, csid uint32, timestamp int32, msgtypeid uint8, msgsid uint32, msgdatalen int) (n int) { + if !append { + // 0 1 2 3 + // 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + // | timestamp |message length | + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + // | message length (cont) |message type id| msg stream id | + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + // | message stream id (cont) | + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + // + // Figure 9 Chunk Message Header – Type 0 - b[n] = byte(csid) & 0x3f - n++ - if uint32(timestamp) <= FlvTimestampMax { - pio.PutU24BE(b[n:], uint32(timestamp)) - } else { - pio.PutU24BE(b[n:], FlvTimestampMax) - } - n += 3 - pio.PutU24BE(b[n:], uint32(msgdatalen)) - n += 3 - b[n] = msgtypeid - n++ - pio.PutU32LE(b[n:], msgsid) - n += 4 - if uint32(timestamp) > FlvTimestampMax { - pio.PutU32BE(b[n:], uint32(timestamp)) + b[n] = byte(csid) & 0x3f + n++ + if uint32(timestamp) <= FlvTimestampMax { + pio.PutU24BE(b[n:], uint32(timestamp)) + } else { + pio.PutU24BE(b[n:], FlvTimestampMax) + } + n += 3 + pio.PutU24BE(b[n:], uint32(msgdatalen)) + n += 3 + b[n] = msgtypeid + n++ + pio.PutU32LE(b[n:], msgsid) n += 4 + if uint32(timestamp) > FlvTimestampMax { + pio.PutU32BE(b[n:], uint32(timestamp)) + n += 4 + } + } else { + // 0 1 2 3 + // 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + // | timestamp delta |message length | + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + // | message length (cont) |message type id| + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + // + // Figure 10 Chunk Message Header – Type 1 + + b[n] = 1 << 6 + b[n] += byte(csid) & 0x3f + n++ + pio.PutU24BE(b[n:], 0) + n += 3 + pio.PutU24BE(b[n:], uint32(msgdatalen)) + n += 3 + b[n] = msgtypeid + n++ } if Debug { fmt.Printf("rtmp: write chunk msgdatalen=%d msgsid=%d\n", msgdatalen, msgsid) - fmt.Print(hex.Dump(b[:msgdatalen])) + fmt.Print(hex.Dump(b[:n+msgdatalen])) + } + + return +} + +func (c *Conn) fillChunkHeader3(b []byte, csid uint32, timestamp int32) (n int) { + pio.PutU8(b, (uint8(csid)&0x3f)|3<<6) + n++ + if timestamp >= FlvTimestampMax { + pio.PutU32BE(b, uint32(timestamp)) + n += 4 } return @@ -1436,7 +1522,7 @@ func (conn *Conn) readChunk() (err error) { conn.ackn += uint32(n) if conn.readAckSize != 0 && conn.ackn > conn.readAckSize { - if err = conn.writeAck(conn.ackn); err != nil { + if err = conn.writeAck(conn.ackn, false); err != nil { return fmt.Errorf("writeACK: %w", err) } conn.flushWrite() @@ -1522,7 +1608,7 @@ func (conn *Conn) handleMsg(timestamp uint32, msgsid uint32, msgtypeid uint8, ms return } pingtimestamp := pio.U32BE(msgdata[2:]) - conn.writePingResponse(pingtimestamp) + conn.writePingResponse(pingtimestamp, false) conn.flushWrite() } @@ -1597,13 +1683,12 @@ func (conn *Conn) handleMsg(timestamp uint32, msgsid uint32, msgtypeid uint8, ms return } conn.readMaxChunkSize = int(pio.U32BE(msgdata)) - return + fmt.Printf("receiving new chunksize: %d\n", conn.readMaxChunkSize) case msgtypeidWindowAckSize: if len(conn.msgdata) != 4 { return fmt.Errorf("invalid packet of WindowAckSize") } conn.readAckSize = pio.U32BE(conn.msgdata) - return default: if Debug { fmt.Printf("rtmp: unhandled msg: %d\n", msgtypeid)