Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ require (
github.com/livekit/livekit-server v1.8.4
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731
github.com/livekit/media-sdk v0.0.0-20250518151703-b07af88637c5
github.com/livekit/protocol v1.40.1-0.20250826073447-c714707269e5
github.com/livekit/protocol v1.41.1-0.20250908120808-3db07988acac
github.com/livekit/psrpc v0.6.1-0.20250726180611-3915e005e741
github.com/livekit/server-sdk-go/v2 v2.11.2
github.com/livekit/storage v0.0.0-20250711185412-0dabf9984ad7
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,8 @@ github.com/livekit/media-sdk v0.0.0-20250518151703-b07af88637c5 h1:aFCwt/rticj5L
github.com/livekit/media-sdk v0.0.0-20250518151703-b07af88637c5/go.mod h1:7ssWiG+U4xnbvLih9WiZbhQP6zIKMjgXdUtIE1bm/E8=
github.com/livekit/mediatransportutil v0.0.0-20250519131108-fb90f5acfded h1:ylZPdnlX1RW9Z15SD4mp87vT2D2shsk0hpLJwSPcq3g=
github.com/livekit/mediatransportutil v0.0.0-20250519131108-fb90f5acfded/go.mod h1:mSNtYzSf6iY9xM3UX42VEI+STHvMgHmrYzEHPcdhB8A=
github.com/livekit/protocol v1.40.1-0.20250826073447-c714707269e5 h1:aBqHlrgCI3qzVUAoOx7n5Kt8GQ8g7UbNp69fUt2gO8I=
github.com/livekit/protocol v1.40.1-0.20250826073447-c714707269e5/go.mod h1:YlgUxAegtU8jZ0tVXoIV/4fHeHqqLvS+6JnPKDbpFPU=
github.com/livekit/protocol v1.41.1-0.20250908120808-3db07988acac h1:WT7njMPTobUjfJEQm9JbiU3/5vpIpG6XOfJFmVPJJvU=
github.com/livekit/protocol v1.41.1-0.20250908120808-3db07988acac/go.mod h1:Scx8arfj5y65w6EYA3ZIKJafoN2xBuV8pauvyrvI4eg=
github.com/livekit/psrpc v0.6.1-0.20250726180611-3915e005e741 h1:KKL1u94l6dF9u4cBwnnfozk27GH1txWy2SlvkfgmzoY=
github.com/livekit/psrpc v0.6.1-0.20250726180611-3915e005e741/go.mod h1:AuDC5uOoEjQJEc69v4Li3t77Ocz0e0NdjQEuFfO+vfk=
github.com/livekit/server-sdk-go/v2 v2.11.2 h1:Nnkf6nlVweHqywx8SxolInrEHjdM5HKtiWBDhrbrXi4=
Expand Down
82 changes: 81 additions & 1 deletion pkg/pipeline/builder/audio.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package builder
import (
"fmt"
"sync"
"time"

"github.com/go-gst/go-gst/gst"

Expand All @@ -36,6 +37,11 @@ const (

leakyQueue = true
blockingQueue = false

opusPlcMaxFrames = 5
opusDecStatsPollInterval = time.Second * 5
opusDecPlcMaxJitter = 3 * time.Millisecond
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why 3ms?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

semi arbitrary chosen value :)
The case I had in mind was - if the gap is almost one frame (e.g., 18–19 ms when the true frame is 20 ms) still count it as a loss. That probably won't happen as long as we lose full packets. Added for the cases when wall-clock PTS adjustment kicks in (even then it's going to be extremely rare).

defaultOpusPacketDuration = time.Millisecond * 20
)

type AudioBin struct {
Expand Down Expand Up @@ -201,14 +207,25 @@ func (b *AudioBin) addAudioAppSrcBin(ts *config.TrackSource) error {
return errors.ErrGstPipelineError(err)
}

opusParse, err := gst.NewElement("opusparse")
if err != nil {
return errors.ErrGstPipelineError(err)
}

opusDec, err := gst.NewElement("opusdec")
if err != nil {
return errors.ErrGstPipelineError(err)
}

if err = appSrcBin.AddElements(rtpOpusDepay, opusDec); err != nil {
err = opusDec.SetProperty("plc", true)
if err != nil {
return errors.ErrGstPipelineError(err)
}

if err = appSrcBin.AddElements(rtpOpusDepay, opusParse, opusDec); err != nil {
return err
}
installOpusParseSrcProbe(opusParse, opusDec)

default:
return errors.ErrNotSupported(string(ts.MimeType))
Expand Down Expand Up @@ -397,3 +414,66 @@ func subscribeForQoS(mixer *gst.Element) {
}
})
}

