Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 62 additions & 8 deletions relay-server/src/endpoints/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use relay_config::RelayMode;
use relay_event_schema::protocol::{EventId, EventType};
use relay_quotas::RateLimits;
use relay_statsd::metric;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

use crate::envelope::{AttachmentType, Envelope, EnvelopeError, Item, ItemType, Items};
use crate::managed::ManagedEnvelope;
Expand Down Expand Up @@ -296,6 +297,25 @@ fn queue_envelope(
Ok(())
}

#[derive(Serialize)]
pub struct StoreResponse {
#[serde(skip_serializing_if = "Option::is_none")]
pub id: Option<EventId>,
pub log_points: Option<Vec<LogPoint>>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct LogPoint {
pub id: usize,
pub message_template: String,
pub tags: HashMap<String, String>,
pub source_file: String,
pub line_number: usize,
pub sample_rate: Option<f64>,
pub sample_expression: Option<String>,
pub valid_until: Option<usize>,
}

/// Handles an envelope store request.
///
/// Sentry envelopes may come either directly from an HTTP request (the envelope endpoint calls this
Expand All @@ -304,10 +324,10 @@ fn queue_envelope(
///
/// This returns `Some(EventId)` if the envelope contains an event, either explicitly as payload or
/// implicitly through an item that will create an event during ingestion.
pub async fn handle_envelope(
pub async fn handle_envelope_with_log_points(
state: &ServiceState,
envelope: Box<Envelope>,
) -> Result<Option<EventId>, BadStoreRequest> {
) -> Result<StoreResponse, BadStoreRequest> {
emit_envelope_metrics(&envelope);

if state.memory_checker().check_memory().is_exceeded() {
Expand All @@ -324,7 +344,10 @@ pub async fn handle_envelope(
let event_id = managed_envelope.envelope().event_id();
if managed_envelope.envelope().is_empty() {
managed_envelope.reject(Outcome::Invalid(DiscardReason::EmptyEnvelope));
return Ok(event_id);
return Ok(StoreResponse {
id: event_id,
log_points: None,
});
}

let project_key = managed_envelope.envelope().meta().public_key();
Expand All @@ -338,9 +361,9 @@ pub async fn handle_envelope(
state.project_cache_handle().fetch(sampling_project_key);
}

let checked = state
.project_cache_handle()
.get(project_key)
let project = state.project_cache_handle().get(project_key);

let checked = project
.check_envelope(managed_envelope)
.await
.map_err(BadStoreRequest::EventRejected)?;
Expand All @@ -359,17 +382,48 @@ pub async fn handle_envelope(

queue_envelope(state, managed_envelope)?;

let log_points =
if let crate::services::projects::project::ProjectState::Enabled(project_info) =
project.state()
{
dbg!(project_info.last_change);
let log_points = project_info
.config
.filter_settings
.csp
.disallowed_sources
.iter()
.filter_map(|row| serde_json::from_str::<LogPoint>(row).ok())
.collect();
log_points
} else {
vec![]
};
dbg!(&log_points);

if checked.rate_limits.is_limited() {
// Even if some envelope items have been queued, there might be active rate limits on
// other items. Communicate these rate limits to the downstream (Relay or SDK).
//
// See `IntoResponse` implementation of `BadStoreRequest`.
Err(BadStoreRequest::RateLimited(checked.rate_limits))
} else {
Ok(event_id)
return Ok(StoreResponse {
id: event_id,
log_points: Some(log_points),
});
}
}

pub async fn handle_envelope(
state: &ServiceState,
envelope: Box<Envelope>,
) -> Result<Option<EventId>, BadStoreRequest> {
let res = handle_envelope_with_log_points(state, envelope).await?;

Ok(res.id)
}

fn emit_envelope_metrics(envelope: &Envelope) {
let client_name = envelope.meta().client_name().name();
for item in envelope.items() {
Expand Down
13 changes: 3 additions & 10 deletions relay-server/src/endpoints/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ use axum::routing::{MethodRouter, post};
use axum::{Json, RequestExt};
use bytes::Bytes;
use relay_config::Config;
use relay_event_schema::protocol::EventId;
use serde::Serialize;

use crate::endpoints::common::{self, BadStoreRequest};
use crate::envelope::Envelope;
Expand Down Expand Up @@ -102,20 +100,15 @@ impl FromRequest<ServiceState> for EnvelopeParams {
}
}

#[derive(Serialize)]
struct StoreResponse {
#[serde(skip_serializing_if = "Option::is_none")]
id: Option<EventId>,
}

/// Handler for the envelope store endpoint.
async fn handle(
state: ServiceState,
params: EnvelopeParams,
) -> Result<impl IntoResponse, BadStoreRequest> {
let envelope = params.extract_envelope()?;
let id = common::handle_envelope(&state, envelope).await?;
Ok(Json(StoreResponse { id }))
let response = common::handle_envelope_with_log_points(&state, envelope).await?;

Ok(Json(response))
}

pub fn route(config: &Config) -> MethodRouter<ServiceState> {
Expand Down
2 changes: 1 addition & 1 deletion requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ blinker==1.8.2
click==8.1.7
devservices==1.2.1
flake8==7.0.0
confluent-kafka==2.1.1
confluent-kafka==2.3.0
flask==3.0.3
msgpack==1.1.0
opentelemetry-proto==1.32.1
Expand Down
Loading