Fix missing chunk type 3 ext timestamp handling, add checks for valid sources, fix readAckSize

This commit is contained in:
Ingo Oppermann 2024-05-28 14:18:17 +02:00
parent c20880d59a
commit 41276a835f

View File

@ -303,6 +303,7 @@ func NewConn(netconn net.Conn) *Conn {
conn.readcsmap = make(map[uint32]*chunkStream) conn.readcsmap = make(map[uint32]*chunkStream)
conn.readMaxChunkSize = 128 conn.readMaxChunkSize = 128
conn.writeMaxChunkSize = 128 conn.writeMaxChunkSize = 128
conn.readAckSize = 1048576
conn.txrxcount = &txrxcount{ReadWriter: netconn} conn.txrxcount = &txrxcount{ReadWriter: netconn}
conn.bufr = bufio.NewReaderSize(conn.txrxcount, pio.RecommendBufioSize) conn.bufr = bufio.NewReaderSize(conn.txrxcount, pio.RecommendBufioSize)
conn.bufw = bufio.NewWriterSize(conn.txrxcount, pio.RecommendBufioSize) conn.bufw = bufio.NewWriterSize(conn.txrxcount, pio.RecommendBufioSize)
@ -319,6 +320,7 @@ type chunkStream struct {
gentimenow bool gentimenow bool
timedelta uint32 timedelta uint32
hastimeext bool hastimeext bool
timeext uint32
msgsid uint32 msgsid uint32
msgtypeid uint8 msgtypeid uint8
msgdatalen uint32 msgdatalen uint32
@ -791,7 +793,7 @@ func (conn *Conn) writeConnect(path string) (err error) {
} else { } else {
if conn.msgtypeid == msgtypeidWindowAckSize { if conn.msgtypeid == msgtypeidWindowAckSize {
if len(conn.msgdata) == 4 { if len(conn.msgdata) == 4 {
conn.readAckSize = pio.U32BE(conn.msgdata) conn.readAckSize = pio.U32BE(conn.msgdata) >> 1
} }
//if err = self.writeWindowAckSize(0xffffffff); err != nil { //if err = self.writeWindowAckSize(0xffffffff); err != nil {
// return // return
@ -1375,10 +1377,17 @@ func (conn *Conn) readChunk() (err error) {
csid = uint32(pio.U16BE(b)) + 64 csid = uint32(pio.U16BE(b)) + 64
} }
newcs := false
cs := conn.readcsmap[csid] cs := conn.readcsmap[csid]
if cs == nil { if cs == nil {
cs = &chunkStream{} cs = &chunkStream{}
conn.readcsmap[csid] = cs conn.readcsmap[csid] = cs
newcs = true
}
if len(conn.readcsmap) > 16 {
err = fmt.Errorf("too many chunk stream ids")
return
} }
var timestamp uint32 var timestamp uint32
@ -1419,6 +1428,7 @@ func (conn *Conn) readChunk() (err error) {
n += 4 n += 4
timestamp = pio.U32BE(b) timestamp = pio.U32BE(b)
cs.hastimeext = true cs.hastimeext = true
cs.timeext = timestamp
} else { } else {
cs.hastimeext = false cs.hastimeext = false
} }
@ -1435,6 +1445,10 @@ func (conn *Conn) readChunk() (err error) {
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// //
// Figure 10 Chunk Message Header Type 1 // Figure 10 Chunk Message Header Type 1
if newcs {
err = fmt.Errorf("chunk message type 1 without previous chunk")
return
}
if cs.msgdataleft != 0 { if cs.msgdataleft != 0 {
if !conn.skipInvalidMessages { if !conn.skipInvalidMessages {
err = fmt.Errorf("chunk msgdataleft=%d invalid", cs.msgdataleft) err = fmt.Errorf("chunk msgdataleft=%d invalid", cs.msgdataleft)
@ -1457,6 +1471,7 @@ func (conn *Conn) readChunk() (err error) {
n += 4 n += 4
timestamp = pio.U32BE(b) timestamp = pio.U32BE(b)
cs.hastimeext = true cs.hastimeext = true
cs.timeext = timestamp
} else { } else {
cs.hastimeext = false cs.hastimeext = false
} }
@ -1472,6 +1487,10 @@ func (conn *Conn) readChunk() (err error) {
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// //
// Figure 11 Chunk Message Header Type 2 // Figure 11 Chunk Message Header Type 2
if newcs {
err = fmt.Errorf("chunk message type 2 without previous chunk")
return
}
if cs.msgdataleft != 0 { if cs.msgdataleft != 0 {
if !conn.skipInvalidMessages { if !conn.skipInvalidMessages {
err = fmt.Errorf("chunk msgdataleft=%d invalid", cs.msgdataleft) err = fmt.Errorf("chunk msgdataleft=%d invalid", cs.msgdataleft)
@ -1492,6 +1511,7 @@ func (conn *Conn) readChunk() (err error) {
n += 4 n += 4
timestamp = pio.U32BE(b) timestamp = pio.U32BE(b)
cs.hastimeext = true cs.hastimeext = true
cs.timeext = timestamp
} else { } else {
cs.hastimeext = false cs.hastimeext = false
} }
@ -1500,6 +1520,11 @@ func (conn *Conn) readChunk() (err error) {
cs.Start() cs.Start()
case 3: case 3:
if newcs {
err = fmt.Errorf("chunk message type 3 without previous chunk")
return
}
if cs.msgdataleft == 0 { if cs.msgdataleft == 0 {
switch cs.msghdrtype { switch cs.msghdrtype {
case 0: case 0:
@ -1510,6 +1535,7 @@ func (conn *Conn) readChunk() (err error) {
n += 4 n += 4
timestamp = pio.U32BE(b) timestamp = pio.U32BE(b)
cs.timenow = timestamp cs.timenow = timestamp
cs.timeext = timestamp
} }
case 1, 2: case 1, 2:
if cs.hastimeext { if cs.hastimeext {
@ -1524,6 +1550,18 @@ func (conn *Conn) readChunk() (err error) {
cs.timenow += timestamp cs.timenow += timestamp
} }
cs.Start() cs.Start()
} else {
if cs.hastimeext {
var b []byte
if b, err = conn.bufr.Peek(4); err != nil {
return
}
if pio.U32BE(b) == cs.timeext {
if _, err = io.ReadFull(conn.bufr, b[:4]); err != nil {
return
}
}
}
} }
default: default:
@ -1545,12 +1583,12 @@ func (conn *Conn) readChunk() (err error) {
if Debug { if Debug {
fmt.Printf("rtmp: chunk msgsid=%d msgtypeid=%d msghdrtype=%d len=%d left=%d max=%d", fmt.Printf("rtmp: chunk msgsid=%d msgtypeid=%d msghdrtype=%d len=%d left=%d max=%d",
cs.msgsid, cs.msgtypeid, cs.msghdrtype, cs.msgdatalen, cs.msgdataleft, conn.readMaxChunkSize) cs.msgsid, cs.msgtypeid, msghdrtype, cs.msgdatalen, cs.msgdataleft, conn.readMaxChunkSize)
} }
if conn.debugChunks { if conn.debugChunks {
data := fmt.Sprintf("rtmp: chunk id=%d msgsid=%d msgtypeid=%d msghdrtype=%d timestamp=%d len=%d left=%d max=%d", data := fmt.Sprintf("rtmp: chunk id=%d msgsid=%d msgtypeid=%d msghdrtype=%d timestamp=%d ext=%v len=%d left=%d max=%d",
csid, cs.msgsid, cs.msgtypeid, cs.msghdrtype, cs.timenow, cs.msgdatalen, cs.msgdataleft, conn.readMaxChunkSize) csid, cs.msgsid, cs.msgtypeid, msghdrtype, cs.timenow, cs.hastimeext, cs.msgdatalen, cs.msgdataleft, conn.readMaxChunkSize)
if cs.msgtypeid != msgtypeidVideoMsg && cs.msgtypeid != msgtypeidAudioMsg { if cs.msgtypeid != msgtypeidVideoMsg && cs.msgtypeid != msgtypeidAudioMsg {
if len(cs.msgdata) > 1024 { if len(cs.msgdata) > 1024 {
@ -1603,6 +1641,8 @@ func (conn *Conn) readChunk() (err error) {
if err = conn.handleMsg(timestamp, cs.msgsid, cs.msgtypeid, cs.msgdata); err != nil { if err = conn.handleMsg(timestamp, cs.msgsid, cs.msgtypeid, cs.msgdata); err != nil {
return fmt.Errorf("handleMsg: %w", err) return fmt.Errorf("handleMsg: %w", err)
} }
cs.msgdata = nil
} }
conn.ackn += uint32(n) conn.ackn += uint32(n)
@ -1773,7 +1813,7 @@ func (conn *Conn) handleMsg(timestamp uint32, msgsid uint32, msgtypeid uint8, ms
if len(conn.msgdata) != 4 { if len(conn.msgdata) != 4 {
return fmt.Errorf("invalid packet of WindowAckSize") return fmt.Errorf("invalid packet of WindowAckSize")
} }
conn.readAckSize = pio.U32BE(conn.msgdata) conn.readAckSize = pio.U32BE(conn.msgdata) >> 1
default: default:
if Debug { if Debug {
fmt.Printf("rtmp: unhandled msg: %d\n", msgtypeid) fmt.Printf("rtmp: unhandled msg: %d\n", msgtypeid)