func installOpusParseSrcProbe(opusParse *gst.Element, opusDec *gst.Element) {
src := opusParse.GetStaticPad("src")

var lastPTS, lastDur time.Duration
var lastPoll time.Time

src.AddProbe(gst.PadProbeTypeBuffer, func(p *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
buf := info.GetBuffer()
if buf == nil {
return gst.PadProbeOK
}

pts := time.Duration(buf.PresentationTimestamp())
dur := time.Duration(buf.Duration())

if dur <= 0 {
// Fallback if TOC wasn’t parsed (shouldn’t happen with opusparse)
if lastDur > 0 {
dur = lastDur
} else {
dur = defaultOpusPacketDuration
}
}

if lastDur > 0 {
expected := lastPTS + lastDur
if pts > expected {
gap := pts - expected
// Only trigger for at least ~one full frame gap
if gap+opusDecPlcMaxJitter >= lastDur {
// k missing frames (rounded)
k := int((gap + lastDur - 1) / lastDur)
if k < 1 {
k = 1
}
if k <= opusPlcMaxFrames {
missed := time.Duration(k) * lastDur
// Push GAP so opusdec generates PLC
gapEv := gst.NewGapEvent(gst.ClockTime(expected), gst.ClockTime(missed))
p.PushEvent(gapEv)
buf.SetFlags(buf.GetFlags() | gst.BufferFlagDiscont)
}
}
}
}
lastPTS, lastDur = pts, dur

// periodically gather stats from opusdec
if lastPoll.IsZero() || time.Since(lastPoll) >= opusDecStatsPollInterval {
stats, err := getOpusDecStats(opusDec)
if err != nil {
logger.Debugw("opusdec stats: parse error", "err", err)
return gst.PadProbeOK
}
postOpusDecStatsMessage(opusDec, stats)
lastPoll = time.Now()
}

return gst.PadProbeOK

})
}
110 changes: 110 additions & 0 deletions pkg/pipeline/builder/audio_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package builder

import (
"fmt"
"log"
"regexp"
"time"

"github.com/go-gst/go-glib/glib"
"github.com/go-gst/go-gst/gst"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
)

const (
OpusDecStatsStructName = "livekit-opus-plc-stats"
OpusDecStatsKeyPlcDurationNs = "plc-duration-ns"
OpusDecStatsKeyPlcNumSamples = "plc-num-samples"
OpusDecStatsKeyNumGap = "num-gap"
OpusDecStatsKeyNumPushed = "num-pushed"
)

var (
rePushed = regexp.MustCompile(`\bnum-pushed=\(g?uint64\)(\d+)`)
reGap = regexp.MustCompile(`\bnum-gap=\(g?uint64\)(\d+)`)
rePlcNumSamples = regexp.MustCompile(`\bplc-num-samples=\(g?uint64\)(\d+)`)
rePlcDuration = regexp.MustCompile(`\bplc-duration=\(g?uint64\)(\d+)`)
reSampleRate = regexp.MustCompile(`\bsample-rate=\(uint\)(\d+)`)
reChannels = regexp.MustCompile(`\bchannels=\(uint\)(\d+)`)
)

type OpusDecStats struct {
NumPushed uint64
NumGap uint64
PlcNumSamples uint64
PlcDuration time.Duration // ns
SampleRate uint64
Channels uint64
}

func serializeOpusStats(opusdec *gst.Element) (string, bool) {
gvAny, err := opusdec.GetProperty("stats")
if err != nil {
log.Printf("opusdec stats: get failed: %v", err)
return "", false
}
switch v := gvAny.(type) {
case *gst.Structure:
return v.String(), true
case *glib.Value:
return gst.ValueSerialize(v), true
case string:
return v, true
default:
log.Printf("opusdec stats: unexpected type %T", gvAny)
return "", false
}
}

/*** minimal parser for the serialized GstStructure ***/

func parseStatsString(s string) (OpusDecStats, error) {
var st OpusDecStats

kv := utils.NewKVRegexScanner(s)

if v, ok := kv.Uint64(rePushed); ok {
st.NumPushed = v
}
if v, ok := kv.Uint64(reGap); ok {
st.NumGap = v
}
if v, ok := kv.Uint64(rePlcNumSamples); ok {
st.PlcNumSamples = v
}
if v, ok := kv.DurationNs(rePlcDuration); ok {
st.PlcDuration = v
}
if v, ok := kv.Uint64(reSampleRate); ok {
st.SampleRate = v
}
if v, ok := kv.Uint64(reChannels); ok {
st.Channels = v
}
return st, nil
}

