39 commits
7e544dd  Removed by refactoring test (srilman, Sep 10, 2025)
fd2ae0b  Split to separate job step (srilman, Sep 10, 2025)
16b1459  Push to see (srilman, Sep 10, 2025)
9e81be8  Save again and check prev (srilman, Sep 10, 2025)
dcc7383  Working (srilman, Sep 11, 2025)
68da73a  Cleanup from greptile (srilman, Sep 11, 2025)
9d5e7b1  Should be formatted correctly (srilman, Sep 12, 2025)
f7658ca  Plan and result emission (srilman, Sep 15, 2025)
5d4474f  Added test (srilman, Sep 15, 2025)
f2912ad  Merge branch 'main' into slade/query-subscriber (srilman, Sep 18, 2025)
cc9146c  Fix bug in MicroPartitionSet (srilman, Sep 18, 2025)
fa7a28c  Fix CI issues (srilman, Sep 18, 2025)
7d0722e  Undo test changes (srilman, Sep 18, 2025)
959f8db  Hopefully fixes tests (srilman, Sep 18, 2025)
1b109a7  Update comment (srilman, Sep 18, 2025)
7e5dcae  Finally (srilman, Sep 18, 2025)
de918aa  Maybe fixes (srilman, Sep 18, 2025)
f25488c  One more time (srilman, Sep 18, 2025)
34ce3fd  Please now (srilman, Sep 18, 2025)
3f6e55f  Another cleanup round (srilman, Sep 18, 2025)
64b8a21  Refactored to handle the exec starting (srilman, Sep 18, 2025)
bd0993e  Apply later dashboard changes (srilman, Sep 23, 2025)
98edbe5  Something (srilman, Sep 18, 2025)
bb9277f  Its compiling (srilman, Sep 19, 2025)
b159de2  good to go (srilman, Sep 22, 2025)
50671a1  Ok ready (srilman, Sep 22, 2025)
a1e88a8  Fix merge issues (srilman, Sep 23, 2025)
56cd265  Fix issue in the result transfer (srilman, Sep 23, 2025)
d305db0  Fix issue in the result transfer (srilman, Sep 23, 2025)
87ec134  Merge branch 'main' into slade/dashboard-subscriber (srilman, Oct 2, 2025)
d2cc5a7  Double checked (srilman, Oct 2, 2025)
2fdc84a  Fix cargo machete (srilman, Oct 2, 2025)
6d757bb  Addressed greptile comments (srilman, Oct 2, 2025)
c467dd3  Addressed all comments (srilman, Oct 3, 2025)
4cafd0e  Fix machete (srilman, Oct 3, 2025)
27e628e  Address extra copy (srilman, Oct 3, 2025)
af294bf  reduce (colin-ho, Oct 3, 2025)
92840a8  Merge branch main into colin/reduce-stats-copying (colin-ho, Oct 3, 2025)
070da71  Update src/daft-context/src/python.rs (colin-ho, Oct 3, 2025)
31 changes: 2 additions & 29 deletions src/common/metrics/src/lib.rs
@@ -22,7 +22,7 @@ pub type QueryPlan = Arc<str>;
 /// Unique identifier for a node in the execution plan.
 pub type NodeID = usize;
 
-#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)]
 pub enum Stat {
     // Integer Representations
     Count(u64),
@@ -100,36 +100,9 @@ macro_rules! snapshot {
 ///
 /// This should match the format of the sendable snapshot, but is a different
 /// type for deserialization
-#[allow(dead_code)]
-#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
 pub struct StatSnapshotRecv(Vec<(String, Stat)>);
 
-#[derive(Debug, Clone, PartialEq, Serialize)]
-pub struct StatSnapshotView<'a>(SmallVec<[(&'a str, Stat); 3]>);
-
-impl From<StatSnapshotSend> for StatSnapshotView<'static> {
-    fn from(snapshot: StatSnapshotSend) -> Self {
-        Self(snapshot.0)
-    }
-}
-
-impl<'a> IntoIterator for StatSnapshotView<'a> {
-    type Item = (&'a str, Stat);
-    type IntoIter = smallvec::IntoIter<[(&'a str, Stat); 3]>;
-    fn into_iter(self) -> Self::IntoIter {
-        self.0.into_iter()
-    }
-}
-
-impl<'de, 'a> IntoIterator for &'de StatSnapshotView<'a> {
-    type Item = &'de (&'a str, Stat);
-    type IntoIter = std::slice::Iter<'de, (&'a str, Stat)>;
-
-    fn into_iter(self) -> Self::IntoIter {
-        self.0.iter()
-    }
-}
-
 #[cfg(feature = "python")]
 pub fn register_modules(parent: &Bound<PyModule>) -> PyResult<()> {
     use pyo3::types::PyModuleMethods;
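A side note on the two derives above: every `Stat` variant holds a small scalar (the diff shows `Count(u64)`), which is what makes the `Copy` derive legal, and `Copy` is what lets downstream code pass stats around without `.clone()`. A minimal sketch of the effect; the `Bytes` variant is a hypothetical stand-in, not taken from this PR:

```rust
use serde::{Deserialize, Serialize};

// Sketch of a Copy-able stats enum. Only `Count` appears in the diff;
// `Bytes` is an illustrative assumption.
#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)]
enum Stat {
    Count(u64),
    Bytes(u64),
}

fn main() {
    let stat = Stat::Count(42);
    let copied = stat; // bitwise copy, not a move: `stat` stays usable
    assert_eq!(stat, copied);
}
```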
10 changes: 5 additions & 5 deletions src/daft-context/src/lib.rs
@@ -126,7 +126,7 @@ impl DaftContext {
     ) -> DaftResult<()> {
         self.with_state(|state| {
             for subscriber in state.subscribers.values() {
-                subscriber.on_query_start(query_id.clone(), unoptimized_plan.clone())?;
+                subscriber.on_query_start(&query_id, unoptimized_plan.clone())?;
             }
             Ok::<(), DaftError>(())
         })
@@ -135,7 +135,7 @@
     pub fn notify_query_end(&self, query_id: QueryID) -> DaftResult<()> {
         self.with_state(move |state| {
             for subscriber in state.subscribers.values() {
-                subscriber.on_query_end(query_id.clone())?;
+                subscriber.on_query_end(&query_id)?;
             }
             Ok::<(), DaftError>(())
         })
@@ -148,7 +148,7 @@
     ) -> DaftResult<()> {
         self.with_state(|state| {
             for subscriber in state.subscribers.values() {
-                subscriber.on_result_out(query_id.clone(), result.clone())?;
+                subscriber.on_result_out(&query_id, result.clone())?;
             }
             Ok::<(), DaftError>(())
         })
@@ -157,7 +157,7 @@
     pub fn notify_optimization_start(&self, query_id: QueryID) -> DaftResult<()> {
         self.with_state(|state| {
             for subscriber in state.subscribers.values() {
-                subscriber.on_optimization_start(query_id.clone())?;
+                subscriber.on_optimization_start(&query_id)?;
             }
             Ok::<(), DaftError>(())
         })
@@ -170,7 +170,7 @@
     ) -> DaftResult<()> {
         self.with_state(|state| {
             for subscriber in state.subscribers.values() {
-                subscriber.on_optimization_end(query_id.clone(), optimized_plan.clone())?;
+                subscriber.on_optimization_end(&query_id, optimized_plan.clone())?;
             }
             Ok::<(), DaftError>(())
         })
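The pattern in all five `notify_*` methods is the same: the `Subscriber` trait now borrows the query ID, so the fan-out loop no longer clones it once per subscriber. A minimal sketch of that borrowing fan-out, assuming `QueryID` is an `Arc<str>` alias like the `QueryPlan` alias in `common/metrics` (the trait here is reduced to one method for illustration):

```rust
use std::sync::Arc;

// Assumed alias, mirroring `pub type QueryPlan = Arc<str>` in common/metrics.
type QueryID = Arc<str>;

trait Subscriber {
    // Borrowing the ID avoids one Arc refcount bump per subscriber.
    fn on_query_end(&self, query_id: &QueryID) -> Result<(), String>;
}

struct Logger;

impl Subscriber for Logger {
    fn on_query_end(&self, query_id: &QueryID) -> Result<(), String> {
        eprintln!("query `{}` ended", query_id);
        Ok(())
    }
}

fn notify_query_end(subscribers: &[Box<dyn Subscriber>], query_id: QueryID) -> Result<(), String> {
    for subscriber in subscribers {
        // Previously each iteration did `query_id.clone()`; now every
        // subscriber shares the same borrow of one Arc.
        subscriber.on_query_end(&query_id)?;
    }
    Ok(())
}

fn main() {
    let subs: Vec<Box<dyn Subscriber>> = vec![Box::new(Logger), Box::new(Logger)];
    notify_query_end(&subs, Arc::from("query-1")).unwrap();
}
```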
10 changes: 5 additions & 5 deletions src/daft-context/src/python.rs
@@ -67,8 +67,8 @@ impl PyDaftContext {
     pub fn notify_query_start(
         &self,
         py: Python,
-        query_id: String,
-        unoptimized_plan: String,
+        query_id: &str,
+        unoptimized_plan: &str,
     ) -> PyResult<()> {
         py.allow_threads(|| {
             self.inner
@@ -85,7 +85,7 @@
     pub fn notify_result_out(
         &self,
         py: Python,
-        query_id: String,
+        query_id: &str,
         result: PyMicroPartition,
     ) -> PyResult<()> {
         py.allow_threads(|| self.inner.notify_result_out(query_id.into(), result.into()))?;
@@ -100,8 +100,8 @@
     pub fn notify_optimization_end(
         &self,
         py: Python,
-        query_id: String,
-        optimized_plan: String,
+        query_id: &str,
+        optimized_plan: &str,
     ) -> PyResult<()> {
         py.allow_threads(|| {
             self.inner
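On the Python boundary the signatures move from owned `String` to `&str`, and the `.into()` calls in the bodies do the single conversion to the shared ID type. A sketch of just that conversion, with the surrounding pyo3 plumbing omitted; `QueryID` as `Arc<str>` is an assumption carried over from above:

```rust
use std::sync::Arc;

type QueryID = Arc<str>; // assumed alias

// Sketch: accept a borrowed &str at the binding boundary and convert once.
fn notify_query_end(query_id: &str) {
    // `&str -> Arc<str>` allocates once; everything downstream shares it.
    let id: QueryID = query_id.into();
    println!("forwarding end-of-query notification for `{}`", id);
}

fn main() {
    notify_query_end("query-1");
}
```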
45 changes: 16 additions & 29 deletions src/daft-context/src/subscribers/dashboard.rs
@@ -2,7 +2,7 @@ use std::{sync::Arc, time::SystemTime};
 
 use async_trait::async_trait;
 use common_error::{DaftError, DaftResult};
-use common_metrics::{NodeID, QueryID, QueryPlan, StatSnapshotView, ops::NodeInfo};
+use common_metrics::{NodeID, QueryID, QueryPlan, StatSnapshotSend, ops::NodeInfo};
 use common_runtime::{RuntimeRef, get_io_runtime};
 use daft_io::IOStatsContext;
 use daft_micropartition::{MicroPartition, MicroPartitionRef};
@@ -90,7 +90,7 @@ const TOTAL_ROWS: usize = 10;
 
 #[async_trait]
 impl Subscriber for DashboardSubscriber {
-    fn on_query_start(&self, query_id: QueryID, unoptimized_plan: QueryPlan) -> DaftResult<()> {
+    fn on_query_start(&self, query_id: &QueryID, unoptimized_plan: QueryPlan) -> DaftResult<()> {
         self.runtime.block_on_current_thread(async {
             Self::handle_request(
                 self.client
@@ -105,13 +105,13 @@
         })
     }
 
-    fn on_result_out(&self, query_id: QueryID, result: MicroPartitionRef) -> DaftResult<()> {
+    fn on_result_out(&self, query_id: &QueryID, result: MicroPartitionRef) -> DaftResult<()> {
         // Limit to TOTAL_ROWS rows
         // TODO: Limit by X MB and # of rows
-        let entry = self.preview_rows.get_mut(&query_id);
+        let entry = self.preview_rows.get_mut(query_id);
         if entry.is_none() {
             let result = result.head(TOTAL_ROWS)?;
-            self.preview_rows.insert(query_id, Arc::new(result));
+            self.preview_rows.insert(query_id.clone(), Arc::new(result));
             return Ok(());
         }
 
@@ -128,10 +128,10 @@
         Ok(())
     }
 
-    fn on_query_end(&self, query_id: QueryID) -> DaftResult<()> {
+    fn on_query_end(&self, query_id: &QueryID) -> DaftResult<()> {
         let result = self
             .preview_rows
-            .view(&query_id, |_, v| v.clone())
+            .view(query_id, |_, v| v.clone())
             .unwrap_or_else(|| Arc::new(MicroPartition::empty(None)));
 
         debug_assert!(result.len() <= TOTAL_ROWS);
@@ -159,7 +159,7 @@
         })
     }
 
-    fn on_optimization_start(&self, query_id: QueryID) -> DaftResult<()> {
+    fn on_optimization_start(&self, query_id: &QueryID) -> DaftResult<()> {
         self.runtime.block_on_current_thread(async {
             Self::handle_request(
                 self.client
@@ -173,7 +173,7 @@
         })
     }
 
-    fn on_optimization_end(&self, query_id: QueryID, optimized_plan: QueryPlan) -> DaftResult<()> {
+    fn on_optimization_end(&self, query_id: &QueryID, optimized_plan: QueryPlan) -> DaftResult<()> {
         let plan_end_sec = secs_from_epoch();
         self.runtime.block_on_current_thread(async {
             Self::handle_request(
@@ -189,7 +189,7 @@
         })
     }
 
-    fn on_exec_start(&self, query_id: QueryID, node_infos: &[Arc<NodeInfo>]) -> DaftResult<()> {
+    fn on_exec_start(&self, query_id: &QueryID, node_infos: &[Arc<NodeInfo>]) -> DaftResult<()> {
         let exec_start_sec = secs_from_epoch();
         self.runtime.block_on_current_thread(async {
             Self::handle_request(
@@ -208,7 +208,7 @@
         })
     }
 
-    async fn on_exec_operator_start(&self, query_id: QueryID, node_id: NodeID) -> DaftResult<()> {
+    async fn on_exec_operator_start(&self, query_id: &QueryID, node_id: NodeID) -> DaftResult<()> {
         Self::handle_request(self.client.post(format!(
             "{}/engine/query/{}/exec/{}/start",
             self.url, query_id, node_id
@@ -219,35 +219,22 @@
 
     async fn on_exec_emit_stats(
         &self,
-        query_id: QueryID,
-        stats: &[(NodeID, StatSnapshotView)],
+        query_id: &QueryID,
+        stats: &[(NodeID, StatSnapshotSend)],
     ) -> DaftResult<()> {
         Self::handle_request(
             self.client
                 .post(format!(
                     "{}/engine/query/{}/exec/emit_stats",
                     self.url, query_id
                 ))
-                .json(&daft_dashboard::engine::ExecEmitStatsArgsSend {
-                    stats: stats
-                        .iter()
-                        .map(|(node_id, stats)| {
-                            (
-                                *node_id,
-                                stats
-                                    .into_iter()
-                                    .map(|(name, stat)| (*name, stat.clone()))
-                                    .collect(),
-                            )
-                        })
-                        .collect::<Vec<_>>(),
-                }),
+                .json(stats),
         )
         .await?;
         Ok(())
     }
 
-    async fn on_exec_operator_end(&self, query_id: QueryID, node_id: NodeID) -> DaftResult<()> {
+    async fn on_exec_operator_end(&self, query_id: &QueryID, node_id: NodeID) -> DaftResult<()> {
         Self::handle_request(self.client.post(format!(
             "{}/engine/query/{}/exec/{}/end",
             self.url, query_id, node_id
@@ -256,7 +243,7 @@
         Ok(())
     }
 
-    async fn on_exec_end(&self, query_id: QueryID) -> DaftResult<()> {
+    async fn on_exec_end(&self, query_id: &QueryID) -> DaftResult<()> {
         let exec_end_sec = secs_from_epoch();
 
         Self::handle_request(
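The biggest simplification is in `on_exec_emit_stats`: since `StatSnapshotSend` already implements `Serialize`, the `(NodeID, StatSnapshotSend)` slice can be handed straight to reqwest's `.json(...)`, which accepts any `&T where T: Serialize + ?Sized`. That removes the intermediate `ExecEmitStatsArgsSend` struct and the per-stat clone. A standalone sketch with a stand-in snapshot type and a hypothetical endpoint (assumes reqwest with its `json` feature enabled):

```rust
use serde::Serialize;

// Stand-in for StatSnapshotSend: any Serialize type works here.
#[derive(Serialize)]
struct Snapshot(Vec<(String, u64)>);

// reqwest's `.json` takes `&T where T: Serialize + ?Sized`, so a borrowed
// slice of (node_id, snapshot) pairs serializes in place, with no copies.
async fn emit_stats(
    client: &reqwest::Client,
    url: &str,
    query_id: &str,
    stats: &[(usize, Snapshot)],
) -> Result<(), reqwest::Error> {
    client
        .post(format!("{}/engine/query/{}/exec/emit_stats", url, query_id))
        .json(stats)
        .send()
        .await?
        .error_for_status()?;
    Ok(())
}
```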
30 changes: 15 additions & 15 deletions src/daft-context/src/subscribers/debug.rs
@@ -2,7 +2,7 @@ use std::sync::Arc;
 
 use async_trait::async_trait;
 use common_error::DaftResult;
-use common_metrics::{NodeID, QueryID, QueryPlan, StatSnapshotView, ops::NodeInfo};
+use common_metrics::{NodeID, QueryID, QueryPlan, StatSnapshotSend, ops::NodeInfo};
 use daft_micropartition::MicroPartitionRef;
 use dashmap::DashMap;
 
@@ -23,58 +23,58 @@ impl DebugSubscriber {
 
 #[async_trait]
 impl Subscriber for DebugSubscriber {
-    fn on_query_start(&self, query_id: QueryID, unoptimized_plan: QueryPlan) -> DaftResult<()> {
+    fn on_query_start(&self, query_id: &QueryID, unoptimized_plan: QueryPlan) -> DaftResult<()> {
         eprintln!(
             "Started query `{}` with unoptimized plan:\n{}",
             query_id, unoptimized_plan
         );
-        self.rows_out.insert(query_id, 0);
+        self.rows_out.insert(query_id.clone(), 0);
         Ok(())
     }
 
-    fn on_query_end(&self, query_id: QueryID) -> DaftResult<()> {
+    fn on_query_end(&self, query_id: &QueryID) -> DaftResult<()> {
         eprintln!(
             "Ended query `{}` with result of {} rows",
             query_id,
             self.rows_out
-                .get(&query_id)
+                .get(query_id)
                 .expect("Query not found")
                 .value()
         );
         Ok(())
     }
 
-    fn on_result_out(&self, query_id: QueryID, result: MicroPartitionRef) -> DaftResult<()> {
+    fn on_result_out(&self, query_id: &QueryID, result: MicroPartitionRef) -> DaftResult<()> {
         *self
             .rows_out
-            .get_mut(&query_id)
+            .get_mut(query_id)
             .expect("Query not found")
             .value_mut() += result.len();
         Ok(())
     }
 
-    fn on_optimization_start(&self, query_id: QueryID) -> DaftResult<()> {
+    fn on_optimization_start(&self, query_id: &QueryID) -> DaftResult<()> {
         eprintln!("Started planning query `{}`", query_id);
         Ok(())
     }
 
-    fn on_optimization_end(&self, query_id: QueryID, optimized_plan: QueryPlan) -> DaftResult<()> {
+    fn on_optimization_end(&self, query_id: &QueryID, optimized_plan: QueryPlan) -> DaftResult<()> {
         eprintln!(
             "Finished planning query `{}` with optimized plan:\n{}",
             query_id, optimized_plan
         );
         Ok(())
     }
 
-    fn on_exec_start(&self, query_id: QueryID, node_infos: &[Arc<NodeInfo>]) -> DaftResult<()> {
+    fn on_exec_start(&self, query_id: &QueryID, node_infos: &[Arc<NodeInfo>]) -> DaftResult<()> {
         eprintln!("Started executing query `{}`", query_id);
         for node_info in node_infos {
             eprintln!(" - Node {}: {}", node_info.id, node_info.name);
         }
         Ok(())
     }
 
-    async fn on_exec_operator_start(&self, query_id: QueryID, node_id: NodeID) -> DaftResult<()> {
+    async fn on_exec_operator_start(&self, query_id: &QueryID, node_id: NodeID) -> DaftResult<()> {
         eprintln!(
             "Started executing operator `{}` in query `{}`",
             node_id, query_id
@@ -84,8 +84,8 @@ impl Subscriber for DebugSubscriber {
 
     async fn on_exec_emit_stats(
         &self,
-        query_id: QueryID,
-        stats: &[(NodeID, StatSnapshotView)],
+        query_id: &QueryID,
+        stats: &[(NodeID, StatSnapshotSend)],
     ) -> DaftResult<()> {
         eprintln!("Emitting execution stats for query `{}`", query_id);
         for node_id in stats {
@@ -97,15 +97,15 @@
         Ok(())
     }
 
-    async fn on_exec_operator_end(&self, query_id: QueryID, node_id: NodeID) -> DaftResult<()> {
+    async fn on_exec_operator_end(&self, query_id: &QueryID, node_id: NodeID) -> DaftResult<()> {
         eprintln!(
             "Finished executing operator `{}` in query `{}`",
             node_id, query_id
         );
         Ok(())
     }
 
-    async fn on_exec_end(&self, query_id: QueryID) -> DaftResult<()> {
+    async fn on_exec_end(&self, query_id: &QueryID) -> DaftResult<()> {
         eprintln!("Finished executing query `{}`", query_id);
         Ok(())
     }
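The debug subscriber shows the borrowed-key side of the same change: `DashMap::insert` still needs an owned key, hence the single `query_id.clone()` at query start, while `get`/`get_mut` accept a borrow. A condensed sketch of that counter, with the rest of the trait omitted and `QueryID` again assumed to be `Arc<str>`:

```rust
use std::sync::Arc;

use dashmap::DashMap;

type QueryID = Arc<str>; // assumed alias

struct DebugCounter {
    rows_out: DashMap<QueryID, usize>,
}

impl DebugCounter {
    fn on_query_start(&self, query_id: &QueryID) {
        // insert() takes ownership of the key, so this is the one clone.
        self.rows_out.insert(query_id.clone(), 0);
    }

    fn on_result_out(&self, query_id: &QueryID, batch_len: usize) {
        // get_mut() only borrows the key; no clone on the per-batch path.
        *self
            .rows_out
            .get_mut(query_id)
            .expect("Query not found")
            .value_mut() += batch_len;
    }
}

fn main() {
    let counter = DebugCounter { rows_out: DashMap::new() };
    let id: QueryID = Arc::from("query-1");
    counter.on_query_start(&id);
    counter.on_result_out(&id, 100);
    assert_eq!(*counter.rows_out.get(&id).unwrap().value(), 100);
}
```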