diff --git a/src/common/metrics/src/lib.rs b/src/common/metrics/src/lib.rs index 7a19763551..8dd26b9a40 100644 --- a/src/common/metrics/src/lib.rs +++ b/src/common/metrics/src/lib.rs @@ -22,7 +22,7 @@ pub type QueryPlan = Arc; /// Unique identifier for a node in the execution plan. pub type NodeID = usize; -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)] pub enum Stat { // Integer Representations Count(u64), @@ -100,36 +100,9 @@ macro_rules! snapshot { /// /// This should match the format of the sendable snapshot, but is a different /// type for deserialization -#[allow(dead_code)] -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)] pub struct StatSnapshotRecv(Vec<(String, Stat)>); -#[derive(Debug, Clone, PartialEq, Serialize)] -pub struct StatSnapshotView<'a>(SmallVec<[(&'a str, Stat); 3]>); - -impl From for StatSnapshotView<'static> { - fn from(snapshot: StatSnapshotSend) -> Self { - Self(snapshot.0) - } -} - -impl<'a> IntoIterator for StatSnapshotView<'a> { - type Item = (&'a str, Stat); - type IntoIter = smallvec::IntoIter<[(&'a str, Stat); 3]>; - fn into_iter(self) -> Self::IntoIter { - self.0.into_iter() - } -} - -impl<'de, 'a> IntoIterator for &'de StatSnapshotView<'a> { - type Item = &'de (&'a str, Stat); - type IntoIter = std::slice::Iter<'de, (&'a str, Stat)>; - - fn into_iter(self) -> Self::IntoIter { - self.0.iter() - } -} - #[cfg(feature = "python")] pub fn register_modules(parent: &Bound) -> PyResult<()> { use pyo3::types::PyModuleMethods; diff --git a/src/daft-context/src/lib.rs b/src/daft-context/src/lib.rs index 22b98ba1a3..37b6e31205 100644 --- a/src/daft-context/src/lib.rs +++ b/src/daft-context/src/lib.rs @@ -126,7 +126,7 @@ impl DaftContext { ) -> DaftResult<()> { self.with_state(|state| { for subscriber in state.subscribers.values() { - subscriber.on_query_start(query_id.clone(), unoptimized_plan.clone())?; + subscriber.on_query_start(&query_id, unoptimized_plan.clone())?; } Ok::<(), DaftError>(()) }) @@ -135,7 +135,7 @@ impl DaftContext { pub fn notify_query_end(&self, query_id: QueryID) -> DaftResult<()> { self.with_state(move |state| { for subscriber in state.subscribers.values() { - subscriber.on_query_end(query_id.clone())?; + subscriber.on_query_end(&query_id)?; } Ok::<(), DaftError>(()) }) @@ -148,7 +148,7 @@ impl DaftContext { ) -> DaftResult<()> { self.with_state(|state| { for subscriber in state.subscribers.values() { - subscriber.on_result_out(query_id.clone(), result.clone())?; + subscriber.on_result_out(&query_id, result.clone())?; } Ok::<(), DaftError>(()) }) @@ -157,7 +157,7 @@ impl DaftContext { pub fn notify_optimization_start(&self, query_id: QueryID) -> DaftResult<()> { self.with_state(|state| { for subscriber in state.subscribers.values() { - subscriber.on_optimization_start(query_id.clone())?; + subscriber.on_optimization_start(&query_id)?; } Ok::<(), DaftError>(()) }) @@ -170,7 +170,7 @@ impl DaftContext { ) -> DaftResult<()> { self.with_state(|state| { for subscriber in state.subscribers.values() { - subscriber.on_optimization_end(query_id.clone(), optimized_plan.clone())?; + subscriber.on_optimization_end(&query_id, optimized_plan.clone())?; } Ok::<(), DaftError>(()) }) diff --git a/src/daft-context/src/python.rs b/src/daft-context/src/python.rs index 04b3ca25ca..cf52b6a6fb 100644 --- a/src/daft-context/src/python.rs +++ b/src/daft-context/src/python.rs @@ -67,8 +67,8 @@ impl PyDaftContext { pub fn notify_query_start( &self, py: Python, - query_id: String, - unoptimized_plan: String, + query_id: &str, + unoptimized_plan: &str, ) -> PyResult<()> { py.allow_threads(|| { self.inner @@ -85,7 +85,7 @@ impl PyDaftContext { pub fn notify_result_out( &self, py: Python, - query_id: String, + query_id: &str, result: PyMicroPartition, ) -> PyResult<()> { py.allow_threads(|| self.inner.notify_result_out(query_id.into(), result.into()))?; @@ -100,8 +100,8 @@ impl PyDaftContext { pub fn notify_optimization_end( &self, py: Python, - query_id: String, - optimized_plan: String, + query_id: &str, + optimized_plan: &str, ) -> PyResult<()> { py.allow_threads(|| { self.inner diff --git a/src/daft-context/src/subscribers/dashboard.rs b/src/daft-context/src/subscribers/dashboard.rs index 34604ac95c..cfacd090f6 100644 --- a/src/daft-context/src/subscribers/dashboard.rs +++ b/src/daft-context/src/subscribers/dashboard.rs @@ -2,7 +2,7 @@ use std::{sync::Arc, time::SystemTime}; use async_trait::async_trait; use common_error::{DaftError, DaftResult}; -use common_metrics::{NodeID, QueryID, QueryPlan, StatSnapshotView, ops::NodeInfo}; +use common_metrics::{NodeID, QueryID, QueryPlan, StatSnapshotSend, ops::NodeInfo}; use common_runtime::{RuntimeRef, get_io_runtime}; use daft_io::IOStatsContext; use daft_micropartition::{MicroPartition, MicroPartitionRef}; @@ -90,7 +90,7 @@ const TOTAL_ROWS: usize = 10; #[async_trait] impl Subscriber for DashboardSubscriber { - fn on_query_start(&self, query_id: QueryID, unoptimized_plan: QueryPlan) -> DaftResult<()> { + fn on_query_start(&self, query_id: &QueryID, unoptimized_plan: QueryPlan) -> DaftResult<()> { self.runtime.block_on_current_thread(async { Self::handle_request( self.client @@ -105,13 +105,13 @@ impl Subscriber for DashboardSubscriber { }) } - fn on_result_out(&self, query_id: QueryID, result: MicroPartitionRef) -> DaftResult<()> { + fn on_result_out(&self, query_id: &QueryID, result: MicroPartitionRef) -> DaftResult<()> { // Limit to TOTAL_ROWS rows // TODO: Limit by X MB and # of rows - let entry = self.preview_rows.get_mut(&query_id); + let entry = self.preview_rows.get_mut(query_id); if entry.is_none() { let result = result.head(TOTAL_ROWS)?; - self.preview_rows.insert(query_id, Arc::new(result)); + self.preview_rows.insert(query_id.clone(), Arc::new(result)); return Ok(()); } @@ -128,10 +128,10 @@ impl Subscriber for DashboardSubscriber { Ok(()) } - fn on_query_end(&self, query_id: QueryID) -> DaftResult<()> { + fn on_query_end(&self, query_id: &QueryID) -> DaftResult<()> { let result = self .preview_rows - .view(&query_id, |_, v| v.clone()) + .view(query_id, |_, v| v.clone()) .unwrap_or_else(|| Arc::new(MicroPartition::empty(None))); debug_assert!(result.len() <= TOTAL_ROWS); @@ -159,7 +159,7 @@ impl Subscriber for DashboardSubscriber { }) } - fn on_optimization_start(&self, query_id: QueryID) -> DaftResult<()> { + fn on_optimization_start(&self, query_id: &QueryID) -> DaftResult<()> { self.runtime.block_on_current_thread(async { Self::handle_request( self.client @@ -173,7 +173,7 @@ impl Subscriber for DashboardSubscriber { }) } - fn on_optimization_end(&self, query_id: QueryID, optimized_plan: QueryPlan) -> DaftResult<()> { + fn on_optimization_end(&self, query_id: &QueryID, optimized_plan: QueryPlan) -> DaftResult<()> { let plan_end_sec = secs_from_epoch(); self.runtime.block_on_current_thread(async { Self::handle_request( @@ -189,7 +189,7 @@ impl Subscriber for DashboardSubscriber { }) } - fn on_exec_start(&self, query_id: QueryID, node_infos: &[Arc]) -> DaftResult<()> { + fn on_exec_start(&self, query_id: &QueryID, node_infos: &[Arc]) -> DaftResult<()> { let exec_start_sec = secs_from_epoch(); self.runtime.block_on_current_thread(async { Self::handle_request( @@ -208,7 +208,7 @@ impl Subscriber for DashboardSubscriber { }) } - async fn on_exec_operator_start(&self, query_id: QueryID, node_id: NodeID) -> DaftResult<()> { + async fn on_exec_operator_start(&self, query_id: &QueryID, node_id: NodeID) -> DaftResult<()> { Self::handle_request(self.client.post(format!( "{}/engine/query/{}/exec/{}/start", self.url, query_id, node_id @@ -219,8 +219,8 @@ impl Subscriber for DashboardSubscriber { async fn on_exec_emit_stats( &self, - query_id: QueryID, - stats: &[(NodeID, StatSnapshotView)], + query_id: &QueryID, + stats: &[(NodeID, StatSnapshotSend)], ) -> DaftResult<()> { Self::handle_request( self.client @@ -228,26 +228,13 @@ impl Subscriber for DashboardSubscriber { "{}/engine/query/{}/exec/emit_stats", self.url, query_id )) - .json(&daft_dashboard::engine::ExecEmitStatsArgsSend { - stats: stats - .iter() - .map(|(node_id, stats)| { - ( - *node_id, - stats - .into_iter() - .map(|(name, stat)| (*name, stat.clone())) - .collect(), - ) - }) - .collect::>(), - }), + .json(stats), ) .await?; Ok(()) } - async fn on_exec_operator_end(&self, query_id: QueryID, node_id: NodeID) -> DaftResult<()> { + async fn on_exec_operator_end(&self, query_id: &QueryID, node_id: NodeID) -> DaftResult<()> { Self::handle_request(self.client.post(format!( "{}/engine/query/{}/exec/{}/end", self.url, query_id, node_id @@ -256,7 +243,7 @@ impl Subscriber for DashboardSubscriber { Ok(()) } - async fn on_exec_end(&self, query_id: QueryID) -> DaftResult<()> { + async fn on_exec_end(&self, query_id: &QueryID) -> DaftResult<()> { let exec_end_sec = secs_from_epoch(); Self::handle_request( diff --git a/src/daft-context/src/subscribers/debug.rs b/src/daft-context/src/subscribers/debug.rs index 25c747a478..a63f038d79 100644 --- a/src/daft-context/src/subscribers/debug.rs +++ b/src/daft-context/src/subscribers/debug.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use async_trait::async_trait; use common_error::DaftResult; -use common_metrics::{NodeID, QueryID, QueryPlan, StatSnapshotView, ops::NodeInfo}; +use common_metrics::{NodeID, QueryID, QueryPlan, StatSnapshotSend, ops::NodeInfo}; use daft_micropartition::MicroPartitionRef; use dashmap::DashMap; @@ -23,42 +23,42 @@ impl DebugSubscriber { #[async_trait] impl Subscriber for DebugSubscriber { - fn on_query_start(&self, query_id: QueryID, unoptimized_plan: QueryPlan) -> DaftResult<()> { + fn on_query_start(&self, query_id: &QueryID, unoptimized_plan: QueryPlan) -> DaftResult<()> { eprintln!( "Started query `{}` with unoptimized plan:\n{}", query_id, unoptimized_plan ); - self.rows_out.insert(query_id, 0); + self.rows_out.insert(query_id.clone(), 0); Ok(()) } - fn on_query_end(&self, query_id: QueryID) -> DaftResult<()> { + fn on_query_end(&self, query_id: &QueryID) -> DaftResult<()> { eprintln!( "Ended query `{}` with result of {} rows", query_id, self.rows_out - .get(&query_id) + .get(query_id) .expect("Query not found") .value() ); Ok(()) } - fn on_result_out(&self, query_id: QueryID, result: MicroPartitionRef) -> DaftResult<()> { + fn on_result_out(&self, query_id: &QueryID, result: MicroPartitionRef) -> DaftResult<()> { *self .rows_out - .get_mut(&query_id) + .get_mut(query_id) .expect("Query not found") .value_mut() += result.len(); Ok(()) } - fn on_optimization_start(&self, query_id: QueryID) -> DaftResult<()> { + fn on_optimization_start(&self, query_id: &QueryID) -> DaftResult<()> { eprintln!("Started planning query `{}`", query_id); Ok(()) } - fn on_optimization_end(&self, query_id: QueryID, optimized_plan: QueryPlan) -> DaftResult<()> { + fn on_optimization_end(&self, query_id: &QueryID, optimized_plan: QueryPlan) -> DaftResult<()> { eprintln!( "Finished planning query `{}` with optimized plan:\n{}", query_id, optimized_plan @@ -66,7 +66,7 @@ impl Subscriber for DebugSubscriber { Ok(()) } - fn on_exec_start(&self, query_id: QueryID, node_infos: &[Arc]) -> DaftResult<()> { + fn on_exec_start(&self, query_id: &QueryID, node_infos: &[Arc]) -> DaftResult<()> { eprintln!("Started executing query `{}`", query_id); for node_info in node_infos { eprintln!(" - Node {}: {}", node_info.id, node_info.name); @@ -74,7 +74,7 @@ impl Subscriber for DebugSubscriber { Ok(()) } - async fn on_exec_operator_start(&self, query_id: QueryID, node_id: NodeID) -> DaftResult<()> { + async fn on_exec_operator_start(&self, query_id: &QueryID, node_id: NodeID) -> DaftResult<()> { eprintln!( "Started executing operator `{}` in query `{}`", node_id, query_id @@ -84,8 +84,8 @@ impl Subscriber for DebugSubscriber { async fn on_exec_emit_stats( &self, - query_id: QueryID, - stats: &[(NodeID, StatSnapshotView)], + query_id: &QueryID, + stats: &[(NodeID, StatSnapshotSend)], ) -> DaftResult<()> { eprintln!("Emitting execution stats for query `{}`", query_id); for node_id in stats { @@ -97,7 +97,7 @@ impl Subscriber for DebugSubscriber { Ok(()) } - async fn on_exec_operator_end(&self, query_id: QueryID, node_id: NodeID) -> DaftResult<()> { + async fn on_exec_operator_end(&self, query_id: &QueryID, node_id: NodeID) -> DaftResult<()> { eprintln!( "Finished executing operator `{}` in query `{}`", node_id, query_id @@ -105,7 +105,7 @@ impl Subscriber for DebugSubscriber { Ok(()) } - async fn on_exec_end(&self, query_id: QueryID) -> DaftResult<()> { + async fn on_exec_end(&self, query_id: &QueryID) -> DaftResult<()> { eprintln!("Finished executing query `{}`", query_id); Ok(()) } diff --git a/src/daft-context/src/subscribers/mod.rs b/src/daft-context/src/subscribers/mod.rs index 6c4cd974da..251a99e0eb 100644 --- a/src/daft-context/src/subscribers/mod.rs +++ b/src/daft-context/src/subscribers/mod.rs @@ -7,25 +7,25 @@ use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; use common_error::DaftResult; -use common_metrics::{NodeID, QueryID, QueryPlan, StatSnapshotView, ops::NodeInfo}; +use common_metrics::{NodeID, QueryID, QueryPlan, StatSnapshotSend, ops::NodeInfo}; use daft_micropartition::MicroPartitionRef; #[async_trait] pub trait Subscriber: Send + Sync + std::fmt::Debug + 'static { - fn on_query_start(&self, query_id: QueryID, unoptimized_plan: QueryPlan) -> DaftResult<()>; - fn on_query_end(&self, query_id: QueryID) -> DaftResult<()>; - fn on_result_out(&self, query_id: QueryID, result: MicroPartitionRef) -> DaftResult<()>; - fn on_optimization_start(&self, query_id: QueryID) -> DaftResult<()>; - fn on_optimization_end(&self, query_id: QueryID, optimized_plan: QueryPlan) -> DaftResult<()>; - fn on_exec_start(&self, query_id: QueryID, node_infos: &[Arc]) -> DaftResult<()>; - async fn on_exec_operator_start(&self, query_id: QueryID, node_id: NodeID) -> DaftResult<()>; + fn on_query_start(&self, query_id: &QueryID, unoptimized_plan: QueryPlan) -> DaftResult<()>; + fn on_query_end(&self, query_id: &QueryID) -> DaftResult<()>; + fn on_result_out(&self, query_id: &QueryID, result: MicroPartitionRef) -> DaftResult<()>; + fn on_optimization_start(&self, query_id: &QueryID) -> DaftResult<()>; + fn on_optimization_end(&self, query_id: &QueryID, optimized_plan: QueryPlan) -> DaftResult<()>; + fn on_exec_start(&self, query_id: &QueryID, node_infos: &[Arc]) -> DaftResult<()>; + async fn on_exec_operator_start(&self, query_id: &QueryID, node_id: NodeID) -> DaftResult<()>; async fn on_exec_emit_stats( &self, - query_id: QueryID, - stats: &[(NodeID, StatSnapshotView)], + query_id: &QueryID, + stats: &[(NodeID, StatSnapshotSend)], ) -> DaftResult<()>; - async fn on_exec_operator_end(&self, query_id: QueryID, node_id: NodeID) -> DaftResult<()>; - async fn on_exec_end(&self, query_id: QueryID) -> DaftResult<()>; + async fn on_exec_operator_end(&self, query_id: &QueryID, node_id: NodeID) -> DaftResult<()>; + async fn on_exec_end(&self, query_id: &QueryID) -> DaftResult<()>; } pub fn default_subscribers() -> HashMap> { diff --git a/src/daft-context/src/subscribers/python.rs b/src/daft-context/src/subscribers/python.rs index ba75580912..ff1acd8a28 100644 --- a/src/daft-context/src/subscribers/python.rs +++ b/src/daft-context/src/subscribers/python.rs @@ -2,7 +2,7 @@ use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; use common_error::DaftResult; -use common_metrics::{QueryID, QueryPlan, StatSnapshotView, ops::NodeInfo, python::PyNodeInfo}; +use common_metrics::{QueryID, QueryPlan, StatSnapshotSend, ops::NodeInfo, python::PyNodeInfo}; use daft_micropartition::{MicroPartitionRef, python::PyMicroPartition}; use pyo3::{IntoPyObject, PyObject, Python, intern}; @@ -14,7 +14,7 @@ pub struct PySubscriberWrapper(pub(crate) PyObject); #[async_trait] impl Subscriber for PySubscriberWrapper { - fn on_query_start(&self, query_id: QueryID, unoptimized_plan: QueryPlan) -> DaftResult<()> { + fn on_query_start(&self, query_id: &QueryID, unoptimized_plan: QueryPlan) -> DaftResult<()> { Python::with_gil(|py| { self.0.call_method1( py, @@ -25,7 +25,7 @@ impl Subscriber for PySubscriberWrapper { }) } - fn on_query_end(&self, query_id: QueryID) -> DaftResult<()> { + fn on_query_end(&self, query_id: &QueryID) -> DaftResult<()> { Python::with_gil(|py| { self.0 .call_method1(py, intern!(py, "on_query_end"), (query_id.to_string(),))?; @@ -33,7 +33,7 @@ impl Subscriber for PySubscriberWrapper { }) } - fn on_result_out(&self, query_id: QueryID, result: MicroPartitionRef) -> DaftResult<()> { + fn on_result_out(&self, query_id: &QueryID, result: MicroPartitionRef) -> DaftResult<()> { Python::with_gil(|py| { self.0.call_method1( py, @@ -44,7 +44,7 @@ impl Subscriber for PySubscriberWrapper { }) } - fn on_optimization_start(&self, query_id: QueryID) -> DaftResult<()> { + fn on_optimization_start(&self, query_id: &QueryID) -> DaftResult<()> { Python::with_gil(|py| { self.0.call_method1( py, @@ -55,7 +55,7 @@ impl Subscriber for PySubscriberWrapper { }) } - fn on_optimization_end(&self, query_id: QueryID, optimized_plan: QueryPlan) -> DaftResult<()> { + fn on_optimization_end(&self, query_id: &QueryID, optimized_plan: QueryPlan) -> DaftResult<()> { Python::with_gil(|py| { self.0.call_method1( py, @@ -66,7 +66,7 @@ impl Subscriber for PySubscriberWrapper { }) } - fn on_exec_start(&self, query_id: QueryID, node_infos: &[Arc]) -> DaftResult<()> { + fn on_exec_start(&self, query_id: &QueryID, node_infos: &[Arc]) -> DaftResult<()> { Python::with_gil(|py| { let py_node_infos = node_infos .iter() @@ -81,7 +81,7 @@ impl Subscriber for PySubscriberWrapper { }) } - async fn on_exec_operator_start(&self, query_id: QueryID, node_id: NodeID) -> DaftResult<()> { + async fn on_exec_operator_start(&self, query_id: &QueryID, node_id: NodeID) -> DaftResult<()> { Python::with_gil(|py| { self.0.call_method1( py, @@ -94,16 +94,16 @@ impl Subscriber for PySubscriberWrapper { async fn on_exec_emit_stats( &self, - query_id: QueryID, - stats: &[(NodeID, StatSnapshotView)], + query_id: &QueryID, + stats: &[(NodeID, StatSnapshotSend)], ) -> DaftResult<()> { Python::with_gil(|py| { let stats_map = stats .iter() .map(|(node_id, stats)| { let stat_map = stats - .into_iter() - .map(|(name, stat)| (*name, stat.clone().into_py_contents(py).unwrap())) + .iter() + .map(|(name, stat)| (name, stat.into_py_contents(py).unwrap())) .collect::>(); (node_id, stat_map) @@ -120,7 +120,7 @@ impl Subscriber for PySubscriberWrapper { }) } - async fn on_exec_operator_end(&self, query_id: QueryID, node_id: NodeID) -> DaftResult<()> { + async fn on_exec_operator_end(&self, query_id: &QueryID, node_id: NodeID) -> DaftResult<()> { Python::with_gil(|py| { self.0.call_method1( py, @@ -131,7 +131,7 @@ impl Subscriber for PySubscriberWrapper { }) } - async fn on_exec_end(&self, query_id: QueryID) -> DaftResult<()> { + async fn on_exec_end(&self, query_id: &QueryID) -> DaftResult<()> { Python::with_gil(|py| { self.0 .call_method1(py, intern!(py, "on_exec_end"), (query_id.to_string(),))?; diff --git a/src/daft-dashboard/src/engine.rs b/src/daft-dashboard/src/engine.rs index 77657b9883..40a8b1d406 100644 --- a/src/daft-dashboard/src/engine.rs +++ b/src/daft-dashboard/src/engine.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; use axum::{ Json, Router, @@ -6,7 +6,7 @@ use axum::{ http::StatusCode, routing::post, }; -use common_metrics::{QueryID, QueryPlan, Stat, ops::NodeInfo}; +use common_metrics::{NodeID, QueryID, QueryPlan, StatSnapshotRecv, ops::NodeInfo}; use daft_recordbatch::RecordBatch; use serde::{Deserialize, Serialize}; @@ -106,7 +106,7 @@ async fn exec_start( OperatorInfo { status: OperatorStatus::Pending, node_info, - stats: HashMap::new(), + stats: StatSnapshotRecv::default(), }, ) }) @@ -142,27 +142,17 @@ async fn exec_op_end( StatusCode::OK } -#[derive(Debug, Clone, Serialize)] -pub struct ExecEmitStatsArgsSend<'a> { - pub stats: Vec<(usize, HashMap<&'a str, Stat>)>, -} - -#[derive(Debug, Clone, Deserialize)] -pub struct ExecEmitStatsArgsRecv { - pub stats: Vec<(usize, HashMap)>, -} - async fn exec_emit_stats( State(state): State>, Path(query_id): Path, - Json(args): Json, + Json(args): Json>, ) -> StatusCode { let mut query_info = state.queries.get_mut(&query_id).unwrap(); let QueryState::Executing { exec_info, .. } = &mut query_info.status else { return StatusCode::BAD_REQUEST; }; - for (operator_id, stats) in args.stats { + for (operator_id, stats) in args { exec_info.operators.get_mut(&operator_id).unwrap().stats = stats; } StatusCode::OK diff --git a/src/daft-dashboard/src/state.rs b/src/daft-dashboard/src/state.rs index e07b336c22..16e1af21b1 100644 --- a/src/daft-dashboard/src/state.rs +++ b/src/daft-dashboard/src/state.rs @@ -3,7 +3,7 @@ use std::{ sync::{Arc, LazyLock}, }; -use common_metrics::{NodeID, QueryID, QueryPlan, Stat, ops::NodeInfo}; +use common_metrics::{NodeID, QueryID, QueryPlan, StatSnapshotRecv, ops::NodeInfo}; use daft_recordbatch::RecordBatch; use dashmap::DashMap; use serde::Serialize; @@ -20,7 +20,7 @@ pub(crate) enum OperatorStatus { pub(crate) struct OperatorInfo { pub status: OperatorStatus, pub node_info: NodeInfo, - pub stats: HashMap, + pub stats: StatSnapshotRecv, } #[derive(Debug, Clone, PartialEq, Eq, Serialize)] diff --git a/src/daft-local-execution/src/runtime_stats/subscribers/query.rs b/src/daft-local-execution/src/runtime_stats/subscribers/query.rs index c5e066f673..12cc6da8e0 100644 --- a/src/daft-local-execution/src/runtime_stats/subscribers/query.rs +++ b/src/daft-local-execution/src/runtime_stats/subscribers/query.rs @@ -16,7 +16,7 @@ pub(crate) struct SubscriberWrapper { impl SubscriberWrapper { pub fn try_new(inner: Arc, node_infos: &[Arc]) -> DaftResult { let query_id: QueryID = node_infos[0].context["query_id"].clone().into(); - inner.on_exec_start(query_id.clone(), node_infos)?; + inner.on_exec_start(&query_id, node_infos)?; Ok(Self { inner, query_id }) } } @@ -25,14 +25,14 @@ impl SubscriberWrapper { impl RuntimeStatsSubscriber for SubscriberWrapper { async fn initialize_node(&self, node_id: NodeID) -> DaftResult<()> { self.inner - .on_exec_operator_start(self.query_id.clone(), node_id) + .on_exec_operator_start(&self.query_id, node_id) .await?; Ok(()) } async fn finalize_node(&self, node_id: NodeID) -> DaftResult<()> { self.inner - .on_exec_operator_end(self.query_id.clone(), node_id) + .on_exec_operator_end(&self.query_id, node_id) .await?; Ok(()) } @@ -41,19 +41,14 @@ impl RuntimeStatsSubscriber for SubscriberWrapper { &self, events: &[(NodeID, common_metrics::StatSnapshotSend)], ) -> DaftResult<()> { - let all_node_stats = events - .iter() - .map(|(node_id, snapshot)| (*node_id, snapshot.clone().into())) - .collect::>(); - self.inner - .on_exec_emit_stats(self.query_id.clone(), all_node_stats.as_slice()) + .on_exec_emit_stats(&self.query_id, events) .await?; Ok(()) } async fn finish(self: Box) -> DaftResult<()> { - self.inner.on_exec_end(self.query_id.clone()).await?; + self.inner.on_exec_end(&self.query_id).await?; Ok(()) }