Skip to content

Commit 5f3f042

Browse files
committed
Make new GCC compatible with old API
1 parent 101c527 commit 5f3f042

File tree

2 files changed

+140
-6
lines changed

2 files changed

+140
-6
lines changed

pkg/bwe/deprecated_bwe_api.go

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package bwe
2+
3+
import (
4+
"errors"
5+
"sync"
6+
"time"
7+
8+
"github.com/pion/interceptor"
9+
"github.com/pion/interceptor/pkg/cc"
10+
"github.com/pion/interceptor/pkg/ccfb"
11+
"github.com/pion/rtcp"
12+
)
13+
14+
// GCCFactory creates a new cc.BandwidthEstimator
15+
func GCCFactory() (cc.BandwidthEstimator, error) {
16+
return &GCC{
17+
lock: sync.Mutex{},
18+
sbwe: NewSendSideController(1_000_000, 100_000, 100_000_000),
19+
rate: 1_000_000,
20+
}, nil
21+
}
22+
23+
// GCC implements cc.BandwidthEstimator
24+
type GCC struct {
25+
lock sync.Mutex
26+
sbwe *SendSideController
27+
rate int
28+
updateCB func(int)
29+
}
30+
31+
// AddStream implements cc.BandwidthEstimator.
32+
// Called by cc.Interceptor
33+
func (g *GCC) AddStream(_ *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
34+
return writer
35+
}
36+
37+
// Close implements cc.BandwidthEstimator.
38+
// Called by cc.Interceptor
39+
func (g *GCC) Close() error {
40+
// GCC does not need to be closed
41+
return nil
42+
}
43+
44+
// GetStats implements cc.BandwidthEstimator.
45+
// Called by application
46+
func (g *GCC) GetStats() map[string]interface{} {
47+
g.lock.Lock()
48+
defer g.lock.Unlock()
49+
return map[string]interface{}{
50+
"warning": "GetStats is deprecated",
51+
"lossTargetBitrate": 0,
52+
"averageLoss": 0,
53+
"delayTargetBitrate": 0,
54+
"delayMeasurement": 0,
55+
"delayEstimate": 0,
56+
"delayThreshold": 0,
57+
"usage": 0,
58+
"state": 0,
59+
}
60+
}
61+
62+
// GetTargetBitrate implements cc.BandwidthEstimator.
63+
// Called by application
64+
func (g *GCC) GetTargetBitrate() int {
65+
g.lock.Lock()
66+
defer g.lock.Unlock()
67+
return g.rate
68+
}
69+
70+
// OnTargetBitrateChange implements cc.BandwidthEstimator.
71+
// Called by application
72+
func (g *GCC) OnTargetBitrateChange(f func(bitrate int)) {
73+
g.lock.Lock()
74+
defer g.lock.Unlock()
75+
g.updateCB = f
76+
}
77+
78+
// WriteRTCP implements cc.BandwidthEstimator.
79+
// Called by cc.Interceptor
80+
func (g *GCC) WriteRTCP(_ []rtcp.Packet, attr interceptor.Attributes) error {
81+
reports, ok := attr.Get(ccfb.CCFBAttributesKey).([]ccfb.Report)
82+
if !ok {
83+
return errors.New("warning: GCC requires CCFB interceptor to be configured before the CC interceptor")
84+
}
85+
now := time.Now()
86+
for _, report := range reports {
87+
acks, rtt := readReport(report)
88+
g.update(now, rtt, acks)
89+
}
90+
return nil
91+
}
92+
93+
func (g *GCC) update(now time.Time, rtt time.Duration, acks []Acknowledgment) {
94+
g.lock.Lock()
95+
defer g.lock.Unlock()
96+
oldRate := g.rate
97+
98+
g.rate = g.sbwe.OnAcks(now, rtt, acks)
99+
100+
if oldRate != g.rate && g.updateCB != nil {
101+
g.updateCB(g.rate)
102+
}
103+
}
104+
105+
func readReport(report ccfb.Report) ([]Acknowledgment, time.Duration) {
106+
acks := []Acknowledgment{}
107+
latestAcked := Acknowledgment{}
108+
for _, prs := range report.SSRCToPacketReports {
109+
for _, pr := range prs {
110+
ack := Acknowledgment{
111+
SeqNr: pr.SeqNr,
112+
Size: pr.Size,
113+
Departure: pr.Departure,
114+
Arrived: pr.Arrived,
115+
Arrival: pr.Arrival,
116+
ECN: ECN(pr.ECN),
117+
}
118+
if ack.Arrival.After(latestAcked.Arrival) {
119+
latestAcked = ack
120+
}
121+
acks = append(acks, ack)
122+
}
123+
}
124+
rtt := MeasureRTT(report.Departure, report.Arrival, latestAcked.Departure, latestAcked.Arrival)
125+
return acks, rtt
126+
}

pkg/cc/interceptor.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package cc
77

88
import (
99
"github.com/pion/interceptor"
10+
"github.com/pion/interceptor/pkg/ccfb"
1011
"github.com/pion/interceptor/pkg/gcc"
1112
"github.com/pion/rtcp"
1213
)
@@ -38,6 +39,7 @@ type InterceptorFactory struct {
3839
opts []Option
3940
bweFactory func() (BandwidthEstimator, error)
4041
addPeerConnection NewPeerConnectionCallback
42+
ccfbFactory *ccfb.InterceptorFactory
4143
}
4244

4345
// NewInterceptor returns a new CC interceptor factory
@@ -47,10 +49,15 @@ func NewInterceptor(factory BandwidthEstimatorFactory, opts ...Option) (*Interce
4749
return gcc.NewSendSideBWE()
4850
}
4951
}
52+
ccfbFactory, err := ccfb.NewInterceptor()
53+
if err != nil {
54+
return nil, err
55+
}
5056
return &InterceptorFactory{
5157
opts: opts,
5258
bweFactory: factory,
5359
addPeerConnection: nil,
60+
ccfbFactory: ccfbFactory,
5461
}, nil
5562
}
5663

@@ -69,28 +76,29 @@ func (f *InterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor,
6976
i := &Interceptor{
7077
NoOp: interceptor.NoOp{},
7178
estimator: bwe,
72-
feedback: make(chan []rtcp.Packet),
73-
close: make(chan struct{}),
7479
}
7580

7681
for _, opt := range f.opts {
77-
if err := opt(i); err != nil {
82+
if err = opt(i); err != nil {
7883
return nil, err
7984
}
8085
}
8186

8287
if f.addPeerConnection != nil {
8388
f.addPeerConnection(id, i.estimator)
8489
}
85-
return i, nil
90+
91+
ccfb, err := f.ccfbFactory.NewInterceptor(id)
92+
if err != nil {
93+
return nil, err
94+
}
95+
return interceptor.NewChain([]interceptor.Interceptor{ccfb, i}), nil
8696
}
8797

8898
// Interceptor implements Google Congestion Control
8999
type Interceptor struct {
90100
interceptor.NoOp
91101
estimator BandwidthEstimator
92-
feedback chan []rtcp.Packet
93-
close chan struct{}
94102
}
95103

96104
// BindRTCPReader lets you modify any incoming RTCP packets. It is called once

0 commit comments

Comments
 (0)