diff --git a/format/rtmp/rtmp.go b/format/rtmp/rtmp.go index 2dd5165..3450aaf 100644 --- a/format/rtmp/rtmp.go +++ b/format/rtmp/rtmp.go @@ -303,6 +303,7 @@ func NewConn(netconn net.Conn) *Conn { conn.readcsmap = make(map[uint32]*chunkStream) conn.readMaxChunkSize = 128 conn.writeMaxChunkSize = 128 + conn.readAckSize = 1048576 conn.txrxcount = &txrxcount{ReadWriter: netconn} conn.bufr = bufio.NewReaderSize(conn.txrxcount, pio.RecommendBufioSize) conn.bufw = bufio.NewWriterSize(conn.txrxcount, pio.RecommendBufioSize) @@ -319,6 +320,7 @@ type chunkStream struct { gentimenow bool timedelta uint32 hastimeext bool + timeext uint32 msgsid uint32 msgtypeid uint8 msgdatalen uint32 @@ -791,7 +793,7 @@ func (conn *Conn) writeConnect(path string) (err error) { } else { if conn.msgtypeid == msgtypeidWindowAckSize { if len(conn.msgdata) == 4 { - conn.readAckSize = pio.U32BE(conn.msgdata) + conn.readAckSize = pio.U32BE(conn.msgdata) >> 1 } //if err = self.writeWindowAckSize(0xffffffff); err != nil { // return @@ -1375,10 +1377,17 @@ func (conn *Conn) readChunk() (err error) { csid = uint32(pio.U16BE(b)) + 64 } + newcs := false cs := conn.readcsmap[csid] if cs == nil { cs = &chunkStream{} conn.readcsmap[csid] = cs + newcs = true + } + + if len(conn.readcsmap) > 16 { + err = fmt.Errorf("too many chunk stream ids") + return } var timestamp uint32 @@ -1419,6 +1428,7 @@ func (conn *Conn) readChunk() (err error) { n += 4 timestamp = pio.U32BE(b) cs.hastimeext = true + cs.timeext = timestamp } else { cs.hastimeext = false } @@ -1435,6 +1445,10 @@ func (conn *Conn) readChunk() (err error) { // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ // // Figure 10 Chunk Message Header – Type 1 + if newcs { + err = fmt.Errorf("chunk message type 1 without previous chunk") + return + } if cs.msgdataleft != 0 { if !conn.skipInvalidMessages { err = fmt.Errorf("chunk msgdataleft=%d invalid", cs.msgdataleft) @@ -1457,6 +1471,7 @@ func (conn *Conn) readChunk() (err error) { n += 4 timestamp = pio.U32BE(b) cs.hastimeext = true + cs.timeext = timestamp } else { cs.hastimeext = false } @@ -1472,6 +1487,10 @@ func (conn *Conn) readChunk() (err error) { // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ // // Figure 11 Chunk Message Header – Type 2 + if newcs { + err = fmt.Errorf("chunk message type 2 without previous chunk") + return + } if cs.msgdataleft != 0 { if !conn.skipInvalidMessages { err = fmt.Errorf("chunk msgdataleft=%d invalid", cs.msgdataleft) @@ -1492,6 +1511,7 @@ func (conn *Conn) readChunk() (err error) { n += 4 timestamp = pio.U32BE(b) cs.hastimeext = true + cs.timeext = timestamp } else { cs.hastimeext = false } @@ -1500,6 +1520,11 @@ func (conn *Conn) readChunk() (err error) { cs.Start() case 3: + if newcs { + err = fmt.Errorf("chunk message type 3 without previous chunk") + return + } + if cs.msgdataleft == 0 { switch cs.msghdrtype { case 0: @@ -1510,6 +1535,7 @@ func (conn *Conn) readChunk() (err error) { n += 4 timestamp = pio.U32BE(b) cs.timenow = timestamp + cs.timeext = timestamp } case 1, 2: if cs.hastimeext { @@ -1524,6 +1550,18 @@ func (conn *Conn) readChunk() (err error) { cs.timenow += timestamp } cs.Start() + } else { + if cs.hastimeext { + var b []byte + if b, err = conn.bufr.Peek(4); err != nil { + return + } + if pio.U32BE(b) == cs.timeext { + if _, err = io.ReadFull(conn.bufr, b[:4]); err != nil { + return + } + } + } } default: @@ -1545,12 +1583,12 @@ func (conn *Conn) readChunk() (err error) { if Debug { fmt.Printf("rtmp: chunk msgsid=%d msgtypeid=%d msghdrtype=%d len=%d left=%d max=%d", - cs.msgsid, cs.msgtypeid, cs.msghdrtype, cs.msgdatalen, cs.msgdataleft, conn.readMaxChunkSize) + cs.msgsid, cs.msgtypeid, msghdrtype, cs.msgdatalen, cs.msgdataleft, conn.readMaxChunkSize) } if conn.debugChunks { - data := fmt.Sprintf("rtmp: chunk id=%d msgsid=%d msgtypeid=%d msghdrtype=%d timestamp=%d len=%d left=%d max=%d", - csid, cs.msgsid, cs.msgtypeid, cs.msghdrtype, cs.timenow, cs.msgdatalen, cs.msgdataleft, conn.readMaxChunkSize) + data := fmt.Sprintf("rtmp: chunk id=%d msgsid=%d msgtypeid=%d msghdrtype=%d timestamp=%d ext=%v len=%d left=%d max=%d", + csid, cs.msgsid, cs.msgtypeid, msghdrtype, cs.timenow, cs.hastimeext, cs.msgdatalen, cs.msgdataleft, conn.readMaxChunkSize) if cs.msgtypeid != msgtypeidVideoMsg && cs.msgtypeid != msgtypeidAudioMsg { if len(cs.msgdata) > 1024 { @@ -1603,6 +1641,8 @@ func (conn *Conn) readChunk() (err error) { if err = conn.handleMsg(timestamp, cs.msgsid, cs.msgtypeid, cs.msgdata); err != nil { return fmt.Errorf("handleMsg: %w", err) } + + cs.msgdata = nil } conn.ackn += uint32(n) @@ -1773,7 +1813,7 @@ func (conn *Conn) handleMsg(timestamp uint32, msgsid uint32, msgtypeid uint8, ms if len(conn.msgdata) != 4 { return fmt.Errorf("invalid packet of WindowAckSize") } - conn.readAckSize = pio.U32BE(conn.msgdata) + conn.readAckSize = pio.U32BE(conn.msgdata) >> 1 default: if Debug { fmt.Printf("rtmp: unhandled msg: %d\n", msgtypeid)