diff --git a/README.md b/README.md index 56dc33c..f5f9a17 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,97 @@ -go-av + +joy4: A modern multimedia library for Golang == -Golang audio and video manipulation library +* Well-designed API, easy to use, write a media server in 1 minute: - * h264/aac encoder and decoder (using libav) ([HERE](http://github.com/go-av/codec)) - * mp4 reader and writer ([HERE](http://github.com/go-av/mp4)) - * rtmp server ([HERE](http://github.com/go-av/rtmp)) + ```go + import ( + "github.com/nareix/av" + ) + + func main() { + demuxer, _ := rtsp.Open("rtsp://xxooxo") + outfile, _ := os.Create("out.mp4") + muxer := mp4.Create(outfile, demuxer.Streams()) + av.CopyPackets(muxer, demuxer) + muxer.WriteTrailer() + outfile.Close() + } + ``` + + ​ + + +* Support MPEG-TS/MP4/FLV muxer/demuxer and RTSP client +* Support H264/AAC parser/encoder/decoder and transcode (cgo ffmpeg binding) + + +* high performance RTSP/RTMP/HLS/MPEG-DASH streaming server +* AV Database: store audio/video packets + + + +# 简单易用的 Golang 流媒体框架 + +* 简单易用的 API,一分钟内可以实现很多东西 + +* 支持 MPEG-TS/MP4/FLV 格式,以及 RTSP/RTMP 客户端 + +* 支持 H264/AAC 编解码以及转码(使用 cgo 调用 ffmpeg) + +* 支持输入输出设备 + +* 高性能 RTMP/FLV/HLS/MPEG-DASH 服务器,支持 OBS/ffmpeg 推流,支持 CDN 边缘节点模式 + +* 音视频数据库:支持录播/点播。支持分布式部署 + +* 与其他流行流媒体框架的对比以及适用场景 + * ffmpeg 是目前支持格式最全使用最广泛的编解码库,本框架以 Golang 的方式封装了 ffmpeg 的编解码部分,开发更方便 + + * live555 是 C++ 开发的流媒体框架,支持 RTSP,不支持新的流媒体协议,开发难度较大 + + * gstreamer 是 C 开发的流媒体框架,功能齐全,包含编解码/画面声音处理/转码。但是开发难度极大,对各插件的行为很难控制,需要学习过多的概念,除了直接调用命令行的简单应用之外,很难用在实际项目中 + + * nginx-rtmp 是基于 nginx 开发的 RTMP/HLS/MPEG-DASH 服务器,性能很强。但是它是一个现成的服务器不是一个库,很难利用它的代码进行二次开发 + + * av314 采用 Golang 语言编写,与 C/C++ 开发的其他框架相比,在保持高性能的同时没有 C/C++ 的各种问题,API 非常简单易用 + + * av314 支持最新的流媒体格式,对于旧的格式也能使用 ffmpeg 来支持 + + * av314 支持 Windows/Mac/Linux/树莓派 + + ​ + +示例: + +- 读取文件 +- 读取文件然后格式转换保存文件 +- 新建一个 RTSP Server 并在请求时候播放文件 +- 从 RTMP Client 读取流并转移保存到文件 +- RTMP 推流并建立 + + + +```go +dec := h264dec.New(CodecData) +dec.Write(nalu) +dec.ReadFrame() + +enc := h264enc.New() + +stream := NewStream(type, codecData) + +new := &av.Transcoder{ + R: demuxer, +} +new.FindEncoder = func(type av.CodecType, codecData []byte) { + if type == av.G722 { + return av.AAC, aacenc.New(codecData) + } +} +new.ReadPacket() +``` + + + +streams, _ := muxer.ReadHeader() \ No newline at end of file diff --git a/av.go b/av.go new file mode 100644 index 0000000..e242c21 --- /dev/null +++ b/av.go @@ -0,0 +1,163 @@ +package av + +type SampleFormat int + +const ( + U8 = SampleFormat(iota+1) + S16 + S32 + FLT + DBL + U8P + S16P + S32P + FLTP + DBLP + U32 +) + +func (self SampleFormat) BytesPerSample() int { + switch self { + case U8,U8P: + return 1 + case S16,S16P: + return 2 + case FLT,FLTP,S32,S32P,U32: + return 4 + case DBL,DBLP: + return 8 + default: + return 0 + } +} + +func (self SampleFormat) IsPlanar() bool { + switch self { + case S16P,S32P,FLTP,DBLP: + return true + default: + return false + } +} + +const ( + H264 = 0x264 + AAC = 0xaac +) + +type CodecData interface { + IsVideo() bool + IsAudio() bool + Type() int +} + +type VideoCodecData interface { + CodecData + Width() int + Height() int +} + +type AudioCodecData interface { + CodecData + SampleFormat() SampleFormat + SampleRate() int + ChannelCount() int +} + +type H264CodecData interface { + VideoCodecData + AVCDecoderConfRecordBytes() []byte + SPS() []byte + PPS() []byte +} + +type AACCodecData interface { + AudioCodecData + MPEG4AudioConfigBytes() []byte + MakeADTSHeader(samples int, payloadLength int) []byte +} + +type Muxer interface { + WriteHeader([]CodecData) error + WritePacket(int, Packet) error + WriteTrailer() error +} + +type Demuxer interface { + ReadPacket() (int, Packet, error) + Duration() float64 + Streams() []CodecData +} + +type Packet struct { + IsKeyFrame bool + Data []byte + Duration float64 + CompositionTime float64 +} + +type AudioFrame struct { + SampleFormat SampleFormat + ChannelCount int + Bytes []byte +} + +type AudioEncoder interface { + CodecData() AudioCodecData + Encode(AudioFrame) ([]Packet, error) + Flush() ([]Packet, error) +} + +type AudioDecoder interface { + Decode(Packet) (AudioFrame, error) + Flush() (AudioFrame, error) +} + +/* +在写入数据包的时候必须严格按照 V-A-A-A-V-A-A-A-.... 顺序,所有包的时间都必须正确 +如果有误,跳过错的那一段 + +cli := rtsp.Open("xxoo") +cli = &av.TranscodeDemuxer{ + Demuxer: cli, + Transcoders: []Transcoder{ffmpeg.AudioTranscodeTo("aac")}, +} + + + +minidash.open('src', function(video) { +}) + +minidash.HandleConn(func(conn *minidash.Conn) { + conn.RequestSrc + muxer, err := conn.WriteHeader(cli.Streams()) + muxer.WritePacket() +}) + +怎样转码 +av.Transconder{ + TranscodeHeader(codecData) ok, codecData, error + TranscodePacket(Packet, flush) []Packet, error + FlushPacket() []Packet, error +} + +decoder := ffmpeg.FindAudioDecoder(AudioCodecData) +decoder := ffmpeg.FindAudioDecoderByName("aac", CodecData) + +av.DemuxTranscoder{ + Demuxer Demuxer + Transconders []Transconder +} +Streams() +ReadPacket() +ClearPacketCache() + +怎样混合多个Demuxer +DemuxerMixer{ + demuxer +} +demuxer.FilterStreams() +streams := demuxer.Streams() +streams[0] +*/ + diff --git a/pktqueue/pktqueue.go b/pktqueue/pktqueue.go new file mode 100644 index 0000000..372c6b6 --- /dev/null +++ b/pktqueue/pktqueue.go @@ -0,0 +1,84 @@ +package pktqueue + +import ( + "github.com/nareix/av" +) + +type timePacket struct { + time float64 + av.Packet +} + +type stream struct { + pkts []timePacket + lastDuration float64 +} + +func (self *stream) Read(flush bool) (ok bool, pkt timePacket) { + if len(self.pkts) > 1 { + ok = true + pkt := self.pkts[0] + pkt.Duration = self.pkts[1].time - self.pkts[0].time + self.lastDuration = pkt.Duration + self.pkts = self.pkts[1:] + } else if len(self.pkts) == 1 && flush { + ok = true + pkt := self.pkts[0] + pkt.Duration = self.lastDuration + self.pkts = self.pkts[1:] + } + return +} + +type Queue struct { + streams []*stream + Poll func() error + err error + time float64 +} + +func (self *Queue) CurrentTime() float64 { + return self.time +} + +func (self *Queue) Alloc(n int) { + self.streams = make([]*stream, n) +} + +func (self *Queue) Clear() { + self.Alloc(len(self.streams)) + self.time = 0.0 +} + +func (self *Queue) ReadPacket() (i int, pkt av.Packet, err error) { + for { + flush := self.err != nil + var tpkt timePacket + var ok bool + var stream *stream + for i, stream = range self.streams { + if ok, tpkt = stream.Read(flush); ok { + break + } + } + if ok { + pkt = tpkt.Packet + self.time = tpkt.time + return + } else { + if self.err == nil { + self.err = self.Poll() + } else { + err = self.err + return + } + } + } + return +} + +func (self *Queue) WriteTimePacket(i int, time float64, pkt av.Packet) { + stream := self.streams[i] + stream.pkts = append(stream.pkts, timePacket{Packet: pkt, time: time}) +} + diff --git a/proxy/proxy.go b/proxy/proxy.go new file mode 100644 index 0000000..58462d0 --- /dev/null +++ b/proxy/proxy.go @@ -0,0 +1,225 @@ +package proxy + +import ( + "github.com/nareix/av" + "hash/fnv" + "encoding/binary" + "sync/atomic" + "sync" + "io" + "fmt" +) + +func hashParams(params []interface{}) uint64 { + f := fnv.New64() + for _, p := range params { + if s, ok := p.(string); ok { + io.WriteString(f, s) + } else { + binary.Write(f, binary.LittleEndian, p) + } + } + return f.Sum64() +} + +type Publisher struct { + h uint64 + Params []interface{} + proxy *Proxy + streams []av.CodecData + closed bool + lock *sync.RWMutex + cond *sync.Cond + subscribersCount int32 + pkt struct { + av.Packet + i int + } + ondemand bool +} + +func newPublisher() *Publisher { + pub := &Publisher{} + pub.lock = &sync.RWMutex{} + pub.cond = sync.NewCond(pub.lock.RLocker()) + return pub +} + +func (self *Publisher) addSubscriber() (sub *Subscriber, err error) { + self.cond.L.Lock() + for len(self.streams) == 0 && !self.closed { + self.cond.Wait() + } + self.cond.L.Unlock() + + if self.closed { + err = fmt.Errorf("publisher closed") + return + } + + atomic.AddInt32(&self.subscribersCount, 1) + sub = &Subscriber{} + sub.pub = self + + return +} + +func (self *Publisher) WriteHeader(streams []av.CodecData) (err error) { + self.lock.Lock() + self.streams = streams + self.lock.Unlock() + self.cond.Broadcast() + return +} + +func (self *Publisher) WritePacket(i int, pkt av.Packet) (err error) { + var closed bool + + self.lock.Lock() + if !self.closed { + self.pkt.Packet = pkt + self.pkt.i = i + } else { + closed = true + } + self.lock.Unlock() + + if closed { + err = io.EOF + return + } + + self.cond.Broadcast() + return +} + +func (self *Publisher) Close() (err error) { + self.lock.Lock() + self.closed = true + self.lock.Unlock() + self.cond.Broadcast() + return +} + +func (self *Publisher) removeSubscriber() { + count := atomic.AddInt32(&self.subscribersCount, -1) + if count == 0 && self.ondemand { + self.Close() + } +} + +type Subscriber struct { + pub *Publisher +} + +func (self *Subscriber) Streams() (streams []av.CodecData) { + pub := self.pub + pub.lock.RLock() + streams = pub.streams + pub.lock.RUnlock() + return +} + +func (self *Subscriber) ReadPacket() (i int, pkt av.Packet, err error) { + pub := self.pub + cond := pub.cond + cond.L.Lock() + ppkt := &pub.pkt + if pub.closed { + ppkt = nil + } else{ + cond.Wait() + } + cond.L.Unlock() + + if ppkt == nil { + err = io.EOF + return + } + i, pkt = ppkt.i, ppkt.Packet + return +} + +func (self *Subscriber) Close() (err error) { + if self.pub != nil { + self.pub.removeSubscriber() + self.pub = nil + } + return +} + +type Proxy struct { + publishers map[uint64]*Publisher + lock *sync.RWMutex + onSubscribe func(*Publisher) +} + +func New() *Proxy { + proxy := &Proxy{} + proxy.lock = &sync.RWMutex{} + proxy.publishers = make(map[uint64]*Publisher) + return proxy +} + +func (self *Proxy) HandleSubscribe(fn func(*Publisher)) { + self.onSubscribe = fn +} + +func (self *Proxy) Publish(params ...interface{}) (pub *Publisher, err error) { + h := hashParams(params) + + self.lock.Lock() + pub, exists := self.publishers[h] + if !exists { + pub = newPublisher() + pub.Params = params + pub.h = h + pub.proxy = self + self.publishers[h] = pub + } + self.lock.Unlock() + + if exists { + err = fmt.Errorf("publisher already exist") + return + } + + return +} + +func (self *Proxy) Subscribe(params ...interface{}) (sub *Subscriber, err error) { + h := hashParams(params) + needcb := false + + self.lock.RLock() + pub := self.publishers[h] + if pub == nil && self.onSubscribe != nil { + pub = newPublisher() + pub.Params = params + pub.h = h + pub.proxy = self + self.publishers[h] = pub + pub.ondemand = true + needcb = true + } + self.lock.RUnlock() + + if pub == nil { + err = fmt.Errorf("publisher not found") + return + } + + if needcb { + go func() { + self.onSubscribe(pub) + pub.Close() + }() + } + + if sub, err = pub.addSubscriber(); err != nil { + return + } + + return +} + diff --git a/proxy/proxy_test.go b/proxy/proxy_test.go new file mode 100644 index 0000000..16f1c55 --- /dev/null +++ b/proxy/proxy_test.go @@ -0,0 +1,80 @@ +package proxy + +import ( + "testing" + "fmt" + "time" + "github.com/nareix/av" +) + +func TestProxy(t *testing.T) { + proxy := New() + pub, _ := proxy.Publish("abc") + pub.WriteHeader([]av.CodecData{nil, nil}) + + done := make(chan int) + + for n := 0; n < 3; n++ { + go func(n int) { + sub, _ := proxy.Subscribe("abc") + if sub == nil { + done <- n + return + } + for { + i, pkt, err := sub.ReadPacket() + if err != nil { + break + } + fmt.Println(i, pkt) + } + fmt.Println("close", n) + sub.Close() + done <- n + }(n) + } + + go func() { + pub.WritePacket(1, av.Packet{}) + pub.WritePacket(2, av.Packet{}) + pub.WritePacket(3, av.Packet{}) + if false { + time.Sleep(time.Second/100) + } + pub.Close() + done <- 4 + }() + + for i := 0; i < 4; i++ { + fmt.Println(<-done) + } + + fmt.Println("complete") + + done = make(chan int) + + proxy = New() + proxy.HandleSubscribe(func(pub *Publisher) { + fmt.Println(pub.Params) + pub.WriteHeader([]av.CodecData{nil, nil}) + for { + if err := pub.WritePacket(0, av.Packet{}); err != nil { + break + } + } + done <- 1 + }) + + subs := []*Subscriber{} + for i := 0; i < 3; i++ { + sub, _ := proxy.Subscribe("xxoo") + subs = append(subs, sub) + } + + for _, sub := range subs { + sub.Close() + } + + <-done +} + diff --git a/segment.go b/segment.go new file mode 100644 index 0000000..3336ee0 --- /dev/null +++ b/segment.go @@ -0,0 +1,100 @@ +package av + +import ( + "fmt" +) + +type PacketWithIdx struct { + Idx int + Packet +} + +type Segment struct { + Pkts []PacketWithIdx + duration float64 +} + +func (self Segment) Duration() float64 { + return self.duration +} + +func (self Segment) Concat(seg Segment) (out Segment) { + out.Pkts = append(self.Pkts, seg.Pkts...) + out.duration = self.duration+seg.duration + return +} + +func WriteSegment(muxer Muxer, seg Segment) (err error) { + for _, pkt := range seg.Pkts { + if err = muxer.WritePacket(pkt.Idx, pkt.Packet); err != nil { + return + } + } + return +} + +type SegmentReader struct { + Demuxer Demuxer + streams []CodecData + vi int + lastpkt *Packet +} + +func (self *SegmentReader) ClearCache() { + self.lastpkt = nil +} + +func (self *SegmentReader) prepare() (err error) { + self.streams = self.Demuxer.Streams() + self.vi = -1 + for i, stream := range self.streams { + if stream.IsVideo() { + self.vi = i + break + } + } + if self.vi == -1 { + err = fmt.Errorf("video stream not found") + return + } + return +} + +func (self *SegmentReader) ReadGop() (seg Segment, err error) { + if len(self.streams) == 0 { + if err = self.prepare(); err != nil { + return + } + } + + n := 0 + if self.lastpkt != nil { + n++ + seg.Pkts = append(seg.Pkts, PacketWithIdx{Idx:self.vi, Packet:*self.lastpkt}) + seg.duration += self.lastpkt.Duration + self.lastpkt = nil + } + + for { + var i int + var pkt Packet + if i, pkt, err = self.Demuxer.ReadPacket(); err != nil { + return + } + if i == self.vi && pkt.IsKeyFrame { + n++ + } + if n == 1 { + seg.Pkts = append(seg.Pkts, PacketWithIdx{Idx:i, Packet:pkt}) + if i == self.vi { + seg.duration += pkt.Duration + } + } else if n > 1 { + self.lastpkt = &pkt + break + } + } + + return +} + diff --git a/util.go b/util.go deleted file mode 100644 index 44c5636..0000000 --- a/util.go +++ /dev/null @@ -1,158 +0,0 @@ -package av - -import ( - "fmt" - "github.com/nareix/codec/aacparser" - "github.com/nareix/codec/h264parser" -) - -type CodecType int - -const ( - H264 = CodecType(1) - AAC = CodecType(2) -) - -func (self CodecType) String() string { - switch self { - case H264: - return "H264" - case AAC: - return "AAC" - } - return "?" -} - -func (self CodecType) IsAudio() bool { - return self == AAC -} - -func (self CodecType) IsVideo() bool { - return self == H264 -} - -type Stream interface { - IsVideo() bool - IsAudio() bool - Type() CodecType - SetType(CodecType) - CodecData() []byte - SetCodecData([]byte) error - SampleRate() int - ChannelCount() int - Width() int - Height() int - String() string - FillParamsByStream(Stream) error -} - -type StreamCommon struct { - codecType CodecType - codecData []byte - H264CodecInfo h264parser.CodecInfo - AACCodecInfo aacparser.CodecInfo -} - -type Muxer interface { - NewStream() Stream - WriteHeader() error - WriteTrailer() error - WritePacket(int, Packet) error - SetTime(float64) -} - -type Demuxer interface { - ReadHeader() error - ReadPacket() (int, Packet, error) - Streams() []Stream - Time() float64 -} - -type SeekableDemuxer interface { - Demuxer - SeekToTime(float64) error -} - -type Packet struct { - IsKeyFrame bool - Data []byte - Duration float64 - CompositionTime float64 -} - -func (self *StreamCommon) Type() CodecType { - return self.codecType -} - -func (self *StreamCommon) SetType(CodecType CodecType) { - self.codecType = CodecType -} - -func (self *StreamCommon) String() string { - str := self.codecType.String() - if self.IsAudio() { - str += fmt.Sprintf(" %dHz %dch", self.SampleRate(), self.ChannelCount()) - } else if self.IsVideo() { - str += fmt.Sprintf(" %dx%d", self.Width(), self.Height()) - } - return str -} - -func (self *StreamCommon) SetCodecData(data []byte) (err error) { - if self.codecType == H264 { - if self.H264CodecInfo, err = h264parser.ParseCodecData(data); err != nil { - return - } - } else if self.codecType == AAC { - if self.AACCodecInfo, err = aacparser.ParseCodecData(data); err != nil { - return - } - } else { - err = fmt.Errorf("unknown codec type=%d", self.codecType) - } - self.codecData = data - return -} - -func (self *StreamCommon) CodecData() (data []byte) { - return self.codecData -} - -func (self *StreamCommon) ChannelCount() int { - return self.AACCodecInfo.ChannelCount -} - -func (self *StreamCommon) SampleRate() int { - return self.AACCodecInfo.SampleRate -} - -func (self *StreamCommon) Width() int { - return int(self.H264CodecInfo.SPSInfo.Width) -} - -func (self *StreamCommon) Height() int { - return int(self.H264CodecInfo.SPSInfo.Height) -} - -func (self *StreamCommon) IsVideo() bool { - if self.codecType == H264 { - return true - } - return false -} - -func (self *StreamCommon) IsAudio() bool { - if self.codecType == AAC { - return true - } - return false -} - -func (self *StreamCommon) FillParamsByStream(other Stream) (err error) { - self.codecType = other.Type() - if err = self.SetCodecData(other.CodecData()); err != nil { - return - } - return -} -