From df3ad04ce1419b6d5b888eff6713a0d85d3ff5ca Mon Sep 17 00:00:00 2001 From: nareix Date: Mon, 6 Jun 2016 22:15:51 +0800 Subject: [PATCH] rename to channels --- proxy/proxy.go => channels/channels.go | 55 +++--- .../channels_test.go | 23 ++- transcode/transcode.go | 180 ++++++++++++++++++ 3 files changed, 218 insertions(+), 40 deletions(-) rename proxy/proxy.go => channels/channels.go (81%) rename proxy/proxy_test.go => channels/channels_test.go (77%) create mode 100644 transcode/transcode.go diff --git a/proxy/proxy.go b/channels/channels.go similarity index 81% rename from proxy/proxy.go rename to channels/channels.go index d256b86..85909bd 100644 --- a/proxy/proxy.go +++ b/channels/channels.go @@ -1,13 +1,13 @@ -package proxy +package channels import ( + "encoding/binary" + "fmt" "github.com/nareix/av" "hash/fnv" - "encoding/binary" - "sync/atomic" - "sync" "io" - "fmt" + "sync" + "sync/atomic" ) func hashParams(params []interface{}) (res uint64, err error) { @@ -26,15 +26,15 @@ func hashParams(params []interface{}) (res uint64, err error) { } type Publisher struct { - h uint64 - Params []interface{} - proxy *Proxy - streams []av.CodecData - closed bool - lock *sync.RWMutex - cond *sync.Cond + h uint64 + Params []interface{} + context *Context + streams []av.CodecData + closed bool + lock *sync.RWMutex + cond *sync.Cond subscribersCount int32 - pkt struct { + pkt struct { av.Packet i int } @@ -130,7 +130,7 @@ func (self *Subscriber) ReadPacket() (i int, pkt av.Packet, err error) { ppkt := &pub.pkt if pub.closed { ppkt = nil - } else{ + } else { cond.Wait() } cond.L.Unlock() @@ -151,24 +151,24 @@ func (self *Subscriber) Close() (err error) { return } -type Proxy struct { - publishers map[uint64]*Publisher - lock *sync.RWMutex +type Context struct { + publishers map[uint64]*Publisher + lock *sync.RWMutex onSubscribe func(*Publisher) } -func New() *Proxy { - proxy := &Proxy{} - proxy.lock = &sync.RWMutex{} - proxy.publishers = make(map[uint64]*Publisher) - return proxy +func New() *Context { + context := &Context{} + context.lock = &sync.RWMutex{} + context.publishers = make(map[uint64]*Publisher) + return context } -func (self *Proxy) HandleSubscribe(fn func(*Publisher)) { +func (self *Context) HandleSubscribe(fn func(*Publisher)) { self.onSubscribe = fn } -func (self *Proxy) Publish(params ...interface{}) (pub *Publisher, err error) { +func (self *Context) Publish(params ...interface{}) (pub *Publisher, err error) { var h uint64 if h, err = hashParams(params); err != nil { err = fmt.Errorf("please use string/int in Publish() params") @@ -181,7 +181,7 @@ func (self *Proxy) Publish(params ...interface{}) (pub *Publisher, err error) { pub = newPublisher() pub.Params = params pub.h = h - pub.proxy = self + pub.context = self self.publishers[h] = pub } self.lock.Unlock() @@ -194,7 +194,7 @@ func (self *Proxy) Publish(params ...interface{}) (pub *Publisher, err error) { return } -func (self *Proxy) Subscribe(params ...interface{}) (sub *Subscriber, err error) { +func (self *Context) Subscribe(params ...interface{}) (sub *Subscriber, err error) { var h uint64 if h, err = hashParams(params); err != nil { err = fmt.Errorf("please use string/int in Subscribe() params") @@ -208,7 +208,7 @@ func (self *Proxy) Subscribe(params ...interface{}) (sub *Subscriber, err error) pub = newPublisher() pub.Params = params pub.h = h - pub.proxy = self + pub.context = self self.publishers[h] = pub pub.ondemand = true needcb = true @@ -233,4 +233,3 @@ func (self *Proxy) Subscribe(params ...interface{}) (sub *Subscriber, err error) return } - diff --git a/proxy/proxy_test.go b/channels/channels_test.go similarity index 77% rename from proxy/proxy_test.go rename to channels/channels_test.go index 16f1c55..13c527d 100644 --- a/proxy/proxy_test.go +++ b/channels/channels_test.go @@ -1,22 +1,22 @@ -package proxy +package channels import ( - "testing" "fmt" - "time" "github.com/nareix/av" + "testing" + "time" ) -func TestProxy(t *testing.T) { - proxy := New() - pub, _ := proxy.Publish("abc") +func TestChannels(t *testing.T) { + context := New() + pub, _ := context.Publish("abc") pub.WriteHeader([]av.CodecData{nil, nil}) done := make(chan int) for n := 0; n < 3; n++ { go func(n int) { - sub, _ := proxy.Subscribe("abc") + sub, _ := context.Subscribe("abc") if sub == nil { done <- n return @@ -39,7 +39,7 @@ func TestProxy(t *testing.T) { pub.WritePacket(2, av.Packet{}) pub.WritePacket(3, av.Packet{}) if false { - time.Sleep(time.Second/100) + time.Sleep(time.Second / 100) } pub.Close() done <- 4 @@ -53,8 +53,8 @@ func TestProxy(t *testing.T) { done = make(chan int) - proxy = New() - proxy.HandleSubscribe(func(pub *Publisher) { + context = New() + context.HandleSubscribe(func(pub *Publisher) { fmt.Println(pub.Params) pub.WriteHeader([]av.CodecData{nil, nil}) for { @@ -67,7 +67,7 @@ func TestProxy(t *testing.T) { subs := []*Subscriber{} for i := 0; i < 3; i++ { - sub, _ := proxy.Subscribe("xxoo") + sub, _ := context.Subscribe("xxoo") subs = append(subs, sub) } @@ -77,4 +77,3 @@ func TestProxy(t *testing.T) { <-done } - diff --git a/transcode/transcode.go b/transcode/transcode.go new file mode 100644 index 0000000..5a3fb5d --- /dev/null +++ b/transcode/transcode.go @@ -0,0 +1,180 @@ +package transcode + +import ( + "github.com/nareix/av" + "github.com/nareix/av/pktreorder" +) + +type tstream struct { + av.CodecData + aenc av.AudioEncoder + adec av.AudioDecoder +} + +type Transcoder struct { + FindAudioDecoderEncoder func(codec av.AudioCodecData) (ok bool, err error, dec av.AudioDecoder, enc av.AudioEncoder) + streams []*tstream + queue *pktreorder.Queue +} + +func (self *Transcoder) Setup(streams []av.CodecData) (err error) { + for _, stream := range streams { + ts := &tstream{CodecData: stream} + if stream.IsAudio() { + if self.FindAudioDecoderEncoder != nil { + var ok bool + var enc av.AudioEncoder + var dec av.AudioDecoder + ok, err, dec, enc = self.FindAudioDecoderEncoder(stream.(av.AudioCodecData)) + if ok { + if err != nil { + return + } + ts.CodecData = enc.CodecData() + ts.aenc = enc + ts.adec = dec + } + } + } + self.streams = append(self.streams, ts) + } + + self.queue = &pktreorder.Queue{} + self.queue.Alloc(self.Streams()) + return +} + +func (self *Transcoder) WritePacket(i int, pkt av.Packet) (err error) { + stream := self.streams[i] + if stream.aenc != nil && stream.adec != nil { + var frame av.AudioFrame + var ok bool + if ok, frame, err = stream.adec.Decode(pkt.Data); err != nil { + return + } + if ok { + var pkts []av.Packet + if pkts, err = stream.aenc.Encode(frame); err != nil { + return + } + for _, pkt := range pkts { + self.queue.WritePacket(i, pkt) + } + } + } + return +} + +func (self *Transcoder) EndWritePacket(err error) { + self.queue.EndWritePacket(err) +} + +func (self *Transcoder) CanReadPacket() bool { + return self.queue.CanReadPacket() +} + +func (self *Transcoder) ReadPacket() (i int, pkt av.Packet, err error) { + return self.queue.ReadPacket() +} + +func (self *Transcoder) Streams() (streams []av.CodecData) { + for _, stream := range self.streams { + streams = append(streams, stream.CodecData) + } + return +} + +func (self *Transcoder) Close() { + for _, stream := range self.streams { + if stream.aenc != nil { + stream.aenc.Close() + } + if stream.adec != nil { + stream.adec.Close() + } + } + self.streams = []*tstream{} +} + +type Muxer struct { + Muxer av.Muxer + Transcoder *Transcoder +} + +func (self *Muxer) WriteHeader(streams []av.CodecData) (err error) { + if err = self.Transcoder.Setup(streams); err != nil { + return + } + if err = self.Muxer.WriteHeader(self.Transcoder.Streams()); err != nil { + return + } + return +} + +func (self *Muxer) WritePacket(i int, pkt av.Packet) (err error) { + self.Transcoder.WritePacket(i, pkt) + if self.Transcoder.CanReadPacket() { + if i, pkt, rerr := self.Transcoder.ReadPacket(); rerr != nil { + err = rerr + return + } else { + if werr := self.Muxer.WritePacket(i, pkt); werr != nil { + self.Transcoder.EndWritePacket(werr) + } + } + } + return +} + +func (self *Muxer) WriteTrailer() (err error) { + self.Transcoder.EndWritePacket(nil) + for { + if i, pkt, rerr := self.Transcoder.ReadPacket(); rerr != nil { + break + } else { + if werr := self.Muxer.WritePacket(i, pkt); werr != nil { + err = werr + return + } + } + } + if err = self.Muxer.WriteTrailer(); err != nil { + return + } + return +} + +type Demuxer struct { + Demuxer av.Demuxer + Transcoder *Transcoder +} + +func (self *Demuxer) Setup() (err error) { + if err = self.Transcoder.Setup(self.Demuxer.Streams()); err != nil { + return + } + return +} + +func (self *Demuxer) ReadPacket() (i int, pkt av.Packet, err error) { + for { + if self.Transcoder.CanReadPacket() { + return self.Transcoder.ReadPacket() + } else { + if i, pkt, err := self.Demuxer.ReadPacket(); err != nil { + self.Transcoder.EndWritePacket(err) + } else { + self.Transcoder.WritePacket(i, pkt) + } + } + } +} + +func (self *Demuxer) Streams() []av.CodecData { + return self.Transcoder.Streams() +} + +func (self *Demuxer) Close() { + self.Transcoder.Close() +} +