diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c595dc6fad..41cd4c2441f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ - Improved PII Scrubbing for attributes / logs ([#5061](https://github.com/getsentry/relay/pull/5061))) - Introduces a project scope sampling rule type. ([#5077](https://github.com/getsentry/relay/pull/5077))) - Produce transactions on `transactions` Kafka topic, even if they have attachments. ([#5081](https://github.com/getsentry/relay/pull/5081)) +- Removes metric stats from the codebase. ([#5097](https://github.com/getsentry/relay/pull/5097)) - Add option gating Snuba publishing to ingest-replay-events for Replays. ([#5088](https://github.com/getsentry/relay/pull/5088)) ## 25.8.0 diff --git a/relay-base-schema/src/metrics/mri.rs b/relay-base-schema/src/metrics/mri.rs index c7a2a14d51b..c8f67d6f54e 100644 --- a/relay-base-schema/src/metrics/mri.rs +++ b/relay-base-schema/src/metrics/mri.rs @@ -114,10 +114,6 @@ pub enum MetricNamespace { Spans, /// User-defined metrics directly sent by SDKs and applications. Custom, - /// Metric stats. - /// - /// Metrics about metrics. - Stats, /// An unknown and unsupported metric. /// /// Metrics that Relay either doesn't know or recognize the namespace of will be dropped before @@ -132,22 +128,16 @@ pub enum MetricNamespace { impl MetricNamespace { /// Returns all namespaces/variants of this enum. - pub fn all() -> [Self; 6] { + pub fn all() -> [Self; 5] { [ Self::Sessions, Self::Transactions, Self::Spans, Self::Custom, - Self::Stats, Self::Unsupported, ] } - /// Returns `true` if metric stats are enabled for this namespace. - pub fn has_metric_stats(&self) -> bool { - matches!(self, Self::Custom) - } - /// Returns the string representation for this metric type. pub fn as_str(&self) -> &'static str { match self { @@ -155,7 +145,6 @@ impl MetricNamespace { Self::Transactions => "transactions", Self::Spans => "spans", Self::Custom => "custom", - Self::Stats => "metric_stats", Self::Unsupported => "unsupported", } } @@ -170,7 +159,6 @@ impl std::str::FromStr for MetricNamespace { "transactions" => Ok(Self::Transactions), "spans" => Ok(Self::Spans), "custom" => Ok(Self::Custom), - "metric_stats" => Ok(Self::Stats), _ => Ok(Self::Unsupported), } } diff --git a/relay-cogs/src/lib.rs b/relay-cogs/src/lib.rs index b5f217c8f46..ad30da651bb 100644 --- a/relay-cogs/src/lib.rs +++ b/relay-cogs/src/lib.rs @@ -178,8 +178,6 @@ pub enum AppFeature { MetricsSessions, /// Metrics in the custom namespace. MetricsCustom, - /// Metrics in the `metric_stats` namespace. - MetricsStats, /// Metrics in the unsupported namespace. /// /// This is usually not emitted, since metrics in the unsupported @@ -206,7 +204,6 @@ impl AppFeature { Self::MetricsSpans => "metrics_spans", Self::MetricsSessions => "metrics_sessions", Self::MetricsCustom => "metrics_custom", - Self::MetricsStats => "metrics_metric_stats", Self::MetricsUnsupported => "metrics_unsupported", Self::Profiles => "profiles", } diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 80ac37a608a..0c22ed71da0 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -575,23 +575,6 @@ impl Default for Metrics { } } -/// Controls processing of Sentry metrics and metric metadata. -#[derive(Serialize, Deserialize, Debug, Default)] -#[serde(default)] -pub struct SentryMetrics { - /// Whether metric stats are collected and emitted. - /// - /// Metric stats are always collected and emitted when processing - /// is enabled. - /// - /// This option is required for running multiple trusted Relays in a chain - /// and you want the metric stats to be collected and forwarded from - /// the first Relay in the chain. - /// - /// Defaults to `false`. - pub metric_stats_enabled: bool, -} - /// Controls various limits #[derive(Serialize, Deserialize, Debug)] #[serde(default)] @@ -1612,8 +1595,6 @@ struct ConfigValues { #[serde(default)] metrics: Metrics, #[serde(default)] - sentry_metrics: SentryMetrics, - #[serde(default)] sentry: relay_log::SentryConfig, #[serde(default)] processing: Processing, @@ -2365,14 +2346,6 @@ impl Config { self.values.limits.max_metric_buckets_size.as_bytes() } - /// Whether metric stats are collected and emitted. - /// - /// Metric stats are always collected and emitted when processing - /// is enabled. - pub fn metric_stats_enabled(&self) -> bool { - self.values.sentry_metrics.metric_stats_enabled || self.values.processing.enabled - } - /// Returns the maximum payload size for general API requests. pub fn max_api_payload_size(&self) -> usize { self.values.limits.max_api_payload_size.as_bytes() diff --git a/relay-dynamic-config/src/global.rs b/relay-dynamic-config/src/global.rs index 1ad959c02e5..46e5c4912f5 100644 --- a/relay-dynamic-config/src/global.rs +++ b/relay-dynamic-config/src/global.rs @@ -147,17 +147,6 @@ pub struct Options { )] pub metric_bucket_dist_encodings: BucketEncodings, - /// Rollout rate for metric stats. - /// - /// Rate needs to be between `0.0` and `1.0`. - /// If set to `1.0` all organizations will have metric stats enabled. - #[serde( - rename = "relay.metric-stats.rollout-rate", - deserialize_with = "default_on_error", - skip_serializing_if = "is_default" - )] - pub metric_stats_rollout_rate: f32, - /// Overall sampling of span extraction. /// /// This number represents the fraction of transactions for which @@ -220,7 +209,6 @@ pub struct BucketEncodings { spans: BucketEncoding, profiles: BucketEncoding, custom: BucketEncoding, - metric_stats: BucketEncoding, } impl BucketEncodings { @@ -230,7 +218,6 @@ impl BucketEncodings { MetricNamespace::Transactions => self.transactions, MetricNamespace::Spans => self.spans, MetricNamespace::Custom => self.custom, - MetricNamespace::Stats => self.metric_stats, // Always force the legacy encoding for sessions, // sessions are not part of the generic metrics platform with different // consumer which are not (yet) updated to support the new data. @@ -266,7 +253,6 @@ where spans: encoding, profiles: encoding, custom: encoding, - metric_stats: encoding, }) } @@ -470,7 +456,6 @@ mod tests { spans: BucketEncoding::Legacy, profiles: BucketEncoding::Legacy, custom: BucketEncoding::Legacy, - metric_stats: BucketEncoding::Legacy, } ); assert_eq!( @@ -480,7 +465,6 @@ mod tests { spans: BucketEncoding::Zstd, profiles: BucketEncoding::Zstd, custom: BucketEncoding::Zstd, - metric_stats: BucketEncoding::Zstd, } ); } @@ -492,7 +476,6 @@ mod tests { spans: BucketEncoding::Zstd, profiles: BucketEncoding::Base64, custom: BucketEncoding::Zstd, - metric_stats: BucketEncoding::Base64, }; let s = serde_json::to_string(&original).unwrap(); let s = format!( diff --git a/relay-metrics/src/cogs.rs b/relay-metrics/src/cogs.rs index c8df9d5701e..be5db5940fc 100644 --- a/relay-metrics/src/cogs.rs +++ b/relay-metrics/src/cogs.rs @@ -41,7 +41,6 @@ fn to_app_feature(ns: MetricNamespace) -> AppFeature { MetricNamespace::Transactions => AppFeature::MetricsTransactions, MetricNamespace::Spans => AppFeature::MetricsSpans, MetricNamespace::Custom => AppFeature::MetricsCustom, - MetricNamespace::Stats => AppFeature::MetricsStats, MetricNamespace::Unsupported => AppFeature::MetricsUnsupported, } } diff --git a/relay-metrics/src/utils.rs b/relay-metrics/src/utils.rs index f5cebd84997..1f6d61031aa 100644 --- a/relay-metrics/src/utils.rs +++ b/relay-metrics/src/utils.rs @@ -23,8 +23,6 @@ pub struct ByNamespace { pub spans: T, /// Value for the [`MetricNamespace::Custom`] namespace. pub custom: T, - /// Value for the [`MetricNamespace::Stats`] namespace. - pub stats: T, /// Value for the [`MetricNamespace::Unsupported`] namespace. pub unsupported: T, } @@ -37,7 +35,6 @@ impl ByNamespace { MetricNamespace::Transactions => &self.transactions, MetricNamespace::Spans => &self.spans, MetricNamespace::Custom => &self.custom, - MetricNamespace::Stats => &self.stats, MetricNamespace::Unsupported => &self.unsupported, } } @@ -49,7 +46,6 @@ impl ByNamespace { MetricNamespace::Transactions => &mut self.transactions, MetricNamespace::Spans => &mut self.spans, MetricNamespace::Custom => &mut self.custom, - MetricNamespace::Stats => &mut self.stats, MetricNamespace::Unsupported => &mut self.unsupported, } } @@ -57,7 +53,7 @@ impl ByNamespace { impl IntoIterator for ByNamespace { type Item = (MetricNamespace, T); - type IntoIter = std::array::IntoIter<(MetricNamespace, T), 6>; + type IntoIter = std::array::IntoIter<(MetricNamespace, T), 5>; fn into_iter(self) -> Self::IntoIter { let Self { @@ -65,7 +61,6 @@ impl IntoIterator for ByNamespace { transactions, spans, custom, - stats, unsupported, } = self; @@ -74,7 +69,6 @@ impl IntoIterator for ByNamespace { (MetricNamespace::Transactions, transactions), (MetricNamespace::Spans, spans), (MetricNamespace::Custom, custom), - (MetricNamespace::Stats, stats), (MetricNamespace::Unsupported, unsupported), ] .into_iter() @@ -118,7 +112,6 @@ macro_rules! impl_op { transactions, spans, custom, - stats, unsupported, } = self; @@ -126,7 +119,6 @@ macro_rules! impl_op { $op::$opfn(transactions, rhs.transactions); $op::$opfn(spans, rhs.spans); $op::$opfn(custom, rhs.custom); - $op::$opfn(stats, rhs.stats); $op::$opfn(unsupported, rhs.unsupported); } } diff --git a/relay-server/src/metrics/metric_stats.rs b/relay-server/src/metrics/metric_stats.rs deleted file mode 100644 index 9b30f459870..00000000000 --- a/relay-server/src/metrics/metric_stats.rs +++ /dev/null @@ -1,475 +0,0 @@ -use std::collections::BTreeMap; -use std::sync::{Arc, OnceLock}; - -use relay_base_schema::organization::OrganizationId; -#[cfg(feature = "processing")] -use relay_cardinality::{CardinalityLimit, CardinalityReport}; -use relay_config::Config; -#[cfg(feature = "processing")] -use relay_metrics::GaugeValue; -use relay_metrics::{Bucket, BucketValue, MetricName, UnixTimestamp}; -use relay_quotas::Scoping; -use relay_system::Addr; - -use crate::metrics::TrackableBucket; -use crate::services::global_config::GlobalConfigHandle; -use crate::services::metrics::{Aggregator, MergeBuckets}; -use crate::services::outcome::Outcome; -use crate::utils::is_rolled_out; - -fn volume_metric_mri() -> MetricName { - static VOLUME_METRIC_MRI: OnceLock = OnceLock::new(); - - VOLUME_METRIC_MRI - .get_or_init(|| "c:metric_stats/volume@none".into()) - .clone() -} - -#[cfg(feature = "processing")] -fn cardinality_metric_mri() -> MetricName { - static CARDINALITY_METRIC_MRI: OnceLock = OnceLock::new(); - - CARDINALITY_METRIC_MRI - .get_or_init(|| "g:metric_stats/cardinality@none".into()) - .clone() -} - -/// Tracks stats about metrics. -/// -/// Metric stats are similar to outcomes for envelopes, they record -/// the final "fate" of a metric and its properties. -/// -/// Unlike outcomes metric stats are tracked on a per MRI basis -/// and contain additional metadata, like the cardinality of a metric. -#[derive(Clone, Debug)] -pub struct MetricStats { - config: Arc, - global_config: GlobalConfigHandle, - aggregator: Addr, -} - -impl MetricStats { - /// Creates a new [`MetricStats`] instance. - pub fn new( - config: Arc, - global_config: GlobalConfigHandle, - aggregator: Addr, - ) -> Self { - Self { - config, - global_config, - aggregator, - } - } - - /// Tracks the metric volume and outcome for the bucket. - pub fn track_metric(&self, scoping: Scoping, bucket: impl TrackableBucket, outcome: &Outcome) { - if !self.is_enabled(scoping) { - return; - } - - let Some(volume) = self.to_volume_metric(&bucket, outcome) else { - return; - }; - - relay_log::trace!( - "Tracking volume of {} for mri '{}': {}", - bucket.metadata().merges, - bucket.name(), - outcome - ); - self.aggregator - .send(MergeBuckets::new(scoping.project_key, vec![volume])); - } - - /// Tracks the cardinality of a metric. - #[cfg(feature = "processing")] - pub fn track_cardinality( - &self, - scoping: Scoping, - limit: &CardinalityLimit, - report: &CardinalityReport, - ) { - if !self.is_enabled(scoping) { - return; - } - - let Some(cardinality) = self.to_cardinality_metric(limit, report) else { - return; - }; - - relay_log::trace!( - "Tracking cardinality '{}' for mri '{}': {}", - limit.id, - report.metric_name.as_deref().unwrap_or("-"), - report.cardinality, - ); - self.aggregator - .send(MergeBuckets::new(scoping.project_key, vec![cardinality])); - } - - fn is_enabled(&self, scoping: Scoping) -> bool { - self.config.metric_stats_enabled() && self.is_rolled_out(scoping.organization_id) - } - - fn is_rolled_out(&self, organization_id: OrganizationId) -> bool { - let rate = self - .global_config - .current() - .options - .metric_stats_rollout_rate; - - is_rolled_out(organization_id.value(), rate).is_keep() - } - - fn to_volume_metric(&self, bucket: impl TrackableBucket, outcome: &Outcome) -> Option { - let volume = bucket.metadata().merges; - if volume == 0 { - return None; - } - - let namespace = bucket.name().namespace(); - if !namespace.has_metric_stats() { - return None; - } - - let mut tags = BTreeMap::from([ - ("mri".to_owned(), bucket.name().to_string()), - ("mri.type".to_owned(), bucket.ty().to_string()), - ("mri.namespace".to_owned(), namespace.to_string()), - ( - "outcome.id".to_owned(), - outcome.to_outcome_id().as_u8().to_string(), - ), - ]); - - if let Some(reason) = outcome.to_reason() { - tags.insert("outcome.reason".to_owned(), reason.into_owned()); - } - - Some(Bucket { - timestamp: UnixTimestamp::now(), - width: 0, - name: volume_metric_mri(), - value: BucketValue::Counter(volume.into()), - tags, - metadata: Default::default(), - }) - } - - #[cfg(feature = "processing")] - fn to_cardinality_metric( - &self, - limit: &CardinalityLimit, - report: &CardinalityReport, - ) -> Option { - let cardinality = report.cardinality; - if cardinality == 0 { - return None; - } - - let mut tags = BTreeMap::from([ - ("cardinality.limit".to_owned(), limit.id.clone()), - ( - "cardinality.scope".to_owned(), - limit.scope.as_str().to_owned(), - ), - ( - "cardinality.window".to_owned(), - limit.window.window_seconds.to_string(), - ), - ]); - - if let Some(ref name) = report.metric_name { - tags.insert("mri".to_owned(), name.to_string()); - tags.insert("mri.namespace".to_owned(), name.namespace().to_string()); - if let Some(t) = name.try_type() { - tags.insert("mri.type".to_owned(), t.to_string()); - } - } else { - if let Some(namespace) = limit.namespace { - tags.insert("mri.namespace".to_owned(), namespace.to_string()); - } - if let Some(t) = report.metric_type { - tags.insert("mri.type".to_owned(), t.to_string()); - } - } - - Some(Bucket { - timestamp: report.timestamp, - width: 0, - name: cardinality_metric_mri(), - value: BucketValue::Gauge(GaugeValue::single(cardinality.into())), - tags, - metadata: Default::default(), - }) - } -} - -#[cfg(test)] -mod tests { - use relay_base_schema::project::{ProjectId, ProjectKey}; - #[cfg(feature = "processing")] - use relay_cardinality::{CardinalityScope, SlidingWindow}; - use relay_dynamic_config::GlobalConfig; - #[cfg(feature = "processing")] - use relay_metrics::{MetricNamespace, MetricType}; - use relay_quotas::ReasonCode; - use tokio::sync::mpsc::UnboundedReceiver; - - use super::*; - - impl MetricStats { - pub fn test() -> (Self, UnboundedReceiver) { - create_metric_stats(1.0) - } - } - - fn create_metric_stats(rollout_rate: f32) -> (MetricStats, UnboundedReceiver) { - let config = Config::from_json_value(serde_json::json!({ - "processing": { - "enabled": true, - "kafka_config": [], - } - })) - .unwrap(); - - let mut global_config = GlobalConfig::default(); - global_config.options.metric_stats_rollout_rate = rollout_rate; - let global_config = GlobalConfigHandle::fixed(global_config); - - let (addr, receiver) = Addr::custom(); - let ms = MetricStats::new(Arc::new(config), global_config, addr); - - (ms, receiver) - } - - fn scoping() -> Scoping { - Scoping { - organization_id: OrganizationId::new(42), - project_id: ProjectId::new(21), - project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), - key_id: Some(17), - } - } - - macro_rules! tags { - ($(($key:expr, $value:expr),)*) => { - BTreeMap::from([ - $(($key.to_owned(), $value.to_owned())),* - ]) - } - } - - #[test] - fn test_metric_stats_volume() { - let (ms, mut receiver) = create_metric_stats(1.0); - - let scoping = scoping(); - let mut bucket = Bucket::parse(b"rt@millisecond:57|d", UnixTimestamp::now()).unwrap(); - - ms.track_metric(scoping, &bucket, &Outcome::Accepted); - - bucket.metadata.merges = bucket.metadata.merges.saturating_add(41); - ms.track_metric( - scoping, - &bucket, - &Outcome::RateLimited(Some(ReasonCode::new("foobar"))), - ); - - drop(ms); - - let Aggregator::MergeBuckets(mut mb) = receiver.blocking_recv().unwrap(); - assert_eq!(mb.project_key, scoping.project_key); - - assert_eq!(mb.buckets.len(), 1); - let bucket = mb.buckets.pop().unwrap(); - - assert_eq!(&*bucket.name, "c:metric_stats/volume@none"); - assert_eq!(bucket.value, BucketValue::Counter(1.into())); - assert_eq!( - bucket.tags, - tags!( - ("mri", "d:custom/rt@millisecond"), - ("mri.type", "d"), - ("mri.namespace", "custom"), - ("outcome.id", "0"), - ) - ); - - let Aggregator::MergeBuckets(mut mb) = receiver.blocking_recv().unwrap(); - assert_eq!(mb.project_key, scoping.project_key); - - assert_eq!(mb.buckets.len(), 1); - let bucket = mb.buckets.pop().unwrap(); - - assert_eq!(&*bucket.name, "c:metric_stats/volume@none"); - assert_eq!(bucket.value, BucketValue::Counter(42.into())); - assert_eq!( - bucket.tags, - tags!( - ("mri", "d:custom/rt@millisecond"), - ("mri.type", "d"), - ("mri.namespace", "custom"), - ("outcome.id", "2"), - ("outcome.reason", "foobar"), - ) - ); - - assert!(receiver.blocking_recv().is_none()); - } - - #[test] - #[cfg(feature = "processing")] - fn test_metric_stats_cardinality_name() { - let (ms, mut receiver) = create_metric_stats(1.0); - - let scoping = scoping(); - let limit = CardinalityLimit { - id: "test".to_owned(), - passive: false, - report: true, - window: SlidingWindow { - window_seconds: 246, - granularity_seconds: 123, - }, - limit: 99, - scope: CardinalityScope::Name, - namespace: None, - }; - let report = CardinalityReport { - timestamp: UnixTimestamp::from_secs(3333), - organization_id: Some(scoping.organization_id), - project_id: Some(scoping.project_id), - metric_type: None, - metric_name: Some(MetricName::from("d:custom/rt@millisecond")), - cardinality: 12, - }; - - ms.track_cardinality(scoping, &limit, &report); - - drop(ms); - - let Aggregator::MergeBuckets(mut mb) = receiver.blocking_recv().unwrap(); - assert_eq!(mb.project_key, scoping.project_key); - - assert_eq!(mb.buckets.len(), 1); - let bucket = mb.buckets.pop().unwrap(); - - assert_eq!(&*bucket.name, "g:metric_stats/cardinality@none"); - assert_eq!(bucket.timestamp, UnixTimestamp::from_secs(3333)); - assert_eq!( - bucket.value, - BucketValue::Gauge(GaugeValue { - last: 12.into(), - min: 12.into(), - max: 12.into(), - sum: 12.into(), - count: 1, - }) - ); - assert_eq!( - bucket.tags, - tags!( - ("mri", "d:custom/rt@millisecond"), - ("mri.type", "d"), - ("mri.namespace", "custom"), - ("cardinality.limit", "test"), - ("cardinality.scope", "name"), - ("cardinality.window", "246"), - ) - ); - - assert!(receiver.blocking_recv().is_none()); - } - - #[test] - #[cfg(feature = "processing")] - fn test_metric_stats_cardinality_type() { - let (ms, mut receiver) = create_metric_stats(1.0); - - let scoping = scoping(); - let limit = CardinalityLimit { - id: "test".to_owned(), - passive: false, - report: true, - window: SlidingWindow { - window_seconds: 246, - granularity_seconds: 123, - }, - limit: 99, - scope: CardinalityScope::Type, - namespace: Some(MetricNamespace::Spans), - }; - let report = CardinalityReport { - timestamp: UnixTimestamp::from_secs(2222), - organization_id: Some(scoping.organization_id), - project_id: Some(scoping.project_id), - metric_type: Some(MetricType::Distribution), - metric_name: None, - cardinality: 12, - }; - - ms.track_cardinality(scoping, &limit, &report); - - drop(ms); - - let Aggregator::MergeBuckets(mut mb) = receiver.blocking_recv().unwrap(); - assert_eq!(mb.project_key, scoping.project_key); - - assert_eq!(mb.buckets.len(), 1); - let bucket = mb.buckets.pop().unwrap(); - - assert_eq!(&*bucket.name, "g:metric_stats/cardinality@none"); - assert_eq!(bucket.timestamp, UnixTimestamp::from_secs(2222)); - assert_eq!( - bucket.value, - BucketValue::Gauge(GaugeValue { - last: 12.into(), - min: 12.into(), - max: 12.into(), - sum: 12.into(), - count: 1, - }) - ); - assert_eq!( - bucket.tags, - tags!( - ("mri.type", "d"), - ("mri.namespace", "spans"), - ("cardinality.limit", "test"), - ("cardinality.scope", "type"), - ("cardinality.window", "246"), - ) - ); - - assert!(receiver.blocking_recv().is_none()); - } - - #[test] - fn test_metric_stats_rollout_rate_disabled() { - let (ms, mut receiver) = create_metric_stats(0.0); - - let scoping = scoping(); - let bucket = Bucket::parse(b"rt@millisecond:57|d", UnixTimestamp::now()).unwrap(); - ms.track_metric(scoping, &bucket, &Outcome::Accepted); - - drop(ms); - - assert!(receiver.blocking_recv().is_none()); - } - - #[test] - fn test_metric_stats_disabled_namespace() { - let (ms, mut receiver) = create_metric_stats(1.0); - - let scoping = scoping(); - let bucket = - Bucket::parse(b"transactions/rt@millisecond:57|d", UnixTimestamp::now()).unwrap(); - ms.track_metric(scoping, &bucket, &Outcome::Accepted); - - drop(ms); - - assert!(receiver.blocking_recv().is_none()); - } -} diff --git a/relay-server/src/metrics/minimal.rs b/relay-server/src/metrics/minimal.rs index 833337777c0..3da51523880 100644 --- a/relay-server/src/metrics/minimal.rs +++ b/relay-server/src/metrics/minimal.rs @@ -1,5 +1,5 @@ use relay_metrics::{ - BucketMetadata, CounterType, MetricName, MetricNamespace, MetricResourceIdentifier, MetricType, + BucketMetadata, CounterType, MetricName, MetricNamespace, MetricResourceIdentifier, }; use serde::Deserialize; use serde::de::IgnoredAny; @@ -24,10 +24,6 @@ impl TrackableBucket for MinimalTrackableBucket { &self.name } - fn ty(&self) -> relay_metrics::MetricType { - self.value.ty() - } - fn summary(&self) -> BucketSummary { let mri = match MetricResourceIdentifier::parse(self.name()) { Ok(mri) => mri, @@ -68,17 +64,6 @@ enum MinimalValue { Gauge(IgnoredAny), } -impl MinimalValue { - fn ty(self) -> MetricType { - match self { - MinimalValue::Counter(_) => MetricType::Counter, - MinimalValue::Distribution(_) => MetricType::Distribution, - MinimalValue::Set(_) => MetricType::Set, - MinimalValue::Gauge(_) => MetricType::Gauge, - } - } -} - #[cfg(test)] mod tests { use insta::assert_debug_snapshot; diff --git a/relay-server/src/metrics/mod.rs b/relay-server/src/metrics/mod.rs index 2576a7b385a..a3c531cd95d 100644 --- a/relay-server/src/metrics/mod.rs +++ b/relay-server/src/metrics/mod.rs @@ -1,13 +1,11 @@ #[cfg(feature = "processing")] mod bucket_encoding; -mod metric_stats; mod minimal; mod outcomes; mod rate_limits; #[cfg(feature = "processing")] pub use self::bucket_encoding::*; -pub use self::metric_stats::*; pub use self::minimal::*; pub use self::outcomes::*; pub use self::rate_limits::*; diff --git a/relay-server/src/metrics/outcomes.rs b/relay-server/src/metrics/outcomes.rs index 672d732ac8d..55d94a5af6d 100644 --- a/relay-server/src/metrics/outcomes.rs +++ b/relay-server/src/metrics/outcomes.rs @@ -1,13 +1,12 @@ use chrono::Utc; use relay_metrics::{ Bucket, BucketMetadata, BucketView, BucketViewValue, MetricName, MetricNamespace, - MetricResourceIdentifier, MetricType, + MetricResourceIdentifier, }; use relay_quotas::{DataCategory, Scoping}; use relay_system::Addr; use crate::envelope::SourceQuantities; -use crate::metrics::MetricStats; use crate::services::outcome::{Outcome, TrackOutcome}; #[cfg(feature = "processing")] use relay_cardinality::{CardinalityLimit, CardinalityReport}; @@ -19,17 +18,13 @@ use relay_cardinality::{CardinalityLimit, CardinalityReport}; /// like custom. #[derive(Debug, Clone)] pub struct MetricOutcomes { - metric_stats: MetricStats, outcomes: Addr, } impl MetricOutcomes { /// Creates a new [`MetricOutcomes`]. - pub fn new(metric_stats: MetricStats, outcomes: Addr) -> Self { - Self { - metric_stats, - outcomes, - } + pub fn new(outcomes: Addr) -> Self { + Self { outcomes } } /// Tracks an outcome for a list of buckets and generates the necessary outcomes. @@ -67,28 +62,16 @@ impl MetricOutcomes { } } } - - // When rejecting metrics, we need to make sure that the number of merges is correctly handled - // for buckets views, since if we have a bucket which has 5 merges, and it's split into 2 - // bucket views, we will emit the volume of the rejection as 5 + 5 merges since we still read - // the underlying metadata for each view, and it points to the same bucket reference. - // Possible solutions to this problem include emitting the merges only if the bucket view is - // the first of view or distributing uniformly the metadata between split views. - for bucket in buckets { - relay_log::trace!("{:<50} -> {outcome}", bucket.name()); - self.metric_stats.track_metric(scoping, bucket, &outcome) - } } /// Tracks the cardinality of a metric. #[cfg(feature = "processing")] pub fn cardinality( &self, - scoping: Scoping, - limit: &CardinalityLimit, - report: &CardinalityReport, + _scoping: Scoping, + _limit: &CardinalityLimit, + _report: &CardinalityReport, ) { - self.metric_stats.track_cardinality(scoping, limit, report) } } @@ -108,9 +91,6 @@ pub trait TrackableBucket { /// Full mri of the bucket. fn name(&self) -> &MetricName; - /// Type of the metric bucket. - fn ty(&self) -> MetricType; - /// Extracts quota information from the metric bucket. /// /// If the metric was extracted from one or more transactions or spans, it returns the amount @@ -126,10 +106,6 @@ impl TrackableBucket for &T { (**self).name() } - fn ty(&self) -> MetricType { - (**self).ty() - } - fn summary(&self) -> BucketSummary { (**self).summary() } @@ -144,10 +120,6 @@ impl TrackableBucket for Bucket { &self.name } - fn ty(&self) -> MetricType { - self.value.ty() - } - fn summary(&self) -> BucketSummary { BucketView::new(self).summary() } @@ -162,10 +134,6 @@ impl TrackableBucket for BucketView<'_> { self.name() } - fn ty(&self) -> MetricType { - self.ty() - } - fn summary(&self) -> BucketSummary { let mri = match MetricResourceIdentifier::parse(self.name()) { Ok(mri) => mri, diff --git a/relay-server/src/metrics/rate_limits.rs b/relay-server/src/metrics/rate_limits.rs index 993008ae8bb..5216763ab90 100644 --- a/relay-server/src/metrics/rate_limits.rs +++ b/relay-server/src/metrics/rate_limits.rs @@ -220,8 +220,6 @@ mod tests { use relay_system::Addr; use smallvec::smallvec; - use crate::metrics::MetricStats; - use super::*; fn deny(category: DataCategory) -> Vec { @@ -243,7 +241,7 @@ mod tests { quotas: Vec, ) -> (Vec, Vec<(Outcome, DataCategory, u32)>) { let (outcome_sink, mut rx) = Addr::custom(); - let metric_outcomes = MetricOutcomes::new(MetricStats::test().0, outcome_sink.clone()); + let metric_outcomes = MetricOutcomes::new(outcome_sink.clone()); let mut limiter = MetricsLimiter::create( metrics, diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index e130c00afd6..fdfa7cb50b1 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -2,7 +2,7 @@ use std::convert::Infallible; use std::sync::Arc; use std::time::Duration; -use crate::metrics::{MetricOutcomes, MetricStats}; +use crate::metrics::MetricOutcomes; use crate::services::autoscaling::{AutoscalingMetricService, AutoscalingMetrics}; use crate::services::buffer::{ ObservableEnvelopeBuffer, PartitionedEnvelopeBuffer, ProjectKeyPair, @@ -219,13 +219,7 @@ impl ServiceState { let aggregator_handle = aggregator.handle(); let aggregator = services.start(aggregator); - let metric_stats = MetricStats::new( - config.clone(), - global_config_handle.clone(), - aggregator.clone(), - ); - - let metric_outcomes = MetricOutcomes::new(metric_stats, outcome_aggregator.clone()); + let metric_outcomes = MetricOutcomes::new(outcome_aggregator.clone()); #[cfg(feature = "processing")] let store_pool = create_store_pool(&config)?; diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 68f300128d4..24324a7f25c 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -2633,7 +2633,7 @@ impl EnvelopeProcessorService { return false; } - if !self::metrics::is_valid_namespace(bucket, source) { + if !self::metrics::is_valid_namespace(bucket) { return false; } diff --git a/relay-server/src/services/processor/metrics.rs b/relay-server/src/services/processor/metrics.rs index 15979b3772b..d91d9c20636 100644 --- a/relay-server/src/services/processor/metrics.rs +++ b/relay-server/src/services/processor/metrics.rs @@ -5,21 +5,18 @@ use relay_quotas::Scoping; use crate::metrics::MetricOutcomes; use crate::services::outcome::Outcome; -use crate::services::processor::BucketSource; use crate::services::projects::project::ProjectInfo; /// Checks if the namespace of the passed bucket is valid. /// /// This is returns `true` for most namespaces except: /// - [`MetricNamespace::Unsupported`]: Equal to invalid/unknown namespaces. -/// - [`MetricNamespace::Stats`]: Metric stats are only allowed if the `source` is [`BucketSource::Internal`]. -pub fn is_valid_namespace(bucket: &Bucket, source: BucketSource) -> bool { +pub fn is_valid_namespace(bucket: &Bucket) -> bool { match bucket.name.namespace() { MetricNamespace::Sessions => true, MetricNamespace::Transactions => true, MetricNamespace::Spans => true, MetricNamespace::Custom => true, - MetricNamespace::Stats => source == BucketSource::Internal, MetricNamespace::Unsupported => false, } } @@ -62,7 +59,6 @@ fn is_metric_namespace_valid(state: &ProjectInfo, namespace: MetricNamespace) -> MetricNamespace::Transactions => true, MetricNamespace::Spans => true, MetricNamespace::Custom => state.has_feature(Feature::CustomMetrics), - MetricNamespace::Stats => true, MetricNamespace::Unsupported => false, } } @@ -74,9 +70,6 @@ mod tests { use relay_metrics::{BucketValue, UnixTimestamp}; use relay_system::Addr; - use crate::metrics::MetricStats; - use crate::services::metrics::Aggregator; - use super::*; fn create_custom_bucket_with_name(name: String) -> Bucket { @@ -93,8 +86,7 @@ mod tests { #[test] fn test_apply_project_info_with_disabled_custom_namespace() { let (outcome_aggregator, _) = Addr::custom(); - let (metric_stats, mut metric_stats_rx) = MetricStats::test(); - let metric_outcomes = MetricOutcomes::new(metric_stats, outcome_aggregator); + let metric_outcomes = MetricOutcomes::new(outcome_aggregator); let b1 = create_custom_bucket_with_name("cpu_time".into()); let b2 = create_custom_bucket_with_name("memory_usage".into()); @@ -113,16 +105,5 @@ mod tests { ); assert!(buckets.is_empty()); - - // We assert that two metrics are emitted by metric stats. - for _ in 0..2 { - let value = metric_stats_rx.blocking_recv().unwrap(); - let Aggregator::MergeBuckets(merge_buckets) = value; - assert_eq!(merge_buckets.buckets.len(), 1); - let BucketValue::Counter(value) = merge_buckets.buckets[0].value else { - panic!(); - }; - assert_eq!(value.to_f64(), 1.0); - } } } diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 30d830eaa56..4c4f73881da 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -1901,7 +1901,6 @@ impl Message for KafkaMessage<'_> { MetricNamespace::Transactions => "metric_transactions", MetricNamespace::Spans => "metric_spans", MetricNamespace::Custom => "metric_custom", - MetricNamespace::Stats => "metric_metric_stats", MetricNamespace::Unsupported => "metric_unsupported", }, KafkaMessage::CheckIn(_) => "check_in", diff --git a/relay-server/src/testutils.rs b/relay-server/src/testutils.rs index b52cea3bdf5..16cc3f1e661 100644 --- a/relay-server/src/testutils.rs +++ b/relay-server/src/testutils.rs @@ -16,7 +16,7 @@ use relay_test::mock_service; use crate::envelope::{Envelope, Item, ItemType}; use crate::managed::ManagedEnvelope; -use crate::metrics::{MetricOutcomes, MetricStats}; +use crate::metrics::MetricOutcomes; #[cfg(feature = "processing")] use crate::service::create_redis_clients; use crate::services::global_config::GlobalConfigHandle; @@ -130,7 +130,7 @@ pub async fn create_test_processor(config: Config) -> EnvelopeProcessorService { .as_ref() .map(|p| GlobalRateLimitsService::new(p.quotas.clone()).start_detached()); - let metric_outcomes = MetricOutcomes::new(MetricStats::test().0, outcome_aggregator.clone()); + let metric_outcomes = MetricOutcomes::new(outcome_aggregator.clone()); let config = Arc::new(config); EnvelopeProcessorService::new( @@ -164,8 +164,7 @@ pub async fn create_test_processor_with_addrs( .map(|c| create_redis_clients(c)) .transpose() .unwrap(); - let metric_outcomes = - MetricOutcomes::new(MetricStats::test().0, addrs.outcome_aggregator.clone()); + let metric_outcomes = MetricOutcomes::new(addrs.outcome_aggregator.clone()); let config = Arc::new(config); EnvelopeProcessorService::new( diff --git a/relay-server/src/utils/pick.rs b/relay-server/src/utils/pick.rs index 1549e4a5616..f42c363b49c 100644 --- a/relay-server/src/utils/pick.rs +++ b/relay-server/src/utils/pick.rs @@ -12,6 +12,10 @@ pub enum PickResult { /// /// Deterministically makes a rollout decision for an id, usually organization id, /// and rate. +#[allow( + dead_code, + reason = "A utility which is frequently used and removed again" +)] pub fn is_rolled_out(id: u64, rate: f32) -> PickResult { match ((id % 100000) as f32 / 100000.0f32) < rate { true => PickResult::Keep, diff --git a/tests/integration/test_metric_stats.py b/tests/integration/test_metric_stats.py deleted file mode 100644 index a6795d77667..00000000000 --- a/tests/integration/test_metric_stats.py +++ /dev/null @@ -1,304 +0,0 @@ -import copy -from typing import Any -import pytest -import time -from dataclasses import dataclass -from .test_metrics import TEST_CONFIG - - -@dataclass -class MetricStatsByMri: - volume: dict[str, Any] - cardinality: dict[str, Any] - other: list[Any] - - -def metric_stats_by_mri(metrics_consumer, count, timeout=5): - volume = dict() - cardinality = dict() - other = list() - - for i in range(count): - try: - metric, _ = metrics_consumer.get_metric(timeout) - if metric["name"] == "c:metric_stats/volume@none": - volume[metric["tags"]["mri"]] = metric - elif metric["name"] == "g:metric_stats/cardinality@none": - cardinality[metric["tags"]["mri"]] = metric - else: - other.append(metric) - except AssertionError: - pytest.fail(f"Message {i + 1} not found.") - - metrics_consumer.assert_empty() - return MetricStatsByMri(volume=volume, cardinality=cardinality, other=other) - - -@pytest.mark.parametrize("mode", ["default", "chain"]) -def test_metric_stats_simple( - mini_sentry, relay, relay_with_processing, relay_credentials, metrics_consumer, mode -): - mini_sentry.global_config["options"]["relay.metric-stats.rollout-rate"] = 1.0 - - metrics_consumer = metrics_consumer() - - if mode == "default": - relay = relay_with_processing(options=TEST_CONFIG) - elif mode == "chain": - credentials = relay_credentials() - static_relays = { - credentials["id"]: { - "public_key": credentials["public_key"], - "internal": True, - }, - } - relay = relay( - relay_with_processing(options=TEST_CONFIG, static_relays=static_relays), - options=TEST_CONFIG, - credentials=credentials, - ) - - project_id = 42 - project_config = mini_sentry.add_basic_project_config(project_id) - project_config["config"]["features"] = [ - "organizations:custom-metrics", - "organizations:metric-stats", - ] - project_config["config"]["metrics"] = { - "cardinalityLimits": [ - { - "id": "custom-limit", - "window": {"windowSeconds": 3600, "granularitySeconds": 600}, - "report": True, - "limit": 100, - "scope": "name", - "namespace": "custom", - } - ] - } - - relay.send_metrics( - project_id, "custom/foo:1337|d\ncustom/foo:12|d|#tag:value\ncustom/bar:42|s" - ) - - metrics = metric_stats_by_mri(metrics_consumer, 7) - - assert metrics.volume["d:custom/foo@none"]["org_id"] == 0 - assert metrics.volume["d:custom/foo@none"]["project_id"] == project_id - assert metrics.volume["d:custom/foo@none"]["value"] == 2.0 - assert metrics.volume["d:custom/foo@none"]["tags"] == { - "mri": "d:custom/foo@none", - "mri.type": "d", - "mri.namespace": "custom", - "outcome.id": "0", - } - assert metrics.volume["s:custom/bar@none"]["org_id"] == 0 - assert metrics.volume["s:custom/bar@none"]["project_id"] == project_id - assert metrics.volume["s:custom/bar@none"]["value"] == 1.0 - assert metrics.volume["s:custom/bar@none"]["tags"] == { - "mri": "s:custom/bar@none", - "mri.type": "s", - "mri.namespace": "custom", - "outcome.id": "0", - } - assert len(metrics.volume) == 2 - assert metrics.cardinality["d:custom/foo@none"]["org_id"] == 0 - assert metrics.cardinality["d:custom/foo@none"]["project_id"] == project_id - assert metrics.cardinality["d:custom/foo@none"]["value"] == { - "count": 1, - "last": 2.0, - "max": 2.0, - "min": 2.0, - "sum": 2.0, - } - assert metrics.cardinality["d:custom/foo@none"]["tags"] == { - "mri": "d:custom/foo@none", - "mri.type": "d", - "mri.namespace": "custom", - "cardinality.limit": "custom-limit", - "cardinality.scope": "name", - "cardinality.window": "3600", - } - assert metrics.cardinality["s:custom/bar@none"]["org_id"] == 0 - assert metrics.cardinality["s:custom/bar@none"]["project_id"] == project_id - assert metrics.cardinality["s:custom/bar@none"]["value"] == { - "count": 1, - "last": 1.0, - "max": 1.0, - "min": 1.0, - "sum": 1.0, - } - assert metrics.cardinality["s:custom/bar@none"]["tags"] == { - "mri": "s:custom/bar@none", - "mri.type": "s", - "mri.namespace": "custom", - "cardinality.limit": "custom-limit", - "cardinality.scope": "name", - "cardinality.window": "3600", - } - assert len(metrics.cardinality) == 2 - assert len(metrics.other) == 3 - - -@pytest.mark.parametrize("mode", ["default", "chain"]) -def test_metric_stats_with_limit_surpassed( - mini_sentry, relay, relay_with_processing, relay_credentials, metrics_consumer, mode -): - mini_sentry.global_config["options"]["relay.metric-stats.rollout-rate"] = 1.0 - - metrics_consumer = metrics_consumer() - - if mode == "default": - relay = relay_with_processing(options=TEST_CONFIG) - elif mode == "chain": - credentials = relay_credentials() - static_relays = { - credentials["id"]: { - "public_key": credentials["public_key"], - "internal": True, - }, - } - relay = relay( - relay_with_processing(options=TEST_CONFIG, static_relays=static_relays), - options=TEST_CONFIG, - credentials=credentials, - ) - - project_id = 42 - project_config = mini_sentry.add_basic_project_config(project_id) - project_config["config"]["features"] = [ - "organizations:custom-metrics", - "organizations:metric-stats", - ] - project_config["config"]["metrics"] = { - "cardinalityLimits": [ - { - "id": "org-limit", - "window": {"windowSeconds": 1, "granularitySeconds": 1}, - "report": True, - "limit": 0, - "scope": "org", - "namespace": "custom", - }, - { - "id": "name-limit", - "window": {"windowSeconds": 1, "granularitySeconds": 1}, - "report": True, - "limit": 0, - "scope": "name", - "namespace": "custom", - }, - ] - } - - relay.send_metrics( - project_id, "custom/foo:1337|d\ncustom/baz:12|d|#tag:value\ncustom/bar:42|s" - ) - - metrics = metric_stats_by_mri(metrics_consumer, 3) - assert metrics.volume["d:custom/foo@none"]["org_id"] == 0 - assert metrics.volume["d:custom/foo@none"]["project_id"] == project_id - assert metrics.volume["d:custom/foo@none"]["value"] == 1.0 - assert metrics.volume["d:custom/foo@none"]["tags"] == { - "mri": "d:custom/foo@none", - "mri.type": "d", - "mri.namespace": "custom", - "outcome.id": "6", - "outcome.reason": "name-limit", - } - assert metrics.volume["d:custom/baz@none"]["org_id"] == 0 - assert metrics.volume["d:custom/baz@none"]["project_id"] == project_id - assert metrics.volume["d:custom/baz@none"]["value"] == 1.0 - assert metrics.volume["d:custom/baz@none"]["tags"] == { - "mri": "d:custom/baz@none", - "mri.type": "d", - "mri.namespace": "custom", - "outcome.id": "6", - "outcome.reason": "name-limit", - } - assert metrics.volume["s:custom/bar@none"]["org_id"] == 0 - assert metrics.volume["s:custom/bar@none"]["project_id"] == project_id - assert metrics.volume["s:custom/bar@none"]["value"] == 1.0 - assert metrics.volume["s:custom/bar@none"]["tags"] == { - "mri": "s:custom/bar@none", - "mri.type": "s", - "mri.namespace": "custom", - "outcome.id": "6", - "outcome.reason": "name-limit", - } - assert len(metrics.volume) == 3 - assert len(metrics.cardinality) == 0 - assert len(metrics.other) == 0 - - -def test_metric_stats_max_flush_bytes( - mini_sentry, relay_with_processing, metrics_consumer -): - mini_sentry.global_config["options"]["relay.metric-stats.rollout-rate"] = 1.0 - - metrics_consumer = metrics_consumer() - - relay_config = copy.deepcopy(TEST_CONFIG) - relay_config["aggregator"]["max_flush_bytes"] = 150 - - relay = relay_with_processing(options=relay_config) - - project_id = 42 - project_config = mini_sentry.add_basic_project_config(project_id) - project_config["config"]["features"] = [ - "organizations:custom-metrics", - "organizations:metric-stats", - ] - - # Metric is big enough to be split into multiple smaller metrics when emitting to Kafka, - # make sure the volume counted is still just 1. - relay.send_metrics( - project_id, "custom/foo:1:2:3:4:5:6:7:8:9:10:11:12:13:14:15:16:17:18:19:20|d" - ) - - metrics = metric_stats_by_mri(metrics_consumer, 3) - assert metrics.volume["d:custom/foo@none"]["value"] == 1.0 - assert len(metrics.other) == 2 - - -@pytest.mark.parametrize("enabled", [True, False]) -def test_metric_stats_non_processing(mini_sentry, relay, enabled): - mini_sentry.global_config["options"]["relay.metric-stats.rollout-rate"] = 1.0 - - relay = relay( - mini_sentry, - options={ - "sentry_metrics": {"metric_stats_enabled": enabled}, - "http": {"global_metrics": True}, - **TEST_CONFIG, - }, - ) - - project_id = 42 - config = mini_sentry.add_basic_project_config(project_id) - public_key = config["publicKeys"][0]["publicKey"] - - # Custom metrics are disabled -> should log metric stats and drop the metric - relay.send_metrics(project_id, "custom/foo:1337|c") - - if enabled: - metrics = mini_sentry.captured_metrics.get(timeout=3)[public_key] - del metrics[0]["timestamp"] - assert metrics == [ - { - "width": 1, - "name": "c:metric_stats/volume@none", - "type": "c", - "value": 1.0, - "tags": { - "mri": "c:custom/foo@none", - "mri.namespace": "custom", - "mri.type": "c", - "outcome.id": "1", - "outcome.reason": "disabled-namespace", - }, - } - ] - else: - time.sleep(3) - assert mini_sentry.captured_metrics.empty()