func getOpusDecStats(opusdec *gst.Element) (OpusDecStats, error) {
ser, ok := serializeOpusStats(opusdec)
if !ok {
return OpusDecStats{}, fmt.Errorf("serialize error")
}
return parseStatsString(ser)
}

func postOpusDecStatsMessage(src *gst.Element, stats OpusDecStats) {
s := gst.NewStructureFromString(
fmt.Sprintf("%s, %s=(guint64)%d, %s=(guint64)%d, %s=(guint64)%d, %s=(guint64)%d",
OpusDecStatsStructName,
OpusDecStatsKeyPlcDurationNs, stats.PlcDuration.Nanoseconds(),
OpusDecStatsKeyPlcNumSamples, stats.PlcNumSamples,
OpusDecStatsKeyNumGap, stats.NumGap,
OpusDecStatsKeyNumPushed, stats.NumPushed,
))
msg := gst.NewElementMessage(src, s)
sent := src.PostMessage(msg)
if !sent {
logger.Debugw("failed to send opusdec PLC stats")
}
}
9 changes: 9 additions & 0 deletions pkg/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ type Controller struct {
type controllerStats struct {
droppedAudioBuffers atomic.Uint64
droppedAudioDuration atomic.Duration
// opusdec stats
opusDecPLCDuration atomic.Duration
opusDecPLCSamples atomic.Uint64
opusDecPacketsPushed atomic.Uint64
opusDecGapPackets atomic.Uint64
}

func New(ctx context.Context, conf *config.PipelineConfig, ipcServiceClient ipc.EgressServiceClient) (*Controller, error) {
Expand Down Expand Up @@ -177,6 +182,10 @@ func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo {
logger.Debugw("Audio QoS stats",
"audio buffers dropped", c.stats.droppedAudioBuffers.Load(),
"total audio duration dropped", c.stats.droppedAudioDuration.Load(),
"opusdec PLC duration", c.stats.opusDecPLCDuration.Load(),
"opusdec PLC samples", c.stats.opusDecPLCSamples.Load(),
"opusdec gap packets", c.stats.opusDecGapPackets.Load(),
"opusdec packets pushed", c.stats.opusDecPacketsPushed.Load(),
)
}()

Expand Down
42 changes: 42 additions & 0 deletions pkg/pipeline/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,8 @@ func (c *Controller) handleMessageElement(msg *gst.Message) error {
if err != nil {
return err
}
case builder.OpusDecStatsStructName:
c.handleOpusDecStats(s)
}
}

Expand All @@ -340,6 +342,46 @@ func (c *Controller) handleAudioMixerQoS(qosValues *gst.QoSValues) {
c.stats.droppedAudioDuration.Add(qosValues.Duration)
}

func (c *Controller) handleOpusDecStats(s *gst.Structure) {
dur, err := s.GetValue(builder.OpusDecStatsKeyPlcDurationNs)
if err != nil {
return
}
plcDurationNs, ok := dur.(uint64)
if !ok {
return
}
numSamples, err := s.GetValue(builder.OpusDecStatsKeyPlcNumSamples)
if err != nil {
return
}
plcNumSamples, ok := numSamples.(uint64)
if !ok {
return
}
numGap, err := s.GetValue(builder.OpusDecStatsKeyNumGap)
if err != nil {
return
}
plcNumGap, ok := numGap.(uint64)
if !ok {
return
}
pushed, err := s.GetValue(builder.OpusDecStatsKeyNumPushed)
if err != nil {
return
}
plcNumPushed, ok := pushed.(uint64)
if !ok {
return
}

c.stats.opusDecPLCDuration.Store(time.Duration(plcDurationNs) * time.Nanosecond)
c.stats.opusDecPLCSamples.Store(plcNumSamples)
c.stats.opusDecGapPackets.Store(plcNumGap)
c.stats.opusDecPacketsPushed.Store(plcNumPushed)
}

// Debug info comes in the following format:
// file.c(line): method_name (): /GstPipeline:pipeline/GstBin:bin_name/GstElement:element_name:\nError message
var gstDebug = regexp.MustCompile("(?s)(.*?)GstPipeline:pipeline/GstBin:(.*?)/(.*?):([^:]*)(:\n)?(.*)")
Expand Down