bugfix Describe(), use pktqueue, add Open(), add Streams(), add relocate

block start $ sign
This commit is contained in:
nareix 2016-06-06 16:45:29 +08:00
parent ab6a068d45
commit fed739021e

132
client.go
View File

@ -15,7 +15,9 @@ import (
"crypto/md5" "crypto/md5"
"github.com/nareix/av" "github.com/nareix/av"
"github.com/nareix/codec/h264parser" "github.com/nareix/codec/h264parser"
"github.com/nareix/codec/aacparser"
"github.com/nareix/rtsp/sdp" "github.com/nareix/rtsp/sdp"
"github.com/nareix/av/pktqueue"
) )
type Client struct { type Client struct {
@ -29,6 +31,7 @@ type Client struct {
session string session string
authorization string authorization string
body io.Reader body io.Reader
pktque *pktqueue.Queue
} }
type Request struct { type Request struct {
@ -72,6 +75,13 @@ func Connect(uri string) (self *Client, err error) {
return return
} }
func (self *Client) Streams() (streams []av.CodecData) {
for _, stream := range self.streams {
streams = append(streams, stream.CodecData)
}
return
}
func (self *Client) writeLine(line string) (err error) { func (self *Client) writeLine(line string) (err error) {
if self.DebugConn { if self.DebugConn {
fmt.Print("> ", line) fmt.Print("> ", line)
@ -124,6 +134,7 @@ func (self *Client) ReadResponse() (res Response, err error) {
if _, err = io.ReadFull(self.rconn, h[:]); err != nil { if _, err = io.ReadFull(self.rconn, h[:]); err != nil {
return return
} }
if h[0] == 36 { if h[0] == 36 {
// $ // $
res.BlockLength = int(h[2])<<8+int(h[3]) res.BlockLength = int(h[2])<<8+int(h[3])
@ -132,11 +143,35 @@ func (self *Client) ReadResponse() (res Response, err error) {
fmt.Println("block: len", res.BlockLength, "no", res.BlockNo) fmt.Println("block: len", res.BlockLength, "no", res.BlockNo)
} }
return return
} else if h[0] == 82 { } else if h[0] == 82 && h[1] == 84 && h[2] == 83 && h[3] == 80 {
// RTSP 200 OK // RTSP 200 OK
self.rconn = io.MultiReader(bytes.NewReader(h[:]), self.rconn) self.rconn = io.MultiReader(bytes.NewReader(h[:]), self.rconn)
} else { } else {
err = fmt.Errorf("invalid response bytes=%v", h) for {
for {
var b [1]byte
if _, err = self.rconn.Read(b[:]); err != nil {
return
}
if b[0] == 36 {
break
}
}
if self.DebugConn {
fmt.Println("block: relocate")
}
if _, err = io.ReadFull(self.rconn, h[1:4]); err != nil {
return
}
res.BlockLength = int(h[2])<<8+int(h[3])
res.BlockNo = int(h[1])
if res.BlockNo/2 < len(self.streams) {
break
}
}
if self.DebugConn {
fmt.Println("block: len", res.BlockLength, "no", res.BlockNo)
}
return return
} }
@ -236,10 +271,14 @@ func (self *Client) ReadResponse() (res Response, err error) {
func (self *Client) Setup(streams []int) (err error) { func (self *Client) Setup(streams []int) (err error) {
for _, si := range streams { for _, si := range streams {
req := Request{ uri := ""
Method: "SETUP", control := self.streams[si].Sdp.Control
Uri: self.requestUri+"/"+self.streams[si].Sdp.Control, if strings.HasPrefix(control, "rtsp://") {
uri = control
} else {
uri = self.requestUri+"/"+control
} }
req := Request{Method: "SETUP", Uri: uri}
req.Header = append(req.Header, fmt.Sprintf("Transport: RTP/AVP/TCP;unicast;interleaved=%d-%d", si*2, si*2+1)) req.Header = append(req.Header, fmt.Sprintf("Transport: RTP/AVP/TCP;unicast;interleaved=%d-%d", si*2, si*2+1))
if self.session != "" { if self.session != "" {
req.Header = append(req.Header, "Session: "+self.session) req.Header = append(req.Header, "Session: "+self.session)
@ -259,7 +298,7 @@ func md5hash(s string) string {
return hex.EncodeToString(h[:]) return hex.EncodeToString(h[:])
} }
func (self *Client) Describe() (streams []av.Stream, err error) { func (self *Client) Describe() (streams []av.CodecData, err error) {
var res Response var res Response
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
@ -294,8 +333,8 @@ func (self *Client) Describe() (streams []av.Stream, err error) {
self.streams = []*Stream{} self.streams = []*Stream{}
for _, info := range sdp.Decode(body) { for _, info := range sdp.Decode(body) {
stream := &Stream{Sdp: info} stream := &Stream{Sdp: info}
stream.SetType(info.Type)
if info.PayloadType >= 96 && info.PayloadType <= 127 {
switch info.Type { switch info.Type {
case av.H264: case av.H264:
var sps, pps []byte var sps, pps []byte
@ -310,8 +349,7 @@ func (self *Client) Describe() (streams []av.Stream, err error) {
} }
} }
if len(sps) > 0 && len(pps) > 0 { if len(sps) > 0 && len(pps) > 0 {
codecData, _ := h264parser.CreateCodecDataBySPSAndPPS(sps, pps) if stream.CodecData, err = h264parser.NewCodecDataFromSPSAndPPS(sps, pps); err != nil {
if err = stream.SetCodecData(codecData); err != nil {
err = fmt.Errorf("h264 sps/pps invalid: %s", err) err = fmt.Errorf("h264 sps/pps invalid: %s", err)
return return
} }
@ -325,11 +363,21 @@ func (self *Client) Describe() (streams []av.Stream, err error) {
err = fmt.Errorf("aac sdp config missing") err = fmt.Errorf("aac sdp config missing")
return return
} }
if err = stream.SetCodecData(info.Config); err != nil { if stream.CodecData, err = aacparser.NewCodecDataFromMPEG4AudioConfigBytes(info.Config); err != nil {
err = fmt.Errorf("aac sdp config invalid: %s", err) err = fmt.Errorf("aac sdp config invalid: %s", err)
return return
} }
} }
} else {
switch info.PayloadType {
case 0:
stream.CodecData = av.NewPCMMulawCodecData()
default:
err = fmt.Errorf("PayloadType=%d unsupported", info.PayloadType)
return
}
}
self.streams = append(self.streams, stream) self.streams = append(self.streams, stream)
} }
@ -337,6 +385,11 @@ func (self *Client) Describe() (streams []av.Stream, err error) {
for _, stream := range self.streams { for _, stream := range self.streams {
streams = append(streams, stream) streams = append(streams, stream)
} }
self.pktque = &pktqueue.Queue{
Poll: self.poll,
}
self.pktque.Alloc(streams)
return return
} }
@ -372,6 +425,7 @@ func (self *Stream) handleH264Payload(naluType byte, timestamp uint32, packet []
} }
self.gotpkt = true self.gotpkt = true
self.pkt.Data = packet self.pkt.Data = packet
self.timestamp = timestamp
} }
return return
@ -379,8 +433,6 @@ func (self *Stream) handleH264Payload(naluType byte, timestamp uint32, packet []
func (self *Stream) handlePacket(timestamp uint32, packet []byte) (err error) { func (self *Stream) handlePacket(timestamp uint32, packet []byte) (err error) {
switch self.Type() { switch self.Type() {
case av.AAC:
case av.H264: case av.H264:
/* /*
+---------------+ +---------------+
@ -480,6 +532,11 @@ func (self *Stream) handlePacket(timestamp uint32, packet []byte) (err error) {
err = fmt.Errorf("unsupported H264 naluType=%d", naluType) err = fmt.Errorf("unsupported H264 naluType=%d", naluType)
return return
} }
default:
self.gotpkt = true
self.pkt.Data = packet
self.timestamp = timestamp
} }
return return
} }
@ -490,6 +547,9 @@ func (self *Client) parseBlock(blockNo int, packet []byte) (streamIndex int, err
return return
} }
streamIndex = blockNo/2
stream := self.streams[streamIndex]
/* /*
0 1 2 3 0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
@ -562,9 +622,6 @@ func (self *Client) parseBlock(blockNo int, packet []byte) (streamIndex int, err
*/ */
//payloadType := packet[1]&0x7f //payloadType := packet[1]&0x7f
streamIndex = blockNo/2
stream := self.streams[streamIndex]
if self.DebugConn { if self.DebugConn {
//fmt.Println("packet:", stream.Type(), "offset", payloadOffset, "pt", payloadType) //fmt.Println("packet:", stream.Type(), "offset", payloadOffset, "pt", payloadType)
if len(packet)>24 { if len(packet)>24 {
@ -591,19 +648,22 @@ func (self *Client) Play() (err error) {
return return
} }
func (self *Client) ReadPacket() (streamIndex int, pkt av.Packet, err error) { func (self *Client) poll() (err error) {
for { for {
var res Response var res Response
if res, err = self.ReadResponse(); err != nil { if res, err = self.ReadResponse(); err != nil {
return return
} }
if res.BlockLength > 0 { if res.BlockLength > 0 {
if streamIndex, err = self.parseBlock(res.BlockNo, res.Block); err != nil { var i int
if i, err = self.parseBlock(res.BlockNo, res.Block); err != nil {
return return
} }
stream := self.streams[streamIndex] stream := self.streams[i]
if stream.gotpkt { if stream.gotpkt {
pkt = stream.pkt time := float64(stream.timestamp)/float64(stream.Sdp.TimeScale)
self.pktque.WriteTimePacket(i, time, stream.pkt)
stream.pkt = av.Packet{}
stream.gotpkt = false stream.gotpkt = false
return return
} }
@ -612,3 +672,37 @@ func (self *Client) ReadPacket() (streamIndex int, pkt av.Packet, err error) {
return return
} }
func (self *Client) ReadPacket() (i int, pkt av.Packet, err error) {
return self.pktque.ReadPacket()
}
func Open(uri string) (cli *Client, err error) {
_cli, err := Connect(uri)
if err != nil {
return
}
streams, err := _cli.Describe()
if err != nil {
return
}
setup := []int{}
for i := range streams {
setup = append(setup, i)
}
err = _cli.Setup(setup)
if err != nil {
return
}
err = _cli.Play()
if err != nil {
return
}
cli = _cli
return
}