
Conversation

srilman
Contributor

@srilman srilman commented Sep 23, 2025

Changes Made

Implement the dashboard server and communication from the engine via the Subscriber framework. This is just a basic template to make sure the backend is working. The client-side stuff (aka the frontend) will mostly be implemented in the next PR, but I added some skeleton API endpoints just to test that everything works.

I double-checked and didn't notice any consistent performance hit while connected to the dashboard, so I think it's fine for now. To test, I started a manual server via the CLI, ran a query, and then called the client endpoints using Postman.

Follow ups:

  • Use a websocket for engine -> server communication for lower latency
  • Add an integration test section for the dashboard. Need to fix the API before I do that though

@srilman srilman changed the title from "[wip] feat: Dashboard Query Subscriber" to "feat: Dashboard Query Subscriber" Sep 23, 2025
Base automatically changed from slade/query-subscriber to main October 1, 2025 23:41
@github-actions github-actions bot added the feat label Oct 1, 2025
@srilman srilman marked this pull request as ready for review October 2, 2025 02:14
Contributor

@greptile-apps greptile-apps bot left a comment


Greptile Overview

Summary

This PR implements a comprehensive dashboard monitoring system for Daft by introducing a Subscriber-based architecture that tracks query execution lifecycle in real-time. The implementation includes both server-side infrastructure and client-facing API endpoints to enable monitoring of query planning, execution, and completion phases.

The core changes involve:

  1. Subscriber Framework Integration: A new DashboardSubscriber is added to the default subscribers in daft-context, which automatically sends query lifecycle events (start, planning, execution, finalization) to a dashboard server via HTTP POST requests.

  2. Dashboard Server Architecture: The daft-dashboard crate is significantly expanded with modular components:

    • state.rs: Thread-safe state management using DashMap to track multiple concurrent queries
    • engine.rs: REST API endpoints that receive query lifecycle updates from the engine
    • client.rs: API endpoints for dashboard UI to retrieve query summaries and details
  3. State Management Refactoring: The dashboard moves from simple Mutex-based global state to LazyLock with DashMap for concurrent access patterns, enabling multiple queries to be tracked simultaneously.

  4. Query Lifecycle Tracking: The system now captures comprehensive query information including status transitions (pending → planning → executing → finalizing → finished), operator-level statistics, execution timing, and sampled result data (first 10 rows) for preview purposes.

  5. API Cleanup: The Python interface is simplified by removing deprecated functions like get_dashboard_url() and broadcast_query_information(), consolidating to a clean launch() function that handles server startup and shutdown registration.

  6. Dependency Management: HTTP client functionality is consolidated using workspace-level reqwest dependency configuration, and dashboard-specific dependencies (dashmap, serde_json, common-metrics) are added to support the new architecture.

The implementation follows a clean separation of concerns where the execution engine publishes events through the subscriber pattern, while the dashboard server maintains query state and serves both engine updates and client queries through distinct API endpoints. The architecture is designed to be optional (enabled via environment variables) and extensible for future websocket-based communication.
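The publish/consume split described above can be sketched in miniature. This is a hedged, std-only sketch: the trait and method names are simplified stand-ins for Daft's actual `Subscriber` trait, and a recording subscriber replaces the real `DashboardSubscriber`'s HTTP POSTs so the fan-out path can be exercised without a server.

```rust
use std::sync::{Arc, Mutex};

// Simplified stand-in for Daft's Subscriber trait (not the real signatures).
trait Subscriber: Send + Sync {
    fn on_query_start(&self, query_id: &str);
    fn on_query_end(&self, query_id: &str);
}

// Stand-in for DashboardSubscriber: records events in memory instead of
// POSTing them to a dashboard server.
struct RecordingSubscriber {
    events: Mutex<Vec<String>>,
}

impl Subscriber for RecordingSubscriber {
    fn on_query_start(&self, query_id: &str) {
        self.events.lock().unwrap().push(format!("start:{query_id}"));
    }
    fn on_query_end(&self, query_id: &str) {
        self.events.lock().unwrap().push(format!("end:{query_id}"));
    }
}

// The engine holds a list of subscribers and fans lifecycle events out to all of them.
struct Engine {
    subscribers: Vec<Arc<dyn Subscriber>>,
}

impl Engine {
    fn run_query(&self, query_id: &str) {
        for s in &self.subscribers {
            s.on_query_start(query_id);
        }
        // ... planning / execution would happen here ...
        for s in &self.subscribers {
            s.on_query_end(query_id);
        }
    }
}

fn demo() -> Vec<String> {
    let dash = Arc::new(RecordingSubscriber { events: Mutex::new(Vec::new()) });
    let engine = Engine { subscribers: vec![dash.clone() as Arc<dyn Subscriber>] };
    engine.run_query("q-1");
    let out = dash.events.lock().unwrap().clone();
    out
}

fn main() {
    println!("{:?}", demo());
}
```

The key property is that the engine only knows the trait, so the dashboard stays optional: if the subscriber isn't registered, no events are emitted.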

Important Files Changed

Changed Files
Filename Score Overview
src/daft-context/src/subscribers/dashboard.rs 2/5 New DashboardSubscriber implementation with extensive use of unwrap() calls that could cause panics
src/daft-dashboard/src/engine.rs 2/5 REST API endpoints for query lifecycle tracking with multiple unwrap() calls and no proper error handling
src/daft-dashboard/src/client.rs 2/5 Client API endpoints with unwrap() in query retrieval that could panic on invalid query IDs
src/common/partitioning/src/lib.rs 2/5 Redundant Partition trait implementation for PartitionRef that may indicate design issues
src/daft-dashboard/src/python.rs 3/5 Refactored Python bindings with atomic boolean tracking but potential synchronization issues
src/daft-dashboard/src/lib.rs 4/5 Major dashboard architecture refactoring with modular design and proper state management
src/daft-context/src/lib.rs 4/5 Removes Default trait from Config struct, forcing explicit environment-aware initialization
src/daft-context/src/subscribers/mod.rs 4/5 Integration of dashboard subscriber into default subscriber system with proper error handling
daft/subscribers/dashboard.py 4/5 Significant refactoring removing manual HTTP logic in favor of native Rust implementation
daft/dataframe/dataframe.py 4/5 Removes deprecated query plan broadcasting in favor of subscriber-based approach
daft/subscribers/__init__.py 4/5 API cleanup removing internal functions from public interface
src/daft-local-execution/src/runtime_stats/subscribers.rs 4/5 Clean removal of dashboard module declaration as part of architecture migration
src/daft-local-execution/src/runtime_stats/mod.rs 4/5 Refactoring to use unified Subscriber framework instead of direct RuntimeStatsSubscriber
src/daft-local-execution/src/runtime_stats/subscribers/dashboard.rs 4/5 Complete removal of old DashboardSubscriber implementation (183 lines)
src/daft-context/Cargo.toml 5/5 Added required dependencies for dashboard HTTP communication and async runtime support
src/daft-dashboard/Cargo.toml 5/5 Added metrics, concurrent data structures, and JSON serialization dependencies
Cargo.toml 5/5 Added workspace-level reqwest dependency with appropriate feature configuration
src/daft-cli/src/python.rs 5/5 Improved user experience by moving dashboard URL message outside spawned task
src/daft-schema/src/dtype.rs 5/5 Minor documentation improvement adding comment to is_arrow() method
src/daft-local-execution/src/runtime_stats/subscribers/opentelemetry.rs 4/5 Fixed node ID recording in OpenTelemetry metrics
src/daft-distributed/Cargo.toml 4/5 Standardized reqwest dependency to use workspace configuration
src/daft-cli/Cargo.toml 4/5 Removed redundant tracing dependency that was re-exported by tracing-subscriber
src/daft-local-execution/Cargo.toml 4/5 Removed reqwest dependency as HTTP functionality moved to daft-context
daft/daft/dashboard.pyi 4/5 Simplified Python interface removing URL getter functions
src/daft-dashboard/src/state.rs 4/5 New comprehensive state management system for tracking query execution lifecycle

Confidence score: 2/5

  • This PR requires careful review due to extensive use of unwrap() calls that could cause production panics
  • Score lowered significantly due to multiple files containing unwrap() without proper error handling in critical HTTP and state management code
  • Pay close attention to src/daft-context/src/subscribers/dashboard.rs, src/daft-dashboard/src/engine.rs, and src/daft-dashboard/src/client.rs which contain multiple panic-prone unwrap() calls

Sequence Diagram

sequenceDiagram
    participant User
    participant Engine
    participant DashboardSubscriber
    participant DashboardServer
    participant Dashboard UI

    User->>Engine: "Execute DataFrame query"
    Engine->>DashboardSubscriber: "on_query_start(query_id, unoptimized_plan)"
    DashboardSubscriber->>DashboardServer: "POST /engine/query/{query_id}/start"
    
    Engine->>DashboardSubscriber: "on_optimization_start(query_id)"
    DashboardSubscriber->>DashboardServer: "POST /engine/query/{query_id}/plan_start"
    
    Engine->>Engine: "Optimize query plan"
    
    Engine->>DashboardSubscriber: "on_optimization_end(query_id, optimized_plan)"
    DashboardSubscriber->>DashboardServer: "POST /engine/query/{query_id}/plan_end"
    
    Engine->>DashboardSubscriber: "on_exec_start(query_id, node_infos)"
    DashboardSubscriber->>DashboardServer: "POST /engine/query/{query_id}/exec/start"
    
    loop For each operator
        Engine->>DashboardSubscriber: "on_exec_operator_start(query_id, node_id)"
        DashboardSubscriber->>DashboardServer: "POST /engine/query/{query_id}/exec/{op_id}/start"
        
        Engine->>Engine: "Execute operator"
        
        Engine->>DashboardSubscriber: "on_exec_emit_stats(query_id, stats)"
        DashboardSubscriber->>DashboardServer: "POST /engine/query/{query_id}/exec/emit_stats"
        
        Engine->>DashboardSubscriber: "on_exec_operator_end(query_id, node_id)"
        DashboardSubscriber->>DashboardServer: "POST /engine/query/{query_id}/exec/{op_id}/end"
    end
    
    Engine->>DashboardSubscriber: "on_exec_end(query_id)"
    DashboardSubscriber->>DashboardServer: "POST /engine/query/{query_id}/exec/end"
    
    loop For each result partition
        Engine->>DashboardSubscriber: "on_result_out(query_id, result)"
        Note over DashboardSubscriber: "Collect up to 10 rows for preview"
    end
    
    Engine->>DashboardSubscriber: "on_query_end(query_id)"
    DashboardSubscriber->>DashboardServer: "POST /engine/query/{query_id}/end"
    
    User->>Dashboard UI: "View dashboard in browser"
    Dashboard UI->>DashboardServer: "GET /client/queries"
    DashboardServer->>Dashboard UI: "Return query summaries"
    
    Dashboard UI->>DashboardServer: "GET /client/query/{query_id}"
    DashboardServer->>Dashboard UI: "Return detailed query info"

25 files reviewed, 6 comments


Comment on lines +95 to +96
QueryState::Finished { end_sec, .. } => {
(QueryStatus::Finished, end_sec - self.start_sec)
Contributor

logic: This calculation returns duration in seconds for finished queries, but the field name suggests it should be the end timestamp. Consider renaming the return tuple field or adjusting the calculation.

Path: src/daft-dashboard/src/state.rs

Contributor Author

Added comment to clarify

Comment on lines 44 to 45
.build()
.unwrap()
Contributor

logic: Client creation will panic on failure instead of returning a proper error

Suggested change
- .build()
- .unwrap()
+ .build()
+ .map_err(|e| DaftError::External(Box::new(e)))?
Path: src/daft-context/src/subscribers/dashboard.rs

Comment on lines 51 to 52
.build()
.unwrap()
Contributor

logic: Client creation will panic on failure instead of returning a proper error

Suggested change
- .build()
- .unwrap()
+ .build()
+ .map_err(|e| DaftError::External(Box::new(e)))?
Path: src/daft-context/src/subscribers/dashboard.rs

})
});

DASHBOARD_ENABLED.store(true, Ordering::SeqCst);
Contributor

logic: Setting DASHBOARD_ENABLED to true after launching the server could cause race conditions. If the server fails to start, the flag remains true, causing subsequent launch attempts to fail incorrectly.

Path: src/daft-dashboard/src/python.rs, line 99
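One possible shape for the fix flagged in this comment, sketched with std atomics only: claim the flag with `compare_exchange` before launching, and roll it back if startup fails. `launch` and the `server_ok` parameter are hypothetical stand-ins simulating startup success or failure, not the actual python.rs code.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

fn launch(flag: &AtomicBool, server_ok: bool) -> Result<(), &'static str> {
    // Only one caller wins the false -> true transition, so two concurrent
    // launches cannot both proceed.
    if flag
        .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
        .is_err()
    {
        return Err("dashboard already running");
    }
    if !server_ok {
        // Startup failed: reset the flag so a later launch attempt can succeed.
        flag.store(false, Ordering::SeqCst);
        return Err("server failed to start");
    }
    Ok(())
}

fn main() {
    let flag = AtomicBool::new(false);
    assert!(launch(&flag, false).is_err()); // failed start resets the flag
    assert!(launch(&flag, true).is_ok());   // so a retry succeeds
    assert!(launch(&flag, true).is_err());  // duplicate launch is rejected
}
```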

State(state): State<Arc<DashboardState>>,
Path(query_id): Path<String>,
) -> Json<QueryInfo> {
let query = state.queries.get(&query_id).unwrap();
Contributor

logic: Using unwrap() here will panic the server if query_id doesn't exist. Should return proper HTTP error response instead.

Path: src/daft-dashboard/src/client.rs, line 34
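A std-only sketch of the fix this comment asks for, with a plain `HashMap` and a bare `u16` standing in for the handler's shared state and axum's `StatusCode::NOT_FOUND`:

```rust
use std::collections::HashMap;

// Look the query up and map a miss to a 404-style status instead of
// unwrapping, which would panic the server on an unknown query_id.
fn get_query<'a>(
    queries: &'a HashMap<String, String>,
    query_id: &str,
) -> Result<&'a String, u16> {
    // `ok_or` turns the missing-key Option into a proper error.
    queries.get(query_id).ok_or(404)
}

fn main() {
    let mut queries = HashMap::new();
    queries.insert("q-1".to_string(), "SELECT 1".to_string());
    assert_eq!(get_query(&queries, "q-1").map(|s| s.as_str()), Ok("SELECT 1"));
    assert_eq!(get_query(&queries, "missing").map(|s| s.as_str()), Err(404));
}
```

In the real axum handler the return type would be something like `Result<Json<QueryInfo>, StatusCode>`, which axum converts into an HTTP response.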


codecov bot commented Oct 2, 2025

Codecov Report

❌ Patch coverage is 23.02905% with 371 lines in your changes missing coverage. Please review.
✅ Project coverage is 75.28%. Comparing base (901c35d) to head (27e628e).
⚠️ Report is 11 commits behind head on main.

Files with missing lines Patch % Lines
src/daft-context/src/subscribers/dashboard.rs 2.39% 163 Missing ⚠️
src/daft-dashboard/src/engine.rs 7.79% 142 Missing ⚠️
src/daft-dashboard/src/client.rs 17.85% 23 Missing ⚠️
src/daft-dashboard/src/state.rs 57.69% 11 Missing ⚠️
src/daft-dashboard/src/lib.rs 23.07% 10 Missing ⚠️
src/daft-context/src/subscribers/debug.rs 0.00% 9 Missing ⚠️
src/daft-context/src/subscribers/mod.rs 28.57% 5 Missing ⚠️
src/daft-context/src/subscribers/python.rs 88.57% 4 Missing ⚠️
src/daft-cli/src/python.rs 0.00% 3 Missing ⚠️
src/daft-dashboard/src/python.rs 92.30% 1 Missing ⚠️
Additional details and impacted files


@@            Coverage Diff             @@
##             main    #5266      +/-   ##
==========================================
- Coverage   75.42%   75.28%   -0.15%     
==========================================
  Files         983      986       +3     
  Lines      123732   123921     +189     
==========================================
- Hits        93327    93294      -33     
- Misses      30405    30627     +222     
Files with missing lines Coverage Δ
daft/dataframe/dataframe.py 78.24% <ø> (-0.14%) ⬇️
daft/subscribers/__init__.py 100.00% <100.00%> (ø)
daft/subscribers/dashboard.py 100.00% <ø> (+18.18%) ⬆️
src/common/metrics/src/lib.rs 37.83% <ø> (ø)
src/daft-context/src/lib.rs 95.23% <100.00%> (+0.32%) ⬆️
src/daft-context/src/python.rs 83.51% <100.00%> (+1.16%) ⬆️
src/daft-local-execution/src/runtime_stats/mod.rs 91.29% <ø> (+0.20%) ⬆️
...ion/src/runtime_stats/subscribers/opentelemetry.rs 0.00% <ø> (ø)
...l-execution/src/runtime_stats/subscribers/query.rs 87.50% <100.00%> (ø)
src/daft-schema/src/dtype.rs 86.21% <ø> (ø)
... and 10 more

... and 19 files with indirect coverage changes


pub(crate) struct ExecInfo {
pub exec_start_sec: u64,
// TODO: Replace usize with NodeID
pub operators: HashMap<usize, OperatorInfo>,
Contributor Author

TODO!!!


/// An Arc'd reference to a [`Partition`]
pub type PartitionRef = Arc<dyn Partition>;
impl Partition for PartitionRef {
Contributor

Same comment as last time i think u dont need this

pub use crate::subscribers::Subscriber;

#[derive(Debug, Default)]
#[derive(Debug)]
Contributor

No more default config?

Contributor Author

🤷‍♂️ i'll blame merge conflicts. added back


use crate::subscribers::Subscriber;

fn now() -> u64 {
Contributor

Can we make this more descriptive? Like, what does now mean: seconds or milliseconds?
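A sketch of the naming suggestion, assuming the helper wraps `SystemTime` (the actual implementation may differ): putting the unit in the name removes the ambiguity at every call site.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Seconds since the Unix epoch. The `_secs` suffix makes the unit explicit.
fn now_secs() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before Unix epoch")
        .as_secs()
}

fn main() {
    let t = now_secs();
    // Sanity check: any reasonable clock is past 2020-01-01 (1577836800).
    assert!(t > 1_577_836_800);
}
```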

url: String,
client: Client,
runtime: RuntimeRef,
head_rows: DashMap<String, MicroPartitionRef>,
Contributor

head_rows sounds weird, how about previews?


impl DashboardSubscriber {
pub fn try_new() -> DaftResult<Option<Self>> {
let Ok(url) = std::env::var("DAFT_DASHBOARD_URL") else {
Contributor

Who sets this, us or the user?

Contributor

ok

tower-http = {version = "0.6", features = ["fs", "trace", "cors"]}
include_dir = "0.7.4"
parking_lot = {workspace = true}
dashmap = "6.1"
Contributor

workspace

*node_id,
stats
.into_iter()
.map(|(name, stat)| ((*name).to_string(), stat.clone()))
Contributor

Let's try not to do .to_string() or any heap allocations on emit stats if possible

Contributor Author

So what exactly was our decision with this? I could look into tools inside of serde for specializing lifetimes between serialization and deserialization, but that will take some time to figure out. Should we just have the Send / Recv split for the time being?

Contributor

Yes that's fine

Path(query_id): Path<String>,
Json(args): Json<PlanStartArgs>,
) -> StatusCode {
state.queries.get_mut(&query_id).unwrap().value_mut().status = QueryState::Planning {
Contributor

can we return errors here and for the rest of the handlers if the query id does not exist

Comment on lines 51 to 73
pub(crate) enum QueryState {
Pending,
Planning {
plan_start_sec: u64,
},
Planned(PlanInfo),
Executing {
plan_info: PlanInfo,
exec_info: ExecInfo,
},
Finalizing {
plan_info: PlanInfo,
exec_info: ExecInfo,
exec_end_sec: u64,
},
Finished {
plan_info: PlanInfo,
exec_info: ExecInfo,
exec_end_sec: u64,
end_sec: u64,
results: RecordBatch,
},
}
Contributor

Be consistent with names match whatever we call it in the subscriber


@srilman srilman left a comment


Addressed comments


}
}

const TOTAL_ROWS: usize = 10;
Contributor Author

Not sure if I want to add the environment variable for it right now, but will probably add some config in the future for all observability. Is that OK?


Comment on lines 249 to 253
.send()
.await
.map_err(|e| DaftError::ConnectTimeout(Box::new(e)))?
.error_for_status()
.map_err(|e| DaftError::External(Box::new(e)))?;
Contributor Author

Added a helper

@srilman srilman requested a review from colin-ho October 3, 2025 06:03

@colin-ho colin-ho left a comment


I just have a bunch of nits regarding the copying of stats etc. doesn't affect functionality tho

feel free to merge this in we can address later

}
}

const TOTAL_ROWS: usize = 10;
Contributor

yeah ok

&self,
query_id: String,
query_id: QueryID,
stats: &[(NodeID, StatSnapshotView)],
Contributor

Do we even need view anymore? I feel like we can make do with just send + recv

Contributor

Cuz right now i believe we make a copy of the stats when we go from StatSnapshotSend -> StatSnapshotView in RuntimeStatsSubscriber::handle_event, then another copy in on_exec_emit_stats to go from StatSnapshotView -> ExecEmitStatsArgsSend

"{}/engine/query/{}/exec/emit_stats",
self.url, query_id
))
.json(&daft_dashboard::engine::ExecEmitStatsArgsSend {
Contributor

Also i feel like we don't even need ExecEmitStatsArgsSend, we can pass &[(NodeID, StatSnapshotSend)] directly into .json

if im not wrong, &[(NodeID, StatSnapshotSend)] is serializable. .json is gonna call serde_json::to_vec(json) regardless of what json is anyway, so i think we can avoid alloc by passing in the slice.

Also maybe we don't need ExecEmitStatsArgsRecv either, and can just store StatSnapshotRecv in the dashboard state operator info. (might require changing the frontend interfaces because it's a SmallVec, not a HashMap)

@colin-ho
Contributor

colin-ho commented Oct 3, 2025

I just have a bunch of nits regarding the copying of stats etc. doesn't affect functionality tho

feel free to merge this in we can address later

#5329

@srilman srilman merged commit 1f46a56 into main Oct 3, 2025
44 of 45 checks passed
@srilman srilman deleted the slade/dashboard-subscriber branch October 3, 2025 18:16