From 126e7e7f92d2c8331430484170ec29bc30c452e1 Mon Sep 17 00:00:00 2001 From: Milos Pesic Date: Sun, 7 Sep 2025 01:13:57 +0200 Subject: [PATCH 1/5] Adding opusparse and enabling opus plc --- pkg/pipeline/builder/audio.go | 68 ++++++++++++++++++++++++++++++++++- 1 file changed, 67 insertions(+), 1 deletion(-) diff --git a/pkg/pipeline/builder/audio.go b/pkg/pipeline/builder/audio.go index 6d1b8140..8393076b 100644 --- a/pkg/pipeline/builder/audio.go +++ b/pkg/pipeline/builder/audio.go @@ -17,6 +17,7 @@ package builder import ( "fmt" "sync" + "time" "github.com/go-gst/go-gst/gst" @@ -36,6 +37,8 @@ const ( leakyQueue = true blockingQueue = false + + opusPlcMaxFrames = 4 ) type AudioBin struct { @@ -201,14 +204,25 @@ func (b *AudioBin) addAudioAppSrcBin(ts *config.TrackSource) error { return errors.ErrGstPipelineError(err) } + opusParse, err := gst.NewElement("opusparse") + if err != nil { + return errors.ErrGstPipelineError(err) + } + opusDec, err := gst.NewElement("opusdec") if err != nil { return errors.ErrGstPipelineError(err) } - if err = appSrcBin.AddElements(rtpOpusDepay, opusDec); err != nil { + err = opusDec.SetProperty("plc", true) + if err != nil { + return errors.ErrGstPipelineError(err) + } + + if err = appSrcBin.AddElements(rtpOpusDepay, opusParse, opusDec); err != nil { return err } + installOpusParseSrcProbe(opusParse) default: return errors.ErrNotSupported(string(ts.MimeType)) @@ -397,3 +411,55 @@ func subscribeForQoS(mixer *gst.Element) { } }) } + +func installOpusParseSrcProbe(opusParse *gst.Element) { + src := opusParse.GetStaticPad("src") + + var lastPTS, lastDur time.Duration + const maxJitter = 3 * time.Millisecond // allow tiny wobble + + src.AddProbe(gst.PadProbeTypeBuffer, func(p *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { + buf := info.GetBuffer() + if buf == nil { + return gst.PadProbeOK + } + + pts := time.Duration(buf.PresentationTimestamp()) + dur := time.Duration(buf.Duration()) + + if dur <= 0 { + // Fallback if TOC wasn’t parsed (shouldn’t happen with opusparse) + if lastDur > 0 { + dur = lastDur + } else { + dur = 20 * time.Millisecond + } + } + + if lastDur > 0 { + expected := lastPTS + lastDur + if pts > expected { + gap := pts - expected + // Only trigger for at least ~one full frame gap + if gap+maxJitter >= lastDur { + // k missing frames (rounded) + k := int((gap + lastDur - 1) / lastDur) + if k < 1 { + k = 1 + } + if k <= opusPlcMaxFrames { + logger.Debugw("opusparse src probe: gap detected", "gap", gap, "lastDur", lastDur, "expected", expected, "pts", pts) + missed := time.Duration(k) * lastDur + // Push GAP so opusdec generates PLC + gapEv := gst.NewGapEvent(gst.ClockTime(expected), gst.ClockTime(missed)) + p.PushEvent(gapEv) + buf.SetFlags(buf.GetFlags() | gst.BufferFlagDiscont) + } + } + } + } + lastPTS, lastDur = pts, dur + return gst.PadProbeOK + + }) +} From 79091d14344ac4e81751b965bf28fe4198b6b1f5 Mon Sep 17 00:00:00 2001 From: Milos Pesic Date: Sun, 7 Sep 2025 23:20:46 +0200 Subject: [PATCH 2/5] send opusdec stats message to the main bus for data gathering --- pkg/pipeline/builder/audio.go | 25 ++++-- pkg/pipeline/builder/audio_stats.go | 129 ++++++++++++++++++++++++++++ pkg/pipeline/watch.go | 38 ++++++++ 3 files changed, 186 insertions(+), 6 deletions(-) create mode 100644 pkg/pipeline/builder/audio_stats.go diff --git a/pkg/pipeline/builder/audio.go b/pkg/pipeline/builder/audio.go index 8393076b..bd0aa022 100644 --- a/pkg/pipeline/builder/audio.go +++ b/pkg/pipeline/builder/audio.go @@ -38,7 +38,9 @@ const ( leakyQueue = true blockingQueue = false - opusPlcMaxFrames = 4 + opusPlcMaxFrames = 4 + opusDecStatsPollInterval = time.Second * 5 + opusDecPlcMaxJitter = 3 * time.Millisecond ) type AudioBin struct { @@ -222,7 +224,7 @@ func (b *AudioBin) addAudioAppSrcBin(ts *config.TrackSource) error { if err = appSrcBin.AddElements(rtpOpusDepay, opusParse, opusDec); err != nil { return err } - installOpusParseSrcProbe(opusParse) + installOpusParseSrcProbe(opusParse, opusDec) default: return errors.ErrNotSupported(string(ts.MimeType)) @@ -412,11 +414,11 @@ func subscribeForQoS(mixer *gst.Element) { }) } -func installOpusParseSrcProbe(opusParse *gst.Element) { +func installOpusParseSrcProbe(opusParse *gst.Element, opusDec *gst.Element) { src := opusParse.GetStaticPad("src") var lastPTS, lastDur time.Duration - const maxJitter = 3 * time.Millisecond // allow tiny wobble + var lastPoll time.Time src.AddProbe(gst.PadProbeTypeBuffer, func(p *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { buf := info.GetBuffer() @@ -441,14 +443,13 @@ func installOpusParseSrcProbe(opusParse *gst.Element) { if pts > expected { gap := pts - expected // Only trigger for at least ~one full frame gap - if gap+maxJitter >= lastDur { + if gap+opusDecPlcMaxJitter >= lastDur { // k missing frames (rounded) k := int((gap + lastDur - 1) / lastDur) if k < 1 { k = 1 } if k <= opusPlcMaxFrames { - logger.Debugw("opusparse src probe: gap detected", "gap", gap, "lastDur", lastDur, "expected", expected, "pts", pts) missed := time.Duration(k) * lastDur // Push GAP so opusdec generates PLC gapEv := gst.NewGapEvent(gst.ClockTime(expected), gst.ClockTime(missed)) @@ -459,6 +460,18 @@ func installOpusParseSrcProbe(opusParse *gst.Element) { } } lastPTS, lastDur = pts, dur + + // periodically gather stats from opusdec + if lastPoll.IsZero() || time.Since(lastPoll) >= opusDecStatsPollInterval { + stats, err := getOpusDecStats(opusDec) + if err != nil { + logger.Debugw("opusdec stats: parse error", "err", err) + return gst.PadProbeOK + } + postOpusStatsMessage(opusDec, stats) + lastPoll = time.Now() + } + return gst.PadProbeOK }) diff --git a/pkg/pipeline/builder/audio_stats.go b/pkg/pipeline/builder/audio_stats.go new file mode 100644 index 00000000..d483a1ff --- /dev/null +++ b/pkg/pipeline/builder/audio_stats.go @@ -0,0 +1,129 @@ +package builder + +import ( + "fmt" + "log" + "regexp" + "strconv" + "time" + + "github.com/go-gst/go-glib/glib" + "github.com/go-gst/go-gst/gst" + "github.com/livekit/protocol/logger" +) + +const ( + OpusStatsStructName = "livekit-opus-plc-stats" + OpusStatsKeyPlcDurationNs = "plc-duration-ns" + OpusStatsKeyPlcNumSamples = "plc-num-samples" + OpusStatsKeyNumGap = "num-gap" + OpusStatsKeyNumPushed = "num-pushed" +) + +var ( + reU64 = map[string]*regexp.Regexp{ + "num-pushed": regexp.MustCompile(`\bnum-pushed=\(g?uint64\)(\d+)`), + "num-gap": regexp.MustCompile(`\bnum-gap=\(g?uint64\)(\d+)`), + "plc-num-samples": regexp.MustCompile(`\bplc-num-samples=\(g?uint64\)(\d+)`), + "plc-duration": regexp.MustCompile(`\bplc-duration=\(g?uint64\)(\d+)`), // ns + } + reU32 = map[string]*regexp.Regexp{ + "sample-rate": regexp.MustCompile(`\bsample-rate=\(uint\)(\d+)`), + "channels": regexp.MustCompile(`\bchannels=\(uint\)(\d+)`), + } +) + +type OpusDecStats struct { + NumPushed uint64 + NumGap uint64 + PlcNumSamples uint64 + PlcDuration time.Duration // ns + SampleRate uint32 + Channels uint32 +} + +func serializeOpusStats(opusdec *gst.Element) (string, bool) { + gvAny, err := opusdec.GetProperty("stats") + if err != nil { + log.Printf("opusdec stats: get failed: %v", err) + return "", false + } + switch v := gvAny.(type) { + case *gst.Structure: + return v.String(), true + case *glib.Value: + return gst.ValueSerialize(v), true + case string: + return v, true + default: + log.Printf("opusdec stats: unexpected type %T", gvAny) + return "", false + } +} + +/*** minimal parser for the serialized GstStructure ***/ + +func parseStatsString(s string) (OpusDecStats, error) { + var st OpusDecStats + + getU64 := func(k string) (uint64, error) { + if m := reU64[k].FindStringSubmatch(s); len(m) == 2 { + return strconv.ParseUint(m[1], 10, 64) + } + return 0, fmt.Errorf("missing %s", k) + } + getU32 := func(k string) (uint32, error) { + if m := reU32[k].FindStringSubmatch(s); len(m) == 2 { + v, _ := strconv.ParseUint(m[1], 10, 32) + return uint32(v), nil + } + return 0, fmt.Errorf("missing %s", k) + } + + // Required + if v, err := getU64("num-pushed"); err == nil { + st.NumPushed = v + } + if v, err := getU64("num-gap"); err == nil { + st.NumGap = v + } + if v, err := getU64("plc-num-samples"); err == nil { + st.PlcNumSamples = v + } + if v, err := getU64("plc-duration"); err == nil { + st.PlcDuration = time.Duration(v) * time.Nanosecond + } + + // Optional + if v, err := getU32("sample-rate"); err == nil { + st.SampleRate = v + } + if v, err := getU32("channels"); err == nil { + st.Channels = v + } + return st, nil +} + +func getOpusDecStats(opusdec *gst.Element) (OpusDecStats, error) { + ser, ok := serializeOpusStats(opusdec) + if !ok { + return OpusDecStats{}, fmt.Errorf("serialize error") + } + return parseStatsString(ser) +} + +func postOpusStatsMessage(src *gst.Element, stats OpusDecStats) { + s := gst.NewStructureFromString( + fmt.Sprintf("%s, %s=(guint64)%d, %s=(guint64)%d, %s=(guint64)%d, %s=(guint64)%d", + OpusStatsStructName, + OpusStatsKeyPlcDurationNs, stats.PlcDuration.Nanoseconds(), + OpusStatsKeyPlcNumSamples, stats.PlcNumSamples, + OpusStatsKeyNumGap, stats.NumGap, + OpusStatsKeyNumPushed, stats.NumPushed, + )) + msg := gst.NewElementMessage(src, s) + sent := src.PostMessage(msg) + if !sent { + logger.Debugw("failed to send opusdec PLC stats", "sent", sent) + } +} diff --git a/pkg/pipeline/watch.go b/pkg/pipeline/watch.go index f0065858..774bbd2a 100644 --- a/pkg/pipeline/watch.go +++ b/pkg/pipeline/watch.go @@ -317,6 +317,8 @@ func (c *Controller) handleMessageElement(msg *gst.Message) error { if err != nil { return err } + case builder.OpusStatsStructName: + c.handleOpusStats(s) } } @@ -340,6 +342,42 @@ func (c *Controller) handleAudioMixerQoS(qosValues *gst.QoSValues) { c.stats.droppedAudioDuration.Add(qosValues.Duration) } +func (c *Controller) handleOpusStats(s *gst.Structure) { + dur, err := s.GetValue(builder.OpusStatsKeyPlcDurationNs) + if err != nil { + return + } + plcDurationNs, ok := dur.(uint64) + if !ok { + return + } + numSamples, err := s.GetValue(builder.OpusStatsKeyPlcNumSamples) + if err != nil { + return + } + plcNumSamples, ok := numSamples.(uint64) + if !ok { + return + } + numGap, err := s.GetValue(builder.OpusStatsKeyNumGap) + if err != nil { + return + } + plcNumGap, ok := numGap.(uint64) + if !ok { + return + } + pushed, err := s.GetValue(builder.OpusStatsKeyNumPushed) + if err != nil { + return + } + plcNumPushed, ok := pushed.(uint64) + if !ok { + return + } + logger.Debugw("opusdec PLC stats", "plcDurationNs", plcDurationNs, "plcNumSamples", plcNumSamples, "numGap", plcNumGap, "numPushed", plcNumPushed) +} + // Debug info comes in the following format: // file.c(line): method_name (): /GstPipeline:pipeline/GstBin:bin_name/GstElement:element_name:\nError message var gstDebug = regexp.MustCompile("(?s)(.*?)GstPipeline:pipeline/GstBin:(.*?)/(.*?):([^:]*)(:\n)?(.*)") From 2422affa8525c5824d0eb954340c5562e5f8cb5f Mon Sep 17 00:00:00 2001 From: Milos Pesic Date: Mon, 8 Sep 2025 10:41:41 +0200 Subject: [PATCH 3/5] Updating audio qos with opusdec metrics --- pkg/pipeline/builder/audio.go | 4 ++-- pkg/pipeline/builder/audio_stats.go | 24 ++++++++++++------------ pkg/pipeline/controller.go | 9 +++++++++ pkg/pipeline/watch.go | 20 ++++++++++++-------- 4 files changed, 35 insertions(+), 22 deletions(-) diff --git a/pkg/pipeline/builder/audio.go b/pkg/pipeline/builder/audio.go index bd0aa022..05f43c09 100644 --- a/pkg/pipeline/builder/audio.go +++ b/pkg/pipeline/builder/audio.go @@ -38,7 +38,7 @@ const ( leakyQueue = true blockingQueue = false - opusPlcMaxFrames = 4 + opusPlcMaxFrames = 5 opusDecStatsPollInterval = time.Second * 5 opusDecPlcMaxJitter = 3 * time.Millisecond ) @@ -468,7 +468,7 @@ func installOpusParseSrcProbe(opusParse *gst.Element, opusDec *gst.Element) { logger.Debugw("opusdec stats: parse error", "err", err) return gst.PadProbeOK } - postOpusStatsMessage(opusDec, stats) + postOpusDecStatsMessage(opusDec, stats) lastPoll = time.Now() } diff --git a/pkg/pipeline/builder/audio_stats.go b/pkg/pipeline/builder/audio_stats.go index d483a1ff..0bc6ea59 100644 --- a/pkg/pipeline/builder/audio_stats.go +++ b/pkg/pipeline/builder/audio_stats.go @@ -13,11 +13,11 @@ import ( ) const ( - OpusStatsStructName = "livekit-opus-plc-stats" - OpusStatsKeyPlcDurationNs = "plc-duration-ns" - OpusStatsKeyPlcNumSamples = "plc-num-samples" - OpusStatsKeyNumGap = "num-gap" - OpusStatsKeyNumPushed = "num-pushed" + OpusDecStatsStructName = "livekit-opus-plc-stats" + OpusDecStatsKeyPlcDurationNs = "plc-duration-ns" + OpusDecStatsKeyPlcNumSamples = "plc-num-samples" + OpusDecStatsKeyNumGap = "num-gap" + OpusDecStatsKeyNumPushed = "num-pushed" ) var ( @@ -112,18 +112,18 @@ func getOpusDecStats(opusdec *gst.Element) (OpusDecStats, error) { return parseStatsString(ser) } -func postOpusStatsMessage(src *gst.Element, stats OpusDecStats) { +func postOpusDecStatsMessage(src *gst.Element, stats OpusDecStats) { s := gst.NewStructureFromString( fmt.Sprintf("%s, %s=(guint64)%d, %s=(guint64)%d, %s=(guint64)%d, %s=(guint64)%d", - OpusStatsStructName, - OpusStatsKeyPlcDurationNs, stats.PlcDuration.Nanoseconds(), - OpusStatsKeyPlcNumSamples, stats.PlcNumSamples, - OpusStatsKeyNumGap, stats.NumGap, - OpusStatsKeyNumPushed, stats.NumPushed, + OpusDecStatsStructName, + OpusDecStatsKeyPlcDurationNs, stats.PlcDuration.Nanoseconds(), + OpusDecStatsKeyPlcNumSamples, stats.PlcNumSamples, + OpusDecStatsKeyNumGap, stats.NumGap, + OpusDecStatsKeyNumPushed, stats.NumPushed, )) msg := gst.NewElementMessage(src, s) sent := src.PostMessage(msg) if !sent { - logger.Debugw("failed to send opusdec PLC stats", "sent", sent) + logger.Debugw("failed to send opusdec PLC stats") } } diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index 2c6443f8..06205998 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -73,6 +73,11 @@ type Controller struct { type controllerStats struct { droppedAudioBuffers atomic.Uint64 droppedAudioDuration atomic.Duration + // opusdec stats + opusDecPLCDuration atomic.Duration + opusDecPLCSamples atomic.Uint64 + opusDecPacketsPushed atomic.Uint64 + opusDecGapPackets atomic.Uint64 } func New(ctx context.Context, conf *config.PipelineConfig, ipcServiceClient ipc.EgressServiceClient) (*Controller, error) { @@ -177,6 +182,10 @@ func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo { logger.Debugw("Audio QoS stats", "audio buffers dropped", c.stats.droppedAudioBuffers.Load(), "total audio duration dropped", c.stats.droppedAudioDuration.Load(), + "opusdec PLC duration", c.stats.opusDecPLCDuration.Load(), + "opusdec PLC samples", c.stats.opusDecPLCSamples.Load(), + "opusdec gap packets", c.stats.opusDecGapPackets.Load(), + "opusdec packets pushed", c.stats.opusDecPacketsPushed.Load(), ) }() diff --git a/pkg/pipeline/watch.go b/pkg/pipeline/watch.go index 774bbd2a..a39e70d8 100644 --- a/pkg/pipeline/watch.go +++ b/pkg/pipeline/watch.go @@ -317,8 +317,8 @@ func (c *Controller) handleMessageElement(msg *gst.Message) error { if err != nil { return err } - case builder.OpusStatsStructName: - c.handleOpusStats(s) + case builder.OpusDecStatsStructName: + c.handleOpusDecStats(s) } } @@ -342,8 +342,8 @@ func (c *Controller) handleAudioMixerQoS(qosValues *gst.QoSValues) { c.stats.droppedAudioDuration.Add(qosValues.Duration) } -func (c *Controller) handleOpusStats(s *gst.Structure) { - dur, err := s.GetValue(builder.OpusStatsKeyPlcDurationNs) +func (c *Controller) handleOpusDecStats(s *gst.Structure) { + dur, err := s.GetValue(builder.OpusDecStatsKeyPlcDurationNs) if err != nil { return } @@ -351,7 +351,7 @@ func (c *Controller) handleOpusStats(s *gst.Structure) { if !ok { return } - numSamples, err := s.GetValue(builder.OpusStatsKeyPlcNumSamples) + numSamples, err := s.GetValue(builder.OpusDecStatsKeyPlcNumSamples) if err != nil { return } @@ -359,7 +359,7 @@ func (c *Controller) handleOpusStats(s *gst.Structure) { if !ok { return } - numGap, err := s.GetValue(builder.OpusStatsKeyNumGap) + numGap, err := s.GetValue(builder.OpusDecStatsKeyNumGap) if err != nil { return } @@ -367,7 +367,7 @@ func (c *Controller) handleOpusStats(s *gst.Structure) { if !ok { return } - pushed, err := s.GetValue(builder.OpusStatsKeyNumPushed) + pushed, err := s.GetValue(builder.OpusDecStatsKeyNumPushed) if err != nil { return } @@ -375,7 +375,11 @@ func (c *Controller) handleOpusStats(s *gst.Structure) { if !ok { return } - logger.Debugw("opusdec PLC stats", "plcDurationNs", plcDurationNs, "plcNumSamples", plcNumSamples, "numGap", plcNumGap, "numPushed", plcNumPushed) + + c.stats.opusDecPLCDuration.Store(time.Duration(plcDurationNs) * time.Nanosecond) + c.stats.opusDecPLCSamples.Store(plcNumSamples) + c.stats.opusDecGapPackets.Store(plcNumGap) + c.stats.opusDecPacketsPushed.Store(plcNumPushed) } // Debug info comes in the following format: From 9bc3c1fb94274ef610ad136f210f08903753b789 Mon Sep 17 00:00:00 2001 From: Milos Pesic Date: Mon, 8 Sep 2025 11:43:16 +0200 Subject: [PATCH 4/5] Making default opus duration a const and adding bounds check for uint64->int64 conversion --- pkg/pipeline/builder/audio.go | 9 +++++---- pkg/pipeline/builder/audio_stats.go | 5 ++++- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/pkg/pipeline/builder/audio.go b/pkg/pipeline/builder/audio.go index 05f43c09..8b380c17 100644 --- a/pkg/pipeline/builder/audio.go +++ b/pkg/pipeline/builder/audio.go @@ -38,9 +38,10 @@ const ( leakyQueue = true blockingQueue = false - opusPlcMaxFrames = 5 - opusDecStatsPollInterval = time.Second * 5 - opusDecPlcMaxJitter = 3 * time.Millisecond + opusPlcMaxFrames = 5 + opusDecStatsPollInterval = time.Second * 5 + opusDecPlcMaxJitter = 3 * time.Millisecond + defaultOpusPacketDuration = time.Millisecond * 20 ) type AudioBin struct { @@ -434,7 +435,7 @@ func installOpusParseSrcProbe(opusParse *gst.Element, opusDec *gst.Element) { if lastDur > 0 { dur = lastDur } else { - dur = 20 * time.Millisecond + dur = defaultOpusPacketDuration } } diff --git a/pkg/pipeline/builder/audio_stats.go b/pkg/pipeline/builder/audio_stats.go index 0bc6ea59..7abb7e50 100644 --- a/pkg/pipeline/builder/audio_stats.go +++ b/pkg/pipeline/builder/audio_stats.go @@ -3,6 +3,7 @@ package builder import ( "fmt" "log" + "math" "regexp" "strconv" "time" @@ -91,7 +92,9 @@ func parseStatsString(s string) (OpusDecStats, error) { st.PlcNumSamples = v } if v, err := getU64("plc-duration"); err == nil { - st.PlcDuration = time.Duration(v) * time.Nanosecond + if v <= math.MaxInt64 { + st.PlcDuration = time.Duration(v) * time.Nanosecond + } } // Optional From 20857051a3cb68a0bbee94ea1eb56a4222f56735 Mon Sep 17 00:00:00 2001 From: Milos Pesic Date: Mon, 8 Sep 2025 14:24:10 +0200 Subject: [PATCH 5/5] Use regex key value scanner instead of in place functions --- go.mod | 2 +- go.sum | 4 +-- pkg/pipeline/builder/audio_stats.go | 56 +++++++++-------------------- 3 files changed, 20 insertions(+), 42 deletions(-) diff --git a/go.mod b/go.mod index e68f9100..f23e6acc 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,7 @@ require ( github.com/livekit/livekit-server v1.8.4 github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 github.com/livekit/media-sdk v0.0.0-20250518151703-b07af88637c5 - github.com/livekit/protocol v1.40.1-0.20250826073447-c714707269e5 + github.com/livekit/protocol v1.41.1-0.20250908120808-3db07988acac github.com/livekit/psrpc v0.6.1-0.20250726180611-3915e005e741 github.com/livekit/server-sdk-go/v2 v2.11.2 github.com/livekit/storage v0.0.0-20250711185412-0dabf9984ad7 diff --git a/go.sum b/go.sum index cadd43d8..e758f029 100644 --- a/go.sum +++ b/go.sum @@ -247,8 +247,8 @@ github.com/livekit/media-sdk v0.0.0-20250518151703-b07af88637c5 h1:aFCwt/rticj5L github.com/livekit/media-sdk v0.0.0-20250518151703-b07af88637c5/go.mod h1:7ssWiG+U4xnbvLih9WiZbhQP6zIKMjgXdUtIE1bm/E8= github.com/livekit/mediatransportutil v0.0.0-20250519131108-fb90f5acfded h1:ylZPdnlX1RW9Z15SD4mp87vT2D2shsk0hpLJwSPcq3g= github.com/livekit/mediatransportutil v0.0.0-20250519131108-fb90f5acfded/go.mod h1:mSNtYzSf6iY9xM3UX42VEI+STHvMgHmrYzEHPcdhB8A= -github.com/livekit/protocol v1.40.1-0.20250826073447-c714707269e5 h1:aBqHlrgCI3qzVUAoOx7n5Kt8GQ8g7UbNp69fUt2gO8I= -github.com/livekit/protocol v1.40.1-0.20250826073447-c714707269e5/go.mod h1:YlgUxAegtU8jZ0tVXoIV/4fHeHqqLvS+6JnPKDbpFPU= +github.com/livekit/protocol v1.41.1-0.20250908120808-3db07988acac h1:WT7njMPTobUjfJEQm9JbiU3/5vpIpG6XOfJFmVPJJvU= +github.com/livekit/protocol v1.41.1-0.20250908120808-3db07988acac/go.mod h1:Scx8arfj5y65w6EYA3ZIKJafoN2xBuV8pauvyrvI4eg= github.com/livekit/psrpc v0.6.1-0.20250726180611-3915e005e741 h1:KKL1u94l6dF9u4cBwnnfozk27GH1txWy2SlvkfgmzoY= github.com/livekit/psrpc v0.6.1-0.20250726180611-3915e005e741/go.mod h1:AuDC5uOoEjQJEc69v4Li3t77Ocz0e0NdjQEuFfO+vfk= github.com/livekit/server-sdk-go/v2 v2.11.2 h1:Nnkf6nlVweHqywx8SxolInrEHjdM5HKtiWBDhrbrXi4= diff --git a/pkg/pipeline/builder/audio_stats.go b/pkg/pipeline/builder/audio_stats.go index 7abb7e50..e00fd235 100644 --- a/pkg/pipeline/builder/audio_stats.go +++ b/pkg/pipeline/builder/audio_stats.go @@ -3,14 +3,13 @@ package builder import ( "fmt" "log" - "math" "regexp" - "strconv" "time" "github.com/go-gst/go-glib/glib" "github.com/go-gst/go-gst/gst" "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/utils" ) const ( @@ -22,16 +21,12 @@ const ( ) var ( - reU64 = map[string]*regexp.Regexp{ - "num-pushed": regexp.MustCompile(`\bnum-pushed=\(g?uint64\)(\d+)`), - "num-gap": regexp.MustCompile(`\bnum-gap=\(g?uint64\)(\d+)`), - "plc-num-samples": regexp.MustCompile(`\bplc-num-samples=\(g?uint64\)(\d+)`), - "plc-duration": regexp.MustCompile(`\bplc-duration=\(g?uint64\)(\d+)`), // ns - } - reU32 = map[string]*regexp.Regexp{ - "sample-rate": regexp.MustCompile(`\bsample-rate=\(uint\)(\d+)`), - "channels": regexp.MustCompile(`\bchannels=\(uint\)(\d+)`), - } + rePushed = regexp.MustCompile(`\bnum-pushed=\(g?uint64\)(\d+)`) + reGap = regexp.MustCompile(`\bnum-gap=\(g?uint64\)(\d+)`) + rePlcNumSamples = regexp.MustCompile(`\bplc-num-samples=\(g?uint64\)(\d+)`) + rePlcDuration = regexp.MustCompile(`\bplc-duration=\(g?uint64\)(\d+)`) + reSampleRate = regexp.MustCompile(`\bsample-rate=\(uint\)(\d+)`) + reChannels = regexp.MustCompile(`\bchannels=\(uint\)(\d+)`) ) type OpusDecStats struct { @@ -39,8 +34,8 @@ type OpusDecStats struct { NumGap uint64 PlcNumSamples uint64 PlcDuration time.Duration // ns - SampleRate uint32 - Channels uint32 + SampleRate uint64 + Channels uint64 } func serializeOpusStats(opusdec *gst.Element) (string, bool) { @@ -67,41 +62,24 @@ func serializeOpusStats(opusdec *gst.Element) (string, bool) { func parseStatsString(s string) (OpusDecStats, error) { var st OpusDecStats - getU64 := func(k string) (uint64, error) { - if m := reU64[k].FindStringSubmatch(s); len(m) == 2 { - return strconv.ParseUint(m[1], 10, 64) - } - return 0, fmt.Errorf("missing %s", k) - } - getU32 := func(k string) (uint32, error) { - if m := reU32[k].FindStringSubmatch(s); len(m) == 2 { - v, _ := strconv.ParseUint(m[1], 10, 32) - return uint32(v), nil - } - return 0, fmt.Errorf("missing %s", k) - } + kv := utils.NewKVRegexScanner(s) - // Required - if v, err := getU64("num-pushed"); err == nil { + if v, ok := kv.Uint64(rePushed); ok { st.NumPushed = v } - if v, err := getU64("num-gap"); err == nil { + if v, ok := kv.Uint64(reGap); ok { st.NumGap = v } - if v, err := getU64("plc-num-samples"); err == nil { + if v, ok := kv.Uint64(rePlcNumSamples); ok { st.PlcNumSamples = v } - if v, err := getU64("plc-duration"); err == nil { - if v <= math.MaxInt64 { - st.PlcDuration = time.Duration(v) * time.Nanosecond - } + if v, ok := kv.DurationNs(rePlcDuration); ok { + st.PlcDuration = v } - - // Optional - if v, err := getU32("sample-rate"); err == nil { + if v, ok := kv.Uint64(reSampleRate); ok { st.SampleRate = v } - if v, err := getU32("channels"); err == nil { + if v, ok := kv.Uint64(reChannels); ok { st.Channels = v } return st, nil