diff --git a/go.mod b/go.mod index e68f9100..f23e6acc 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,7 @@ require ( github.com/livekit/livekit-server v1.8.4 github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 github.com/livekit/media-sdk v0.0.0-20250518151703-b07af88637c5 - github.com/livekit/protocol v1.40.1-0.20250826073447-c714707269e5 + github.com/livekit/protocol v1.41.1-0.20250908120808-3db07988acac github.com/livekit/psrpc v0.6.1-0.20250726180611-3915e005e741 github.com/livekit/server-sdk-go/v2 v2.11.2 github.com/livekit/storage v0.0.0-20250711185412-0dabf9984ad7 diff --git a/go.sum b/go.sum index cadd43d8..e758f029 100644 --- a/go.sum +++ b/go.sum @@ -247,8 +247,8 @@ github.com/livekit/media-sdk v0.0.0-20250518151703-b07af88637c5 h1:aFCwt/rticj5L github.com/livekit/media-sdk v0.0.0-20250518151703-b07af88637c5/go.mod h1:7ssWiG+U4xnbvLih9WiZbhQP6zIKMjgXdUtIE1bm/E8= github.com/livekit/mediatransportutil v0.0.0-20250519131108-fb90f5acfded h1:ylZPdnlX1RW9Z15SD4mp87vT2D2shsk0hpLJwSPcq3g= github.com/livekit/mediatransportutil v0.0.0-20250519131108-fb90f5acfded/go.mod h1:mSNtYzSf6iY9xM3UX42VEI+STHvMgHmrYzEHPcdhB8A= -github.com/livekit/protocol v1.40.1-0.20250826073447-c714707269e5 h1:aBqHlrgCI3qzVUAoOx7n5Kt8GQ8g7UbNp69fUt2gO8I= -github.com/livekit/protocol v1.40.1-0.20250826073447-c714707269e5/go.mod h1:YlgUxAegtU8jZ0tVXoIV/4fHeHqqLvS+6JnPKDbpFPU= +github.com/livekit/protocol v1.41.1-0.20250908120808-3db07988acac h1:WT7njMPTobUjfJEQm9JbiU3/5vpIpG6XOfJFmVPJJvU= +github.com/livekit/protocol v1.41.1-0.20250908120808-3db07988acac/go.mod h1:Scx8arfj5y65w6EYA3ZIKJafoN2xBuV8pauvyrvI4eg= github.com/livekit/psrpc v0.6.1-0.20250726180611-3915e005e741 h1:KKL1u94l6dF9u4cBwnnfozk27GH1txWy2SlvkfgmzoY= github.com/livekit/psrpc v0.6.1-0.20250726180611-3915e005e741/go.mod h1:AuDC5uOoEjQJEc69v4Li3t77Ocz0e0NdjQEuFfO+vfk= github.com/livekit/server-sdk-go/v2 v2.11.2 h1:Nnkf6nlVweHqywx8SxolInrEHjdM5HKtiWBDhrbrXi4= diff --git a/pkg/pipeline/builder/audio.go b/pkg/pipeline/builder/audio.go index 6d1b8140..8b380c17 100644 --- a/pkg/pipeline/builder/audio.go +++ b/pkg/pipeline/builder/audio.go @@ -17,6 +17,7 @@ package builder import ( "fmt" "sync" + "time" "github.com/go-gst/go-gst/gst" @@ -36,6 +37,11 @@ const ( leakyQueue = true blockingQueue = false + + opusPlcMaxFrames = 5 + opusDecStatsPollInterval = time.Second * 5 + opusDecPlcMaxJitter = 3 * time.Millisecond + defaultOpusPacketDuration = time.Millisecond * 20 ) type AudioBin struct { @@ -201,14 +207,25 @@ func (b *AudioBin) addAudioAppSrcBin(ts *config.TrackSource) error { return errors.ErrGstPipelineError(err) } + opusParse, err := gst.NewElement("opusparse") + if err != nil { + return errors.ErrGstPipelineError(err) + } + opusDec, err := gst.NewElement("opusdec") if err != nil { return errors.ErrGstPipelineError(err) } - if err = appSrcBin.AddElements(rtpOpusDepay, opusDec); err != nil { + err = opusDec.SetProperty("plc", true) + if err != nil { + return errors.ErrGstPipelineError(err) + } + + if err = appSrcBin.AddElements(rtpOpusDepay, opusParse, opusDec); err != nil { return err } + installOpusParseSrcProbe(opusParse, opusDec) default: return errors.ErrNotSupported(string(ts.MimeType)) @@ -397,3 +414,66 @@ func subscribeForQoS(mixer *gst.Element) { } }) } + +func installOpusParseSrcProbe(opusParse *gst.Element, opusDec *gst.Element) { + src := opusParse.GetStaticPad("src") + + var lastPTS, lastDur time.Duration + var lastPoll time.Time + + src.AddProbe(gst.PadProbeTypeBuffer, func(p *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { + buf := info.GetBuffer() + if buf == nil { + return gst.PadProbeOK + } + + pts := time.Duration(buf.PresentationTimestamp()) + dur := time.Duration(buf.Duration()) + + if dur <= 0 { + // Fallback if TOC wasn’t parsed (shouldn’t happen with opusparse) + if lastDur > 0 { + dur = lastDur + } else { + dur = defaultOpusPacketDuration + } + } + + if lastDur > 0 { + expected := lastPTS + lastDur + if pts > expected { + gap := pts - expected + // Only trigger for at least ~one full frame gap + if gap+opusDecPlcMaxJitter >= lastDur { + // k missing frames (rounded) + k := int((gap + lastDur - 1) / lastDur) + if k < 1 { + k = 1 + } + if k <= opusPlcMaxFrames { + missed := time.Duration(k) * lastDur + // Push GAP so opusdec generates PLC + gapEv := gst.NewGapEvent(gst.ClockTime(expected), gst.ClockTime(missed)) + p.PushEvent(gapEv) + buf.SetFlags(buf.GetFlags() | gst.BufferFlagDiscont) + } + } + } + } + lastPTS, lastDur = pts, dur + + // periodically gather stats from opusdec + if lastPoll.IsZero() || time.Since(lastPoll) >= opusDecStatsPollInterval { + stats, err := getOpusDecStats(opusDec) + if err != nil { + logger.Debugw("opusdec stats: parse error", "err", err) + return gst.PadProbeOK + } + postOpusDecStatsMessage(opusDec, stats) + lastPoll = time.Now() + } + + return gst.PadProbeOK + + }) +} diff --git a/pkg/pipeline/builder/audio_stats.go b/pkg/pipeline/builder/audio_stats.go new file mode 100644 index 00000000..e00fd235 --- /dev/null +++ b/pkg/pipeline/builder/audio_stats.go @@ -0,0 +1,110 @@ +package builder + +import ( + "fmt" + "log" + "regexp" + "time" + + "github.com/go-gst/go-glib/glib" + "github.com/go-gst/go-gst/gst" + "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/utils" +) + +const ( + OpusDecStatsStructName = "livekit-opus-plc-stats" + OpusDecStatsKeyPlcDurationNs = "plc-duration-ns" + OpusDecStatsKeyPlcNumSamples = "plc-num-samples" + OpusDecStatsKeyNumGap = "num-gap" + OpusDecStatsKeyNumPushed = "num-pushed" +) + +var ( + rePushed = regexp.MustCompile(`\bnum-pushed=\(g?uint64\)(\d+)`) + reGap = regexp.MustCompile(`\bnum-gap=\(g?uint64\)(\d+)`) + rePlcNumSamples = regexp.MustCompile(`\bplc-num-samples=\(g?uint64\)(\d+)`) + rePlcDuration = regexp.MustCompile(`\bplc-duration=\(g?uint64\)(\d+)`) + reSampleRate = regexp.MustCompile(`\bsample-rate=\(uint\)(\d+)`) + reChannels = regexp.MustCompile(`\bchannels=\(uint\)(\d+)`) +) + +type OpusDecStats struct { + NumPushed uint64 + NumGap uint64 + PlcNumSamples uint64 + PlcDuration time.Duration // ns + SampleRate uint64 + Channels uint64 +} + +func serializeOpusStats(opusdec *gst.Element) (string, bool) { + gvAny, err := opusdec.GetProperty("stats") + if err != nil { + log.Printf("opusdec stats: get failed: %v", err) + return "", false + } + switch v := gvAny.(type) { + case *gst.Structure: + return v.String(), true + case *glib.Value: + return gst.ValueSerialize(v), true + case string: + return v, true + default: + log.Printf("opusdec stats: unexpected type %T", gvAny) + return "", false + } +} + +/*** minimal parser for the serialized GstStructure ***/ + +func parseStatsString(s string) (OpusDecStats, error) { + var st OpusDecStats + + kv := utils.NewKVRegexScanner(s) + + if v, ok := kv.Uint64(rePushed); ok { + st.NumPushed = v + } + if v, ok := kv.Uint64(reGap); ok { + st.NumGap = v + } + if v, ok := kv.Uint64(rePlcNumSamples); ok { + st.PlcNumSamples = v + } + if v, ok := kv.DurationNs(rePlcDuration); ok { + st.PlcDuration = v + } + if v, ok := kv.Uint64(reSampleRate); ok { + st.SampleRate = v + } + if v, ok := kv.Uint64(reChannels); ok { + st.Channels = v + } + return st, nil +} + +func getOpusDecStats(opusdec *gst.Element) (OpusDecStats, error) { + ser, ok := serializeOpusStats(opusdec) + if !ok { + return OpusDecStats{}, fmt.Errorf("serialize error") + } + return parseStatsString(ser) +} + +func postOpusDecStatsMessage(src *gst.Element, stats OpusDecStats) { + s := gst.NewStructureFromString( + fmt.Sprintf("%s, %s=(guint64)%d, %s=(guint64)%d, %s=(guint64)%d, %s=(guint64)%d", + OpusDecStatsStructName, + OpusDecStatsKeyPlcDurationNs, stats.PlcDuration.Nanoseconds(), + OpusDecStatsKeyPlcNumSamples, stats.PlcNumSamples, + OpusDecStatsKeyNumGap, stats.NumGap, + OpusDecStatsKeyNumPushed, stats.NumPushed, + )) + msg := gst.NewElementMessage(src, s) + sent := src.PostMessage(msg) + if !sent { + logger.Debugw("failed to send opusdec PLC stats") + } +} diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index 2c6443f8..06205998 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -73,6 +73,11 @@ type Controller struct { type controllerStats struct { droppedAudioBuffers atomic.Uint64 droppedAudioDuration atomic.Duration + // opusdec stats + opusDecPLCDuration atomic.Duration + opusDecPLCSamples atomic.Uint64 + opusDecPacketsPushed atomic.Uint64 + opusDecGapPackets atomic.Uint64 } func New(ctx context.Context, conf *config.PipelineConfig, ipcServiceClient ipc.EgressServiceClient) (*Controller, error) { @@ -177,6 +182,10 @@ func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo { logger.Debugw("Audio QoS stats", "audio buffers dropped", c.stats.droppedAudioBuffers.Load(), "total audio duration dropped", c.stats.droppedAudioDuration.Load(), + "opusdec PLC duration", c.stats.opusDecPLCDuration.Load(), + "opusdec PLC samples", c.stats.opusDecPLCSamples.Load(), + "opusdec gap packets", c.stats.opusDecGapPackets.Load(), + "opusdec packets pushed", c.stats.opusDecPacketsPushed.Load(), ) }() diff --git a/pkg/pipeline/watch.go b/pkg/pipeline/watch.go index f0065858..a39e70d8 100644 --- a/pkg/pipeline/watch.go +++ b/pkg/pipeline/watch.go @@ -317,6 +317,8 @@ func (c *Controller) handleMessageElement(msg *gst.Message) error { if err != nil { return err } + case builder.OpusDecStatsStructName: + c.handleOpusDecStats(s) } } @@ -340,6 +342,46 @@ func (c *Controller) handleAudioMixerQoS(qosValues *gst.QoSValues) { c.stats.droppedAudioDuration.Add(qosValues.Duration) } +func (c *Controller) handleOpusDecStats(s *gst.Structure) { + dur, err := s.GetValue(builder.OpusDecStatsKeyPlcDurationNs) + if err != nil { + return + } + plcDurationNs, ok := dur.(uint64) + if !ok { + return + } + numSamples, err := s.GetValue(builder.OpusDecStatsKeyPlcNumSamples) + if err != nil { + return + } + plcNumSamples, ok := numSamples.(uint64) + if !ok { + return + } + numGap, err := s.GetValue(builder.OpusDecStatsKeyNumGap) + if err != nil { + return + } + plcNumGap, ok := numGap.(uint64) + if !ok { + return + } + pushed, err := s.GetValue(builder.OpusDecStatsKeyNumPushed) + if err != nil { + return + } + plcNumPushed, ok := pushed.(uint64) + if !ok { + return + } + + c.stats.opusDecPLCDuration.Store(time.Duration(plcDurationNs) * time.Nanosecond) + c.stats.opusDecPLCSamples.Store(plcNumSamples) + c.stats.opusDecGapPackets.Store(plcNumGap) + c.stats.opusDecPacketsPushed.Store(plcNumPushed) +} + // Debug info comes in the following format: // file.c(line): method_name (): /GstPipeline:pipeline/GstBin:bin_name/GstElement:element_name:\nError message var gstDebug = regexp.MustCompile("(?s)(.*?)GstPipeline:pipeline/GstBin:(.*?)/(.*?):([^:]*)(:\n)?(.*)")