Add chunked write, reduce initial chunk size for compatibility

This commit is contained in:
Ingo Oppermann 2024-02-28 21:15:50 +01:00
parent c3ec4804ba
commit 77ed436d0f

View File

@ -430,19 +430,21 @@ var CodecTypes = flv.CodecTypes
func (conn *Conn) writeBasicConf() (err error) {
// > WindowAckSize
if err = conn.writeWindowAckSize(1024 * 1024 * 2); err != nil {
if err = conn.writeWindowAckSize(2500000, false); err != nil {
return
}
// > SetPeerBandwidth
if err = conn.writeSetPeerBandwidth(1024*1024*2, 2); err != nil {
if err = conn.writeSetPeerBandwidth(2500000, 2, true); err != nil {
return
}
// > StreamBegin
if err = conn.writeStreamBegin(0); err != nil {
return
if conn.isserver {
// > StreamBegin
if err = conn.writeStreamBegin(0, true); err != nil {
return
}
}
// > SetChunkSize
if err = conn.writeSetChunkSize(1024 * 1024); err != nil {
if err = conn.writeSetChunkSize(1024*64, true); err != nil {
return
}
return
@ -488,22 +490,32 @@ func (conn *Conn) readConnect() (err error) {
return
}
objectEncoding, ok := connectparams["objectEncoding"]
if !ok {
objectEncoding = 3
}
// > _result("NetConnection.Connect.Success")
if err = conn.writeCommandMsg(3, 0, "_result", conn.commandtransid,
if err = conn.writeCommandMsg(false, 3, 0, "_result", conn.commandtransid,
flvio.AMFMap{
"fmtVer": "FMS/3,0,1,123",
"fmsVer": "FMS/3,0,1,123",
"capabilities": 31,
},
flvio.AMFMap{
"level": "status",
"code": "NetConnection.Connect.Success",
"description": "Connection succeeded.",
"objectEncoding": 3,
"objectEncoding": objectEncoding,
},
); err != nil {
return
}
// > onBWDone()
if err = conn.writeCommandMsg(true, 3, 0, "onBWDone", 0, nil, 8192); err != nil {
return
}
if err = conn.flushWrite(); err != nil {
return
}
@ -519,7 +531,7 @@ func (conn *Conn) readConnect() (err error) {
case "createStream":
conn.avmsgsid = uint32(1)
// > _result(streamid)
if err = conn.writeCommandMsg(3, 0, "_result", conn.commandtransid, nil, conn.avmsgsid); err != nil {
if err = conn.writeCommandMsg(false, 3, 0, "_result", conn.commandtransid, nil, conn.avmsgsid); err != nil {
return
}
if err = conn.flushWrite(); err != nil {
@ -544,7 +556,7 @@ func (conn *Conn) readConnect() (err error) {
}
// > onStatus()
if err = conn.writeCommandMsg(5, conn.avmsgsid,
if err = conn.writeCommandMsg(false, 5, conn.avmsgsid,
"onStatus", conn.commandtransid, nil,
flvio.AMFMap{
"level": "status",
@ -588,12 +600,12 @@ func (conn *Conn) readConnect() (err error) {
playpath, _ := conn.commandparams[0].(string)
// > streamBegin(streamid)
if err = conn.writeStreamBegin(conn.avmsgsid); err != nil {
if err = conn.writeStreamBegin(conn.avmsgsid, false); err != nil {
return
}
// > onStatus()
if err = conn.writeCommandMsg(5, conn.avmsgsid,
if err = conn.writeCommandMsg(false, 5, conn.avmsgsid,
"onStatus", conn.commandtransid, nil,
flvio.AMFMap{
"level": "status",
@ -700,7 +712,7 @@ func (conn *Conn) writeConnect(path string) (err error) {
if Debug {
fmt.Printf("rtmp: > connect('%s') host=%s\n", path, conn.URL.Host)
}
if err = conn.writeCommandMsg(3, 0, "connect", 1,
if err = conn.writeCommandMsg(false, 3, 0, "connect", 1,
flvio.AMFMap{
"app": path,
"flashVer": "MAC 22,0,0,192",
@ -766,7 +778,7 @@ func (conn *Conn) connectPublish() (err error) {
if Debug {
fmt.Printf("rtmp: > createStream()\n")
}
if err = conn.writeCommandMsg(3, 0, "createStream", transid, nil); err != nil {
if err = conn.writeCommandMsg(false, 3, 0, "createStream", transid, nil); err != nil {
return
}
transid++
@ -796,7 +808,7 @@ func (conn *Conn) connectPublish() (err error) {
if Debug {
fmt.Printf("rtmp: > publish('%s')\n", publishpath)
}
if err = conn.writeCommandMsg(8, conn.avmsgsid, "publish", transid, nil, publishpath); err != nil {
if err = conn.writeCommandMsg(false, 8, conn.avmsgsid, "publish", transid, nil, publishpath); err != nil {
return
}
transid++
@ -822,12 +834,12 @@ func (conn *Conn) connectPlay() (err error) {
if Debug {
fmt.Printf("rtmp: > createStream()\n")
}
if err = conn.writeCommandMsg(3, 0, "createStream", 2, nil); err != nil {
if err = conn.writeCommandMsg(false, 3, 0, "createStream", 2, nil); err != nil {
return
}
// > SetBufferLength 0,100ms
if err = conn.writeSetBufferLength(0, 100); err != nil {
if err = conn.writeSetBufferLength(0, 100, false); err != nil {
return
}
@ -856,7 +868,7 @@ func (conn *Conn) connectPlay() (err error) {
if Debug {
fmt.Printf("rtmp: > play('%s')\n", playpath)
}
if err = conn.writeCommandMsg(8, conn.avmsgsid, "play", 0, nil, playpath); err != nil {
if err = conn.writeCommandMsg(false, 8, conn.avmsgsid, "play", 0, nil, playpath); err != nil {
return
}
if err = conn.flushWrite(); err != nil {
@ -1030,37 +1042,49 @@ func (conn *Conn) tmpwbuf(n int) []byte {
return conn.writebuf
}
func (conn *Conn) writeSetChunkSize(size int) (err error) {
func (conn *Conn) writeSetChunkSize(size int, append bool) (err error) {
conn.writeMaxChunkSize = size
b := conn.tmpwbuf(chunkHeaderLength + 4)
n := conn.fillChunkHeader(b, 2, 0, msgtypeidSetChunkSize, 0, 4)
var b []byte
var n int
b = conn.tmpwbuf(chunkHeaderLength + 4)
n = conn.fillChunkHeader(append, b, 2, 0, msgtypeidSetChunkSize, 0, 4)
pio.PutU32BE(b[n:], uint32(size))
n += 4
_, err = conn.bufw.Write(b[:n])
return
}
func (conn *Conn) writeAck(seqnum uint32) (err error) {
b := conn.tmpwbuf(chunkHeaderLength + 4)
n := conn.fillChunkHeader(b, 2, 0, msgtypeidAck, 0, 4)
func (conn *Conn) writeAck(seqnum uint32, append bool) (err error) {
var b []byte
var n int
b = conn.tmpwbuf(chunkHeaderLength + 4)
n = conn.fillChunkHeader(append, b, 2, 0, msgtypeidAck, 0, 4)
pio.PutU32BE(b[n:], seqnum)
n += 4
_, err = conn.bufw.Write(b[:n])
return
}
func (conn *Conn) writeWindowAckSize(size uint32) (err error) {
b := conn.tmpwbuf(chunkHeaderLength + 4)
n := conn.fillChunkHeader(b, 2, 0, msgtypeidWindowAckSize, 0, 4)
func (conn *Conn) writeWindowAckSize(size uint32, append bool) (err error) {
var b []byte
var n int
b = conn.tmpwbuf(chunkHeaderLength + 4)
n = conn.fillChunkHeader(append, b, 2, 0, msgtypeidWindowAckSize, 0, 4)
pio.PutU32BE(b[n:], size)
n += 4
_, err = conn.bufw.Write(b[:n])
return
}
func (conn *Conn) writeSetPeerBandwidth(acksize uint32, limittype uint8) (err error) {
b := conn.tmpwbuf(chunkHeaderLength + 5)
n := conn.fillChunkHeader(b, 2, 0, msgtypeidSetPeerBandwidth, 0, 5)
func (conn *Conn) writeSetPeerBandwidth(acksize uint32, limittype uint8, append bool) (err error) {
var b []byte
var n int
b = conn.tmpwbuf(chunkHeaderLength + 5)
n = conn.fillChunkHeader(append, b, 2, 0, msgtypeidSetPeerBandwidth, 0, 5)
pio.PutU32BE(b[n:], acksize)
n += 4
b[n] = limittype
@ -1069,22 +1093,22 @@ func (conn *Conn) writeSetPeerBandwidth(acksize uint32, limittype uint8) (err er
return
}
func (conn *Conn) writeCommandMsg(csid, msgsid uint32, args ...interface{}) (err error) {
return conn.writeAMF0Msg(msgtypeidCommandMsgAMF0, csid, msgsid, args...)
func (conn *Conn) writeCommandMsg(append bool, csid, msgsid uint32, args ...interface{}) (err error) {
return conn.writeAMF0Msg(append, msgtypeidCommandMsgAMF0, csid, msgsid, args...)
}
func (conn *Conn) writeDataMsg(csid, msgsid uint32, args ...interface{}) (err error) {
return conn.writeAMF0Msg(msgtypeidDataMsgAMF0, csid, msgsid, args...)
return conn.writeAMF0Msg(false, msgtypeidDataMsgAMF0, csid, msgsid, args...)
}
func (conn *Conn) writeAMF0Msg(msgtypeid uint8, csid, msgsid uint32, args ...interface{}) (err error) {
func (conn *Conn) writeAMF0Msg(append bool, msgtypeid uint8, csid, msgsid uint32, args ...interface{}) (err error) {
size := 0
for _, arg := range args {
size += flvio.LenAMF0Val(arg)
}
b := conn.tmpwbuf(chunkHeaderLength + size)
n := conn.fillChunkHeader(b, csid, 0, msgtypeid, msgsid, size)
n := conn.fillChunkHeader(append, b, csid, 0, msgtypeid, msgsid, size)
for _, arg := range args {
n += flvio.FillAMF0Val(b[n:], arg)
}
@ -1117,31 +1141,60 @@ func (conn *Conn) writeAVTag(tag flvio.Tag, ts int32) (err error) {
b := conn.tmpwbuf(actualChunkHeaderLength + flvio.MaxTagSubHeaderLength)
hdrlen := tag.FillHeader(b[actualChunkHeaderLength:])
conn.fillChunkHeader(b, csid, ts, msgtypeid, conn.avmsgsid, hdrlen+len(data))
n := hdrlen + actualChunkHeaderLength
if n+len(data) > conn.writeMaxChunkSize {
if err = conn.writeSetChunkSize(n + len(data)); err != nil {
return
}
}
conn.fillChunkHeader(false, b, csid, ts, msgtypeid, conn.avmsgsid, hdrlen+len(data))
n := actualChunkHeaderLength + hdrlen
if _, err = conn.bufw.Write(b[:n]); err != nil {
return
}
if _, err = conn.bufw.Write(data); err != nil {
chunksize := conn.writeMaxChunkSize
msgdataleft := len(data)
n = msgdataleft
if msgdataleft > chunksize-hdrlen {
n = chunksize - hdrlen
}
if _, err = conn.bufw.Write(data[:n]); err != nil {
return
}
data = data[n:]
msgdataleft -= n
for {
if msgdataleft == 0 {
break
}
n = conn.fillChunkHeader3(b, csid, ts)
if _, err = conn.bufw.Write(b[:n]); err != nil {
return
}
n = msgdataleft
if msgdataleft > chunksize {
n = chunksize
}
if _, err = conn.bufw.Write(data[:n]); err != nil {
return
}
data = data[n:]
msgdataleft -= n
}
err = conn.bufw.Flush()
return
}
func (conn *Conn) writeStreamBegin(msgsid uint32) (err error) {
func (conn *Conn) writeStreamBegin(msgsid uint32, append bool) (err error) {
b := conn.tmpwbuf(chunkHeaderLength + 6)
n := conn.fillChunkHeader(b, 2, 0, msgtypeidUserControl, 0, 6)
n := conn.fillChunkHeader(append, b, 2, 0, msgtypeidUserControl, 0, 6)
pio.PutU16BE(b[n:], eventtypeStreamBegin)
n += 2
pio.PutU32BE(b[n:], msgsid)
@ -1150,9 +1203,9 @@ func (conn *Conn) writeStreamBegin(msgsid uint32) (err error) {
return
}
func (conn *Conn) writeSetBufferLength(msgsid uint32, timestamp uint32) (err error) {
func (conn *Conn) writeSetBufferLength(msgsid uint32, timestamp uint32, append bool) (err error) {
b := conn.tmpwbuf(chunkHeaderLength + 10)
n := conn.fillChunkHeader(b, 2, 0, msgtypeidUserControl, 0, 10)
n := conn.fillChunkHeader(append, b, 2, 0, msgtypeidUserControl, 0, 10)
pio.PutU16BE(b[n:], eventtypeSetBufferLength)
n += 2
pio.PutU32BE(b[n:], msgsid)
@ -1163,9 +1216,9 @@ func (conn *Conn) writeSetBufferLength(msgsid uint32, timestamp uint32) (err err
return
}
func (conn *Conn) writePingResponse(timestamp uint32) (err error) {
func (conn *Conn) writePingResponse(timestamp uint32, append bool) (err error) {
b := conn.tmpwbuf(chunkHeaderLength + 10)
n := conn.fillChunkHeader(b, 2, 0, msgtypeidUserControl, 0, 6)
n := conn.fillChunkHeader(append, b, 2, 0, msgtypeidUserControl, 0, 6)
pio.PutU16BE(b[n:], eventtypePingResponse)
n += 2
pio.PutU32BE(b[n:], timestamp)
@ -1177,41 +1230,74 @@ func (conn *Conn) writePingResponse(timestamp uint32) (err error) {
const chunkHeaderLength = 12
const FlvTimestampMax = 0xFFFFFF
func (conn *Conn) fillChunkHeader(b []byte, csid uint32, timestamp int32, msgtypeid uint8, msgsid uint32, msgdatalen int) (n int) {
// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | timestamp |message length |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | message length (cont) |message type id| msg stream id |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | message stream id (cont) |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
//
// Figure 9 Chunk Message Header Type 0
func (conn *Conn) fillChunkHeader(append bool, b []byte, csid uint32, timestamp int32, msgtypeid uint8, msgsid uint32, msgdatalen int) (n int) {
if !append {
// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | timestamp |message length |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | message length (cont) |message type id| msg stream id |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | message stream id (cont) |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
//
// Figure 9 Chunk Message Header Type 0
b[n] = byte(csid) & 0x3f
n++
if uint32(timestamp) <= FlvTimestampMax {
pio.PutU24BE(b[n:], uint32(timestamp))
} else {
pio.PutU24BE(b[n:], FlvTimestampMax)
}
n += 3
pio.PutU24BE(b[n:], uint32(msgdatalen))
n += 3
b[n] = msgtypeid
n++
pio.PutU32LE(b[n:], msgsid)
n += 4
if uint32(timestamp) > FlvTimestampMax {
pio.PutU32BE(b[n:], uint32(timestamp))
b[n] = byte(csid) & 0x3f
n++
if uint32(timestamp) <= FlvTimestampMax {
pio.PutU24BE(b[n:], uint32(timestamp))
} else {
pio.PutU24BE(b[n:], FlvTimestampMax)
}
n += 3
pio.PutU24BE(b[n:], uint32(msgdatalen))
n += 3
b[n] = msgtypeid
n++
pio.PutU32LE(b[n:], msgsid)
n += 4
if uint32(timestamp) > FlvTimestampMax {
pio.PutU32BE(b[n:], uint32(timestamp))
n += 4
}
} else {
// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | timestamp delta |message length |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | message length (cont) |message type id|
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
//
// Figure 10 Chunk Message Header Type 1
b[n] = 1 << 6
b[n] += byte(csid) & 0x3f
n++
pio.PutU24BE(b[n:], 0)
n += 3
pio.PutU24BE(b[n:], uint32(msgdatalen))
n += 3
b[n] = msgtypeid
n++
}
if Debug {
fmt.Printf("rtmp: write chunk msgdatalen=%d msgsid=%d\n", msgdatalen, msgsid)
fmt.Print(hex.Dump(b[:msgdatalen]))
fmt.Print(hex.Dump(b[:n+msgdatalen]))
}
return
}
func (c *Conn) fillChunkHeader3(b []byte, csid uint32, timestamp int32) (n int) {
pio.PutU8(b, (uint8(csid)&0x3f)|3<<6)
n++
if timestamp >= FlvTimestampMax {
pio.PutU32BE(b, uint32(timestamp))
n += 4
}
return
@ -1436,7 +1522,7 @@ func (conn *Conn) readChunk() (err error) {
conn.ackn += uint32(n)
if conn.readAckSize != 0 && conn.ackn > conn.readAckSize {
if err = conn.writeAck(conn.ackn); err != nil {
if err = conn.writeAck(conn.ackn, false); err != nil {
return fmt.Errorf("writeACK: %w", err)
}
conn.flushWrite()
@ -1522,7 +1608,7 @@ func (conn *Conn) handleMsg(timestamp uint32, msgsid uint32, msgtypeid uint8, ms
return
}
pingtimestamp := pio.U32BE(msgdata[2:])
conn.writePingResponse(pingtimestamp)
conn.writePingResponse(pingtimestamp, false)
conn.flushWrite()
}
@ -1597,13 +1683,12 @@ func (conn *Conn) handleMsg(timestamp uint32, msgsid uint32, msgtypeid uint8, ms
return
}
conn.readMaxChunkSize = int(pio.U32BE(msgdata))
return
fmt.Printf("receiving new chunksize: %d\n", conn.readMaxChunkSize)
case msgtypeidWindowAckSize:
if len(conn.msgdata) != 4 {
return fmt.Errorf("invalid packet of WindowAckSize")
}
conn.readAckSize = pio.U32BE(conn.msgdata)
return
default:
if Debug {
fmt.Printf("rtmp: unhandled msg: %d\n", msgtypeid)