Skip to content

Commit 5f3ebf8

Browse files
authored
SDK timing (#910)
* fix eos timing, appsrc EOF * fix onEOSSent
1 parent 67b17e3 commit 5f3ebf8

File tree

7 files changed

+38
-46
lines changed

7 files changed

+38
-46
lines changed

pkg/config/pipeline.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,12 @@ import (
3737
lksdk "github.com/livekit/server-sdk-go/v2"
3838
)
3939

40-
const Latency = uint64(3e9)
40+
const (
41+
JitterBufferLatency = time.Second * 2
42+
AudioMixerLatency = uint64(25e8)
43+
PipelineLatency = uint64(3e9)
44+
AppSrcDrainTimeout = time.Millisecond * 3500
45+
)
4146

4247
type PipelineConfig struct {
4348
BaseConfig `yaml:",inline"`

pkg/pipeline/builder/audio.go

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,9 @@ import (
2929
)
3030

3131
const (
32-
audioMixerLatency = uint64(25e8)
33-
audioChannelNone = 0
32+
audioChannelStereo = 0
33+
audioChannelLeft = 1
34+
audioChannelRight = 2
3435
)
3536

3637
type AudioBin struct {
@@ -74,7 +75,7 @@ func BuildAudioBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error
7475
return err
7576
}
7677
} else {
77-
queue, err := gstreamer.BuildQueue("audio_queue", config.Latency, true)
78+
queue, err := gstreamer.BuildQueue("audio_queue", config.PipelineLatency, true)
7879
if err != nil {
7980
return errors.ErrGstPipelineError(err)
8081
}
@@ -129,7 +130,7 @@ func (b *AudioBin) buildWebInput() error {
129130
return err
130131
}
131132

132-
if err = addAudioConverter(b.bin, b.conf, audioChannelNone); err != nil {
133+
if err = addAudioConverter(b.bin, b.conf, audioChannelStereo); err != nil {
133134
return err
134135
}
135136
if b.conf.AudioTranscoding {
@@ -223,13 +224,13 @@ func (b *AudioBin) addAudioAppSrcBin(ts *config.TrackSource) error {
223224
func (b *AudioBin) getChannel(ts *config.TrackSource) int {
224225
switch b.conf.AudioMixing {
225226
case livekit.AudioMixing_DEFAULT_MIXING:
226-
return 0
227+
return audioChannelStereo
227228

228229
case livekit.AudioMixing_DUAL_CHANNEL_AGENT:
229230
if ts.ParticipantKind == lksdk.ParticipantAgent {
230-
return 1
231+
return audioChannelLeft
231232
} else {
232-
return 2
233+
return audioChannelRight
233234
}
234235

235236
case livekit.AudioMixing_DUAL_CHANNEL_ALTERNATE:
@@ -238,7 +239,7 @@ func (b *AudioBin) getChannel(ts *config.TrackSource) int {
238239
return next%2 + 1
239240
}
240241

241-
return 0
242+
return audioChannelStereo
242243
}
243244

244245
func (b *AudioBin) addAudioTestSrcBin() error {
@@ -261,7 +262,7 @@ func (b *AudioBin) addAudioTestSrcBin() error {
261262
return errors.ErrGstPipelineError(err)
262263
}
263264

264-
audioCaps, err := newAudioCapsFilter(b.conf, audioChannelNone)
265+
audioCaps, err := newAudioCapsFilter(b.conf, audioChannelStereo)
265266
if err != nil {
266267
return err
267268
}
@@ -274,11 +275,11 @@ func (b *AudioBin) addMixer() error {
274275
if err != nil {
275276
return errors.ErrGstPipelineError(err)
276277
}
277-
if err = audioMixer.SetProperty("latency", audioMixerLatency); err != nil {
278+
if err = audioMixer.SetProperty("latency", config.AudioMixerLatency); err != nil {
278279
return errors.ErrGstPipelineError(err)
279280
}
280281

281-
mixedCaps, err := newAudioCapsFilter(b.conf, audioChannelNone)
282+
mixedCaps, err := newAudioCapsFilter(b.conf, audioChannelStereo)
282283
if err != nil {
283284
return err
284285
}
@@ -317,7 +318,7 @@ func (b *AudioBin) addEncoder() error {
317318
}
318319

319320
func addAudioConverter(b *gstreamer.Bin, p *config.PipelineConfig, channel int) error {
320-
audioQueue, err := gstreamer.BuildQueue("audio_input_queue", config.Latency, true)
321+
audioQueue, err := gstreamer.BuildQueue("audio_input_queue", config.PipelineLatency, true)
321322
if err != nil {
322323
return err
323324
}
@@ -342,7 +343,7 @@ func addAudioConverter(b *gstreamer.Bin, p *config.PipelineConfig, channel int)
342343

343344
func newAudioCapsFilter(p *config.PipelineConfig, channel int) (*gst.Element, error) {
344345
var channelCaps string
345-
if channel == audioChannelNone {
346+
if channel == audioChannelStereo {
346347
channelCaps = "channels=2"
347348
} else {
348349
channelCaps = fmt.Sprintf("channels=1,channel-mask=(bitmask)0x%d", channel)

pkg/pipeline/builder/stream.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func BuildStreamBin(pipeline *gstreamer.Pipeline, o *config.StreamConfig) (*Stre
6868
return nil, errors.ErrGstPipelineError(err)
6969
}
7070
// add latency to give time for flvmux to receive and order packets from both streams
71-
if err = mux.SetProperty("latency", config.Latency); err != nil {
71+
if err = mux.SetProperty("latency", config.PipelineLatency); err != nil {
7272
return nil, errors.ErrGstPipelineError(err)
7373
}
7474

@@ -113,7 +113,7 @@ func (sb *StreamBin) BuildStream(stream *config.Stream, framerate int32) (*Strea
113113
stream.Name = utils.NewGuid("")
114114
b := sb.Bin.NewBin(stream.Name)
115115

116-
queue, err := gstreamer.BuildQueue(fmt.Sprintf("queue_%s", stream.Name), config.Latency, true)
116+
queue, err := gstreamer.BuildQueue(fmt.Sprintf("queue_%s", stream.Name), config.PipelineLatency, true)
117117
if err != nil {
118118
return nil, errors.ErrGstPipelineError(err)
119119
}

pkg/pipeline/builder/video.go

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func BuildVideoBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error
8686
return tee.GetRequestPad("src_%u")
8787
}
8888
} else if len(p.GetEncodedOutputs()) > 0 {
89-
queue, err := gstreamer.BuildQueue("video_queue", config.Latency, true)
89+
queue, err := gstreamer.BuildQueue("video_queue", config.PipelineLatency, true)
9090
if err != nil {
9191
return errors.ErrGstPipelineError(err)
9292
}
@@ -199,7 +199,7 @@ func (b *VideoBin) buildWebInput() error {
199199
return errors.ErrGstPipelineError(err)
200200
}
201201

202-
videoQueue, err := gstreamer.BuildQueue("video_input_queue", config.Latency, true)
202+
videoQueue, err := gstreamer.BuildQueue("video_input_queue", config.PipelineLatency, true)
203203
if err != nil {
204204
return errors.ErrGstPipelineError(err)
205205
}
@@ -259,17 +259,6 @@ func (b *VideoBin) buildSDKInput() error {
259259

260260
if b.conf.VideoDecoding {
261261
b.bin.SetGetSrcPad(b.getSrcPad)
262-
b.bin.SetEOSFunc(func() bool {
263-
b.mu.Lock()
264-
selected := b.selectedPad
265-
pad := b.pads[videoTestSrcName]
266-
b.mu.Unlock()
267-
268-
if selected == videoTestSrcName {
269-
pad.SendEvent(gst.NewEOSEvent())
270-
}
271-
return false
272-
})
273262

274263
if err := b.addVideoTestSrcBin(); err != nil {
275264
return err
@@ -313,6 +302,9 @@ func (b *VideoBin) addAppSrcBin(ts *config.TrackSource) error {
313302

314303
func (b *VideoBin) buildAppSrcBin(ts *config.TrackSource, name string) (*gstreamer.Bin, error) {
315304
appSrcBin := b.bin.NewBin(name)
305+
appSrcBin.SetEOSFunc(func() bool {
306+
return false
307+
})
316308
ts.AppSrc.Element.SetArg("format", "time")
317309
if err := ts.AppSrc.Element.SetProperty("is-live", true); err != nil {
318310
return nil, errors.ErrGstPipelineError(err)
@@ -472,7 +464,7 @@ func (b *VideoBin) addVideoTestSrcBin() error {
472464
}
473465
videoTestSrc.SetArg("pattern", "black")
474466

475-
queue, err := gstreamer.BuildQueue("video_test_src_queue", config.Latency, false)
467+
queue, err := gstreamer.BuildQueue("video_test_src_queue", config.PipelineLatency, false)
476468
if err != nil {
477469
return err
478470
}
@@ -521,7 +513,7 @@ func (b *VideoBin) addSelector() error {
521513
}
522514

523515
func (b *VideoBin) addEncoder() error {
524-
videoQueue, err := gstreamer.BuildQueue("video_encoder_queue", config.Latency, false)
516+
videoQueue, err := gstreamer.BuildQueue("video_encoder_queue", config.PipelineLatency, false)
525517
if err != nil {
526518
return errors.ErrGstPipelineError(err)
527519
}
@@ -648,7 +640,7 @@ func (b *VideoBin) addDecodedVideoSink() error {
648640
}
649641

650642
func (b *VideoBin) addVideoConverter(bin *gstreamer.Bin) error {
651-
videoQueue, err := gstreamer.BuildQueue("video_input_queue", config.Latency, true)
643+
videoQueue, err := gstreamer.BuildQueue("video_input_queue", config.PipelineLatency, true)
652644
if err != nil {
653645
return errors.ErrGstPipelineError(err)
654646
}

pkg/pipeline/controller.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ func New(ctx context.Context, conf *config.PipelineConfig, ipcServiceClient ipc.
113113
}
114114

115115
func (c *Controller) BuildPipeline() error {
116-
p, err := gstreamer.NewPipeline(pipelineName, config.Latency, c.callbacks)
116+
p, err := gstreamer.NewPipeline(pipelineName, config.PipelineLatency, c.callbacks)
117117
if err != nil {
118118
return errors.ErrGstPipelineError(err)
119119
}
@@ -334,7 +334,7 @@ func (c *Controller) streamFailed(ctx context.Context, stream *config.Stream, st
334334
func (c *Controller) onEOSSent() {
335335
// for video-only track/track composite, EOS might have already
336336
// made it through the pipeline by the time endRecording is closed
337-
if c.SourceType == types.SourceTypeSDK && !c.AudioEnabled {
337+
if (c.RequestType == types.RequestTypeTrack || c.RequestType == types.RequestTypeTrackComposite) && !c.AudioEnabled {
338338
// this will not actually send a second EOS, but will make sure everything is in the correct state
339339
c.SendEOS(context.Background(), livekit.EndReasonSrcClosed)
340340
}

pkg/pipeline/source/sdk.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -122,17 +122,13 @@ func (s *SDKSource) CloseWriters() {
122122
s.closed.Once(func() {
123123
s.sync.End()
124124

125-
var wg sync.WaitGroup
126125
s.mu.Lock()
127-
wg.Add(len(s.writers))
128126
for _, w := range s.writers {
129127
go func(writer *sdk.AppWriter) {
130-
defer wg.Done()
131128
writer.Drain(false)
132129
}(w)
133130
}
134131
s.mu.Unlock()
135-
wg.Wait()
136132
})
137133
}
138134

pkg/pipeline/source/sdk/appwriter.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,6 @@ import (
3939
)
4040

4141
const (
42-
latency = time.Second * 2
43-
drainTimeout = time.Millisecond * 3500
4442
errBufferTooSmall = "buffer too small"
4543
)
4644

@@ -139,7 +137,7 @@ func NewAppWriter(
139137

140138
w.buffer = jitter.NewBuffer(
141139
depacketizer,
142-
latency,
140+
config.JitterBufferLatency,
143141
w.samples,
144142
jitter.WithLogger(w.logger),
145143
jitter.WithPacketLossHandler(w.sendPLI),
@@ -178,8 +176,6 @@ func (w *AppWriter) start() {
178176
}
179177
}
180178

181-
w.draining.Break()
182-
183179
w.logger.Infow("writer finished")
184180
if w.csvLogger != nil {
185181
w.csvLogger.Close()
@@ -231,7 +227,7 @@ func (w *AppWriter) handleReadError(err error) {
231227
if lastRecv.IsZero() {
232228
lastRecv = w.startTime
233229
}
234-
if w.pub.IsMuted() || time.Since(lastRecv) > latency {
230+
if w.pub.IsMuted() || time.Since(lastRecv) > config.JitterBufferLatency {
235231
// set track inactive
236232
w.logger.Debugw("track inactive", "timestamp", time.Since(w.startTime))
237233
w.active.Store(false)
@@ -247,6 +243,7 @@ func (w *AppWriter) handleReadError(err error) {
247243
if !errors.Is(err, io.EOF) {
248244
w.logger.Errorw("could not read packet", err)
249245
}
246+
w.draining.Break()
250247
w.endStream.Break()
251248
}
252249
}
@@ -267,7 +264,8 @@ func (w *AppWriter) pushSamples() {
267264
case sample := <-w.samples:
268265
for _, pkt := range sample {
269266
if err := w.pushPacket(pkt); err != nil {
270-
w.draining.Once(func() { w.endStream.Break() })
267+
w.draining.Break()
268+
w.endStream.Break()
271269
}
272270
}
273271
}
@@ -310,7 +308,7 @@ func (w *AppWriter) Drain(force bool) {
310308
if force || !w.active.Load() {
311309
w.endStream.Break()
312310
} else {
313-
time.AfterFunc(drainTimeout, func() { w.endStream.Break() })
311+
time.AfterFunc(config.AppSrcDrainTimeout, func() { w.endStream.Break() })
314312
}
315313
})
316314

0 commit comments

Comments
 (0)