diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index 57f3bd2024..a2843e7826 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -6,7 +6,8 @@ use relay_config::RelayMode; use relay_event_schema::protocol::{EventId, EventType}; use relay_quotas::RateLimits; use relay_statsd::metric; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use crate::envelope::{AttachmentType, Envelope, EnvelopeError, Item, ItemType, Items}; use crate::managed::ManagedEnvelope; @@ -296,6 +297,25 @@ fn queue_envelope( Ok(()) } +#[derive(Serialize)] +pub struct StoreResponse { + #[serde(skip_serializing_if = "Option::is_none")] + pub id: Option, + pub log_points: Option>, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct LogPoint { + pub id: usize, + pub message_template: String, + pub tags: HashMap, + pub source_file: String, + pub line_number: usize, + pub sample_rate: Option, + pub sample_expression: Option, + pub valid_until: Option, +} + /// Handles an envelope store request. /// /// Sentry envelopes may come either directly from an HTTP request (the envelope endpoint calls this @@ -304,10 +324,10 @@ fn queue_envelope( /// /// This returns `Some(EventId)` if the envelope contains an event, either explicitly as payload or /// implicitly through an item that will create an event during ingestion. -pub async fn handle_envelope( +pub async fn handle_envelope_with_log_points( state: &ServiceState, envelope: Box, -) -> Result, BadStoreRequest> { +) -> Result { emit_envelope_metrics(&envelope); if state.memory_checker().check_memory().is_exceeded() { @@ -324,7 +344,10 @@ pub async fn handle_envelope( let event_id = managed_envelope.envelope().event_id(); if managed_envelope.envelope().is_empty() { managed_envelope.reject(Outcome::Invalid(DiscardReason::EmptyEnvelope)); - return Ok(event_id); + return Ok(StoreResponse { + id: event_id, + log_points: None, + }); } let project_key = managed_envelope.envelope().meta().public_key(); @@ -338,9 +361,9 @@ pub async fn handle_envelope( state.project_cache_handle().fetch(sampling_project_key); } - let checked = state - .project_cache_handle() - .get(project_key) + let project = state.project_cache_handle().get(project_key); + + let checked = project .check_envelope(managed_envelope) .await .map_err(BadStoreRequest::EventRejected)?; @@ -359,6 +382,25 @@ pub async fn handle_envelope( queue_envelope(state, managed_envelope)?; + let log_points = + if let crate::services::projects::project::ProjectState::Enabled(project_info) = + project.state() + { + dbg!(project_info.last_change); + let log_points = project_info + .config + .filter_settings + .csp + .disallowed_sources + .iter() + .filter_map(|row| serde_json::from_str::(row).ok()) + .collect(); + log_points + } else { + vec![] + }; + dbg!(&log_points); + if checked.rate_limits.is_limited() { // Even if some envelope items have been queued, there might be active rate limits on // other items. Communicate these rate limits to the downstream (Relay or SDK). @@ -366,10 +408,22 @@ pub async fn handle_envelope( // See `IntoResponse` implementation of `BadStoreRequest`. Err(BadStoreRequest::RateLimited(checked.rate_limits)) } else { - Ok(event_id) + return Ok(StoreResponse { + id: event_id, + log_points: Some(log_points), + }); } } +pub async fn handle_envelope( + state: &ServiceState, + envelope: Box, +) -> Result, BadStoreRequest> { + let res = handle_envelope_with_log_points(state, envelope).await?; + + Ok(res.id) +} + fn emit_envelope_metrics(envelope: &Envelope) { let client_name = envelope.meta().client_name().name(); for item in envelope.items() { diff --git a/relay-server/src/endpoints/envelope.rs b/relay-server/src/endpoints/envelope.rs index 8b5a3c2811..57365b7d81 100644 --- a/relay-server/src/endpoints/envelope.rs +++ b/relay-server/src/endpoints/envelope.rs @@ -9,8 +9,6 @@ use axum::routing::{MethodRouter, post}; use axum::{Json, RequestExt}; use bytes::Bytes; use relay_config::Config; -use relay_event_schema::protocol::EventId; -use serde::Serialize; use crate::endpoints::common::{self, BadStoreRequest}; use crate::envelope::Envelope; @@ -102,20 +100,15 @@ impl FromRequest for EnvelopeParams { } } -#[derive(Serialize)] -struct StoreResponse { - #[serde(skip_serializing_if = "Option::is_none")] - id: Option, -} - /// Handler for the envelope store endpoint. async fn handle( state: ServiceState, params: EnvelopeParams, ) -> Result { let envelope = params.extract_envelope()?; - let id = common::handle_envelope(&state, envelope).await?; - Ok(Json(StoreResponse { id })) + let response = common::handle_envelope_with_log_points(&state, envelope).await?; + + Ok(Json(response)) } pub fn route(config: &Config) -> MethodRouter { diff --git a/requirements-dev.txt b/requirements-dev.txt index 321876f8d0..aa2293ada7 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -8,7 +8,7 @@ blinker==1.8.2 click==8.1.7 devservices==1.2.1 flake8==7.0.0 -confluent-kafka==2.1.1 +confluent-kafka==2.3.0 flask==3.0.3 msgpack==1.1.0 opentelemetry-proto==1.32.1