add timeout control

This commit is contained in:
nareix 2016-06-11 22:34:15 +08:00
parent 32e3debf2e
commit 3443f71d54
2 changed files with 57 additions and 5 deletions

View File

@ -29,15 +29,15 @@ type Client struct {
Headers []string Headers []string
RtspTimeout time.Duration RtspTimeout time.Duration
RtpFirstReadTimeout time.Duration RtpTimeout time.Duration
RtpReadTimeout time.Duration
RtpKeepAliveTimeout time.Duration RtpKeepAliveTimeout time.Duration
rtpKeepaliveTimer time.Time
setupCalled bool setupCalled bool
playCalled bool playCalled bool
url *url.URL url *url.URL
conn net.Conn conn *connWithTimeout
rconn io.Reader rconn io.Reader
requestUri string requestUri string
cseq uint cseq uint
@ -84,9 +84,11 @@ func DialTimeout(uri string, timeout time.Duration) (self *Client, err error) {
u2 := *URL u2 := *URL
u2.User = nil u2.User = nil
connt := &connWithTimeout{Conn: conn}
self = &Client{ self = &Client{
conn: conn, conn: connt,
rconn: conn, rconn: connt,
url: URL, url: URL,
requestUri: u2.String(), requestUri: u2.String(),
} }
@ -112,7 +114,21 @@ func (self *Client) writeLine(line string) (err error) {
return return
} }
func (self *Client) sendRtpKeepalive() (err error) {
if self.RtpKeepAliveTimeout > 0 {
if !self.rtpKeepaliveTimer.IsZero() && time.Now().Sub(self.rtpKeepaliveTimer) > self.RtpKeepAliveTimeout {
if err = self.Options(); err != nil {
return
}
}
self.rtpKeepaliveTimer = time.Now()
}
return
}
func (self *Client) WriteRequest(req Request) (err error) { func (self *Client) WriteRequest(req Request) (err error) {
self.conn.Timeout = self.RtspTimeout
self.cseq++ self.cseq++
req.Header = append(req.Header, self.Headers...) req.Header = append(req.Header, self.Headers...)
req.Header = append(req.Header, fmt.Sprintf("CSeq: %d", self.cseq)) req.Header = append(req.Header, fmt.Sprintf("CSeq: %d", self.cseq))
@ -142,6 +158,8 @@ func (self *Client) ReadResponse() (res Response, err error) {
self.rconn = io.MultiReader(bytes.NewReader(buf), self.rconn) self.rconn = io.MultiReader(bytes.NewReader(buf), self.rconn)
} }
if res.StatusCode == 200 { if res.StatusCode == 200 {
self.conn.Timeout = self.RtspTimeout
if res.ContentLength > 0 { if res.ContentLength > 0 {
res.Body = make([]byte, res.ContentLength) res.Body = make([]byte, res.ContentLength)
if _, err = io.ReadFull(self.rconn, res.Body); err != nil { if _, err = io.ReadFull(self.rconn, res.Body); err != nil {
@ -149,6 +167,10 @@ func (self *Client) ReadResponse() (res Response, err error) {
} }
} }
} else if res.BlockLength > 0 { } else if res.BlockLength > 0 {
if err = self.sendRtpKeepalive(); err != nil {
return
}
self.conn.Timeout = self.RtpTimeout
res.Block = make([]byte, res.BlockLength) res.Block = make([]byte, res.BlockLength)
if _, err = io.ReadFull(self.rconn, res.Block); err != nil { if _, err = io.ReadFull(self.rconn, res.Block); err != nil {
return return
@ -156,6 +178,8 @@ func (self *Client) ReadResponse() (res Response, err error) {
} }
}() }()
self.conn.Timeout = self.RtspTimeout
var h [4]byte var h [4]byte
if _, err = io.ReadFull(self.rconn, h[:]); err != nil { if _, err = io.ReadFull(self.rconn, h[:]); err != nil {
return return
@ -173,6 +197,8 @@ func (self *Client) ReadResponse() (res Response, err error) {
// RTSP 200 OK // RTSP 200 OK
self.rconn = io.MultiReader(bytes.NewReader(h[:]), self.rconn) self.rconn = io.MultiReader(bytes.NewReader(h[:]), self.rconn)
} else { } else {
self.conn.Timeout = self.RtpTimeout
for { for {
for { for {
var b [1]byte var b [1]byte

26
conn.go Normal file
View File

@ -0,0 +1,26 @@
package rtsp
import (
"net"
"time"
)
type connWithTimeout struct {
Timeout time.Duration
net.Conn
}
func (self connWithTimeout) Read(p []byte) (n int, err error) {
if self.Timeout > 0 {
self.Conn.SetReadDeadline(time.Now().Add(self.Timeout))
}
return self.Conn.Read(p)
}
func (self connWithTimeout) Write(p []byte) (n int, err error) {
if self.Timeout > 0 {
self.Conn.SetWriteDeadline(time.Now().Add(self.Timeout))
}
return self.Conn.Write(p)
}