Skip to content

Add tracing support #114

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions optd-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ optd = { path = "../optd" }
clap = { version = "4.5.38", features = ["derive"] }
colored = "3.0.0"
tokio = "1.45.0"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt", "ansi"] }
84 changes: 65 additions & 19 deletions optd-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ use std::collections::HashMap;
use std::sync::Arc;
use tokio::runtime::Builder;
use tokio::task::JoinSet;
use tracing::{Instrument, instrument};
use tracing_subscriber::{EnvFilter, fmt};

#[derive(Parser)]
#[command(
Expand All @@ -66,35 +68,53 @@ enum Commands {
}

fn main() -> Result<(), Vec<CompileError>> {
// Initialize tracing subscriber
let filter = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new("optd=info,optd::cli=info"));
fmt()
.with_env_filter(filter)
.pretty()
.with_ansi(true)
.init();

tracing::info!(target: "optd::cli", "optd-cli starting up");
let cli = Cli::parse();

let mut udfs = HashMap::new();
let udf = Udf {
let unimplemented_udf = Udf {
func: Arc::new(|_, _, _| {
Box::pin(async move {
println!("This user-defined function is unimplemented!");
Value::new(CoreData::<Value>::None)
})
}),
};
udfs.insert("unimplemented_udf".to_string(), udf.clone());
udfs.insert("unimplemented_udf".to_string(), unimplemented_udf.clone());

match cli.command {
Commands::Compile(config) => {
let compile_span =
tracing::info_span!(target: "optd::cli", "compile_file", path = %config.path_str());
let _guard = compile_span.enter();
tracing::info!("Starting compilation");
for mock_udf in config.mock_udfs() {
udfs.insert(mock_udf.to_string(), udf.clone());
udfs.insert(mock_udf.to_string(), unimplemented_udf.clone());
}

let _ = compile_hir(config, udfs).unwrap_or_else(|errors| handle_errors(&errors));
tracing::info!("Compilation completed successfully");
Ok(())
}
Commands::RunFunctions(config) => {
// TODO(Connor): Add support for running functions with real UDFs.
let run_span = tracing::info_span!(target: "optd::cli", "run_dsl_functions", path = %config.path_str());
let _guard = run_span.enter();
tracing::info!("Starting function execution");
for mock_udf in config.mock_udfs() {
udfs.insert(mock_udf.to_string(), udf.clone());
udfs.insert(mock_udf.to_string(), unimplemented_udf.clone());
}

let hir = compile_hir(config, udfs).unwrap_or_else(|errors| handle_errors(&errors));
tracing::info!("Compilation successful, proceeding to run functions");

run_all_functions(&hir)
}
Expand All @@ -108,17 +128,25 @@ struct FunctionResult {
}

/// Run all functions found in the HIR, marked with [run].
#[instrument(
level = "info",
skip(hir),
target = "optd::cli",
name = "run_all_functions"
)]
fn run_all_functions(hir: &HIR) -> Result<(), Vec<CompileError>> {
println!("\n{} {}\n", "•".green(), "Running functions...".green());

let functions = find_functions(hir);
let functions_to_run = find_functions(hir);
tracing::info!(target: "optd::cli", num_functions = functions_to_run.len(), "Found functions to run");

if functions.is_empty() {
if functions_to_run.is_empty() {
println!("No functions found annotated with [run]");
tracing::warn!(target: "optd::cli", "No functions found annotated with [run]");
return Ok(());
}

println!("Found {} functions to run", functions.len());
println!("Found {} functions to run", functions_to_run.len());

// Create a multi-threaded runtime for parallel execution.
// TODO: We increase the stack size by x64 to avoid stack overflow
Expand All @@ -128,7 +156,8 @@ fn run_all_functions(hir: &HIR) -> Result<(), Vec<CompileError>> {
.enable_all()
.build()
.unwrap();
let function_results = runtime.block_on(run_functions_in_parallel(hir, functions));
tracing::debug!(target: "optd::cli", "Tokio runtime initialized for function execution");
let function_results = runtime.block_on(run_functions_in_parallel(hir, functions_to_run));

// Process and display function results.
let success_count = process_function_results(function_results);
Expand All @@ -138,6 +167,7 @@ fn run_all_functions(hir: &HIR) -> Result<(), Vec<CompileError>> {
"Execution Results:".yellow(),
format!("{} functions executed", success_count).yellow()
);
tracing::info!(target: "optd::cli", success_count, "Function execution finished");

Ok(())
}
Expand All @@ -147,25 +177,36 @@ async fn run_functions_in_parallel(hir: &HIR, functions: Vec<String>) -> Vec<Fun
let retriever = Arc::new(MockRetriever::new());
let mut set = JoinSet::new();

tracing::debug!(
target: "optd::cli",
num_functions = functions.len(),
"Spawning functions for parallel execution"
);
for function_name in functions {
let engine = Engine::new(hir.context.clone(), catalog.clone(), retriever.clone());
let name = function_name.clone();

set.spawn(async move {
// Create a continuation that returns itself.
let result_handler: Continuation<Value, Value> =
Arc::new(|value| Box::pin(async move { value }));

// Launch the function with an empty vector of arguments.
let result = engine.launch(&name, vec![], result_handler).await;
FunctionResult { name, result }
});
set.spawn(
async move {
// Create a continuation that returns itself.
let result_handler: Continuation<Value, Value> =
Arc::new(|value| Box::pin(async move { value }));

tracing::debug!(target: "optd::cli", function_name = %name, "Launching function");
// Launch the function with an empty vector of arguments.
let result = engine.launch(&name, vec![], result_handler).await;
tracing::debug!(target: "optd::cli", function_name = %name, "Function launch completed");
FunctionResult { name, result }
}
.instrument(tracing::info_span!(target: "optd::cli", "run_dsl_function", function_name = %function_name)),
);
}

// Collect all function results.
let mut results = Vec::new();
while let Some(result) = set.join_next().await {
if let Ok(function_result) = result {
tracing::debug!(target: "optd::cli", function_name = %function_result.name, "Function task completed");
results.push(function_result);
}
}
Expand All @@ -174,20 +215,24 @@ async fn run_functions_in_parallel(hir: &HIR, functions: Vec<String>) -> Vec<Fun
}

/// Process function results and display them.
#[instrument(level = "info", skip(function_results), target = "optd::cli")]
fn process_function_results(function_results: Vec<FunctionResult>) -> usize {
let mut success_count = 0;

for function_result in function_results {
println!("\n{} {}", "Function:".blue(), function_result.name);
println!("\n{} {}", "Function:".blue(), &function_result.name);

match function_result.result {
EngineResponse::Return(value, _) => {
tracing::trace!(target: "optd::cli", function_name = %function_result.name, "Function returned a value");
// Check if the result is a failure.
if matches!(value.data, CoreData::Fail(_)) {
tracing::warn!(target: "optd::cli", function_name = %function_result.name, "Function failed: {}", value);
println!(" {}: Function failed: {}", "Error".red(), value);
} else {
println!(" {}: {}", "Result".green(), value);
success_count += 1;
tracing::debug!(target: "optd::cli", function_name = %function_result.name, "Function succeeded. Result: {}", value);
}
}
_ => unreachable!(), // For now, unless we add a special UDF that builds a group / goal.
Expand All @@ -214,6 +259,7 @@ fn find_functions(hir: &HIR) -> Vec<String> {

/// Display error details and exit the program.
fn handle_errors(errors: &[CompileError]) -> ! {
tracing::error!(target: "optd::cli", num_errors = errors.len(), "Compilation failed");
eprintln!(
"\n{} {}\n",
"•".yellow(),
Expand Down
2 changes: 2 additions & 0 deletions optd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ once_cell = "1.21.3"
ordered-float = "5.0.0"
tempfile = "3.20.0"
tokio = { version = "1.45.0", features = ["macros", "rt"] }
tracing = "0.1"
trait-variant = "0.1.2"

[dev-dependencies]
Expand All @@ -28,3 +29,4 @@ tokio = { version = "1.45.0", features = [
"rt-multi-thread",
"test-util"
] }
tracing-test = "0.2"
6 changes: 6 additions & 0 deletions optd/src/catalog/iceberg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ impl<C: IcebergCatalog> OptdIcebergCatalog<C> {
}

/// Retrieves a [`Table`] from the catalog.
#[tracing::instrument(
level = "debug",
skip_all,
fields(table_name = %table_name),
target = "optd::catalog"
)]
async fn get_table(&self, table_name: &str) -> Result<Table, CatalogError> {
let namespace_ident = NamespaceIdent::new(DEFAULT_NAMESPACE.to_string());
let table_ident = TableIdent::new(namespace_ident, table_name.to_string());
Expand Down
4 changes: 3 additions & 1 deletion optd/src/demo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use tokio::{
sync::mpsc,
time::{sleep, timeout},
};
use tracing::instrument;

pub async fn properties(
args: Vec<Value>,
Expand All @@ -31,7 +32,7 @@ pub async fn properties(

retriever.get_properties(group_id).await
}

#[instrument(target = "optd::demo", level = "info", name = "run_demo")]
async fn run_demo() {
// Compile the HIR.
let config = Config::new("src/demo/demo.opt".into());
Expand Down Expand Up @@ -100,6 +101,7 @@ async fn run_demo() {
mod demo {
use super::*;

#[tracing_test::traced_test]
#[tokio::test]
async fn test_optimizer_demo() {
run_demo().await
Expand Down
Loading