rename to channels

This commit is contained in:
nareix 2016-06-06 22:15:51 +08:00
parent d1e2d3d2ff
commit df3ad04ce1
3 changed files with 218 additions and 40 deletions

View File

@ -1,13 +1,13 @@
package proxy
package channels
import (
"encoding/binary"
"fmt"
"github.com/nareix/av"
"hash/fnv"
"encoding/binary"
"sync/atomic"
"sync"
"io"
"fmt"
"sync"
"sync/atomic"
)
func hashParams(params []interface{}) (res uint64, err error) {
@ -28,7 +28,7 @@ func hashParams(params []interface{}) (res uint64, err error) {
type Publisher struct {
h uint64
Params []interface{}
proxy *Proxy
context *Context
streams []av.CodecData
closed bool
lock *sync.RWMutex
@ -151,24 +151,24 @@ func (self *Subscriber) Close() (err error) {
return
}
type Proxy struct {
type Context struct {
publishers map[uint64]*Publisher
lock *sync.RWMutex
onSubscribe func(*Publisher)
}
func New() *Proxy {
proxy := &Proxy{}
proxy.lock = &sync.RWMutex{}
proxy.publishers = make(map[uint64]*Publisher)
return proxy
func New() *Context {
context := &Context{}
context.lock = &sync.RWMutex{}
context.publishers = make(map[uint64]*Publisher)
return context
}
func (self *Proxy) HandleSubscribe(fn func(*Publisher)) {
func (self *Context) HandleSubscribe(fn func(*Publisher)) {
self.onSubscribe = fn
}
func (self *Proxy) Publish(params ...interface{}) (pub *Publisher, err error) {
func (self *Context) Publish(params ...interface{}) (pub *Publisher, err error) {
var h uint64
if h, err = hashParams(params); err != nil {
err = fmt.Errorf("please use string/int in Publish() params")
@ -181,7 +181,7 @@ func (self *Proxy) Publish(params ...interface{}) (pub *Publisher, err error) {
pub = newPublisher()
pub.Params = params
pub.h = h
pub.proxy = self
pub.context = self
self.publishers[h] = pub
}
self.lock.Unlock()
@ -194,7 +194,7 @@ func (self *Proxy) Publish(params ...interface{}) (pub *Publisher, err error) {
return
}
func (self *Proxy) Subscribe(params ...interface{}) (sub *Subscriber, err error) {
func (self *Context) Subscribe(params ...interface{}) (sub *Subscriber, err error) {
var h uint64
if h, err = hashParams(params); err != nil {
err = fmt.Errorf("please use string/int in Subscribe() params")
@ -208,7 +208,7 @@ func (self *Proxy) Subscribe(params ...interface{}) (sub *Subscriber, err error)
pub = newPublisher()
pub.Params = params
pub.h = h
pub.proxy = self
pub.context = self
self.publishers[h] = pub
pub.ondemand = true
needcb = true
@ -233,4 +233,3 @@ func (self *Proxy) Subscribe(params ...interface{}) (sub *Subscriber, err error)
return
}

View File

@ -1,22 +1,22 @@
package proxy
package channels
import (
"testing"
"fmt"
"time"
"github.com/nareix/av"
"testing"
"time"
)
func TestProxy(t *testing.T) {
proxy := New()
pub, _ := proxy.Publish("abc")
func TestChannels(t *testing.T) {
context := New()
pub, _ := context.Publish("abc")
pub.WriteHeader([]av.CodecData{nil, nil})
done := make(chan int)
for n := 0; n < 3; n++ {
go func(n int) {
sub, _ := proxy.Subscribe("abc")
sub, _ := context.Subscribe("abc")
if sub == nil {
done <- n
return
@ -53,8 +53,8 @@ func TestProxy(t *testing.T) {
done = make(chan int)
proxy = New()
proxy.HandleSubscribe(func(pub *Publisher) {
context = New()
context.HandleSubscribe(func(pub *Publisher) {
fmt.Println(pub.Params)
pub.WriteHeader([]av.CodecData{nil, nil})
for {
@ -67,7 +67,7 @@ func TestProxy(t *testing.T) {
subs := []*Subscriber{}
for i := 0; i < 3; i++ {
sub, _ := proxy.Subscribe("xxoo")
sub, _ := context.Subscribe("xxoo")
subs = append(subs, sub)
}
@ -77,4 +77,3 @@ func TestProxy(t *testing.T) {
<-done
}

180
transcode/transcode.go Normal file
View File

@ -0,0 +1,180 @@
package transcode
import (
"github.com/nareix/av"
"github.com/nareix/av/pktreorder"
)
type tstream struct {
av.CodecData
aenc av.AudioEncoder
adec av.AudioDecoder
}
type Transcoder struct {
FindAudioDecoderEncoder func(codec av.AudioCodecData) (ok bool, err error, dec av.AudioDecoder, enc av.AudioEncoder)
streams []*tstream
queue *pktreorder.Queue
}
func (self *Transcoder) Setup(streams []av.CodecData) (err error) {
for _, stream := range streams {
ts := &tstream{CodecData: stream}
if stream.IsAudio() {
if self.FindAudioDecoderEncoder != nil {
var ok bool
var enc av.AudioEncoder
var dec av.AudioDecoder
ok, err, dec, enc = self.FindAudioDecoderEncoder(stream.(av.AudioCodecData))
if ok {
if err != nil {
return
}
ts.CodecData = enc.CodecData()
ts.aenc = enc
ts.adec = dec
}
}
}
self.streams = append(self.streams, ts)
}
self.queue = &pktreorder.Queue{}
self.queue.Alloc(self.Streams())
return
}
func (self *Transcoder) WritePacket(i int, pkt av.Packet) (err error) {
stream := self.streams[i]
if stream.aenc != nil && stream.adec != nil {
var frame av.AudioFrame
var ok bool
if ok, frame, err = stream.adec.Decode(pkt.Data); err != nil {
return
}
if ok {
var pkts []av.Packet
if pkts, err = stream.aenc.Encode(frame); err != nil {
return
}
for _, pkt := range pkts {
self.queue.WritePacket(i, pkt)
}
}
}
return
}
func (self *Transcoder) EndWritePacket(err error) {
self.queue.EndWritePacket(err)
}
func (self *Transcoder) CanReadPacket() bool {
return self.queue.CanReadPacket()
}
func (self *Transcoder) ReadPacket() (i int, pkt av.Packet, err error) {
return self.queue.ReadPacket()
}
func (self *Transcoder) Streams() (streams []av.CodecData) {
for _, stream := range self.streams {
streams = append(streams, stream.CodecData)
}
return
}
func (self *Transcoder) Close() {
for _, stream := range self.streams {
if stream.aenc != nil {
stream.aenc.Close()
}
if stream.adec != nil {
stream.adec.Close()
}
}
self.streams = []*tstream{}
}
type Muxer struct {
Muxer av.Muxer
Transcoder *Transcoder
}
func (self *Muxer) WriteHeader(streams []av.CodecData) (err error) {
if err = self.Transcoder.Setup(streams); err != nil {
return
}
if err = self.Muxer.WriteHeader(self.Transcoder.Streams()); err != nil {
return
}
return
}
func (self *Muxer) WritePacket(i int, pkt av.Packet) (err error) {
self.Transcoder.WritePacket(i, pkt)
if self.Transcoder.CanReadPacket() {
if i, pkt, rerr := self.Transcoder.ReadPacket(); rerr != nil {
err = rerr
return
} else {
if werr := self.Muxer.WritePacket(i, pkt); werr != nil {
self.Transcoder.EndWritePacket(werr)
}
}
}
return
}
func (self *Muxer) WriteTrailer() (err error) {
self.Transcoder.EndWritePacket(nil)
for {
if i, pkt, rerr := self.Transcoder.ReadPacket(); rerr != nil {
break
} else {
if werr := self.Muxer.WritePacket(i, pkt); werr != nil {
err = werr
return
}
}
}
if err = self.Muxer.WriteTrailer(); err != nil {
return
}
return
}
type Demuxer struct {
Demuxer av.Demuxer
Transcoder *Transcoder
}
func (self *Demuxer) Setup() (err error) {
if err = self.Transcoder.Setup(self.Demuxer.Streams()); err != nil {
return
}
return
}
func (self *Demuxer) ReadPacket() (i int, pkt av.Packet, err error) {
for {
if self.Transcoder.CanReadPacket() {
return self.Transcoder.ReadPacket()
} else {
if i, pkt, err := self.Demuxer.ReadPacket(); err != nil {
self.Transcoder.EndWritePacket(err)
} else {
self.Transcoder.WritePacket(i, pkt)
}
}
}
}
func (self *Demuxer) Streams() []av.CodecData {
return self.Transcoder.Streams()
}
func (self *Demuxer) Close() {
self.Transcoder.Close()
}