diff --git a/.clippy.toml b/.clippy.toml index c8c25d3..69f298f 100644 --- a/.clippy.toml +++ b/.clippy.toml @@ -1,4 +1,4 @@ -excessive-nesting-threshold = 4 +excessive-nesting-threshold = 7 too-many-arguments-threshold = 10 allowed-idents-below-min-chars = ["..", "k", "f", "re", "id", "Ok", "'_"] allowed-duplicate-crates = [ diff --git a/.devcontainer/gpu/Dockerfile b/.devcontainer/gpu/Dockerfile index f375d57..05a5e2f 100644 --- a/.devcontainer/gpu/Dockerfile +++ b/.devcontainer/gpu/Dockerfile @@ -23,7 +23,7 @@ RUN \ apt-get clean ENV PATH="/root/.cargo/bin:${PATH}" -RUN apt-get update && curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | bash -s -- -y && \ +RUN apt-get update && curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | bash -s -- -y --default-toolchain 1.86.0 && \ # rust auto formatting rustup component add rustfmt && \ # rust style linter diff --git a/Cargo.toml b/Cargo.toml index 95d0653..d017e3d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,6 +47,7 @@ glob = "0.3.1" heck = "0.5.0" # random name generator names = "0.14.0" +petgraph = { version = "0.8.1", features = ["serde-1"] } # complex pattern matching in strings regex = "1.11.0" # serialization/deserialization to/from filestore @@ -96,6 +97,7 @@ as_conversions = { level = "allow", priority = 127 } # allow c assertions_on_result_states = { level = "allow", priority = 127 } # allow checking is_ok/is_err big_endian_bytes = { level = "allow", priority = 127 } # allow to_be_bytes / from_be_bytes blanket_clippy_restriction_lints = { level = "allow", priority = 127 } # allow setting all restrictions so we can omit specific ones +clone_on_ref_ptr = { level = "allow", priority = 127 } # allow clone on ref ptr default_numeric_fallback = { level = "allow", priority = 127 } # allow type inferred by numeric literal disallowed_script_idents = { level = "allow", priority = 127 } # skip since we use only ascii else_if_without_else = { level = "allow", priority = 127 } # missing else ok @@ -107,13 +109,16 @@ impl_trait_in_params = { level = "allow", priority = 127 } # impl in implicit_return = { level = "allow", priority = 127 } # missing return ok inline_asm_x86_intel_syntax = { level = "allow", priority = 127 } # intel syntax ok integer_division = { level = "allow", priority = 127 } # allow discarding remainder +iter-over-hash-type = { level = "allow", priority = 127 } # allow iterating over hash types little_endian_bytes = { level = "allow", priority = 127 } # allow to_le_bytes / from_le_bytes +min_ident_chars = { level = "allow", priority = 127 } # allow short identifiers missing_asserts_for_indexing = { level = "allow", priority = 127 } # missing assert before indexing ok missing_docs_in_private_items = { level = "allow", priority = 127 } # missing docs on private ok missing_inline_in_public_items = { level = "allow", priority = 127 } # let rust compiler determine best inline logic missing_trait_methods = { level = "allow", priority = 127 } # allow in favor of rustc `implement the missing item` module_name_repetitions = { level = "allow", priority = 127 } # allow use of module name in type names multiple_inherent_impl = { level = "allow", priority = 127 } # required in best practice to limit exposure over UniFFI +multiple_crate_versions = { level = "allow", priority = 127 } # allow multiple versions of the same crate must_use_candidate = { level = "allow", priority = 127 } # omitting #[must_use] ok mod_module_files = { level = "allow", priority = 127 } # mod directories ok non_ascii_literal = { level = "allow", 
priority = 127 } # non-ascii char in string literal ok @@ -125,9 +130,11 @@ pub_use = { level = "allow", priority = 127 } # ok to s pub_with_shorthand = { level = "allow", priority = 127 } # allow use of pub(super) pub_without_shorthand = { level = "allow", priority = 127 } # allow use of pub(in super) question_mark_used = { level = "allow", priority = 127 } # allow question operator +result_large_err = { level = "allow", priority = 127 } # allow large error types, performance hit is acceptable self_named_module_files = { level = "allow", priority = 127 } # mod files ok semicolon_inside_block = { level = "allow", priority = 127 } # ok to keep inside block separated_literal_suffix = { level = "allow", priority = 127 } # literal suffixes should be separated by underscore +shadow_reuse = { level = "allow", priority = 127 } # Allow shadow reuses in the cases of impl into single_char_lifetime_names = { level = "allow", priority = 127 } # single char lifetimes ok single_component_path_imports = { level = "allow", priority = 127 } # allow for readability std_instead_of_alloc = { level = "allow", priority = 127 } # we should use std when possible diff --git a/src/core/crypto.rs b/src/core/crypto.rs index b18e343..58b5832 100644 --- a/src/core/crypto.rs +++ b/src/core/crypto.rs @@ -49,7 +49,7 @@ pub fn hash_buffer(buffer: impl AsRef<[u8]>) -> String { /// Will return error if unable to access file. pub fn hash_file(filepath: impl AsRef) -> Result { hash_stream( - &mut File::open(&filepath).context(selector::InvalidFilepath { + &mut File::open(&filepath).context(selector::InvalidFileOrDirPath { path: filepath.as_ref(), })?, ) @@ -62,7 +62,10 @@ pub fn hash_file(filepath: impl AsRef) -> Result { pub fn hash_dir(dirpath: impl AsRef) -> Result { let summary: BTreeMap = dirpath .as_ref() - .read_dir()? + .read_dir() + .context(selector::InvalidFileOrDirPath { + path: dirpath.as_ref(), + })? .map(|path| { let access_path = path?.path(); Ok(( diff --git a/src/core/error.rs b/src/core/error.rs index 0dd158e..7882a4c 100644 --- a/src/core/error.rs +++ b/src/core/error.rs @@ -9,6 +9,7 @@ use std::{ io, path::{self}, }; +use tokio::task::JoinError; impl From for OrcaError { fn from(error: BollardError) -> Self { @@ -70,6 +71,17 @@ impl From for OrcaError { } } } +impl From for OrcaError { + fn from(error: JoinError) -> Self { + Self { + kind: Kind::IoError { + source: error.into(), + backtrace: Some(Backtrace::capture()), + }, + } + } +} + impl From for OrcaError { fn from(error: Kind) -> Self { Self { kind: error } @@ -90,21 +102,26 @@ impl fmt::Debug for OrcaError { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match &self.kind { Kind::EmptyResponseWhenLoadingContainerAltImage { backtrace, .. } + | Kind::FailedToExtractRunInfo { backtrace, .. } + | Kind::FailedToStartPod { backtrace, .. } | Kind::GeneratedNamesOverflow { backtrace, .. } - | Kind::InvalidFilepath { backtrace, .. } + | Kind::InvalidFileOrDirPath { backtrace, .. } | Kind::InvalidPodResultTerminatedDatetime { backtrace, .. } | Kind::KeyMissing { backtrace, .. } | Kind::NoAnnotationFound { backtrace, .. } | Kind::NoContainerNames { backtrace, .. } + | Kind::NodeNotFound { backtrace, .. } | Kind::NoFileName { backtrace, .. } | Kind::NoMatchingPodRun { backtrace, .. } | Kind::NoTagFoundInContainerAltImage { backtrace, .. } + | Kind::MissingStreamKey { backtrace, .. } | Kind::BollardError { backtrace, .. } | Kind::GlobPatternError { backtrace, .. } | Kind::IoError { backtrace, .. } | Kind::PathPrefixError { backtrace, .. 
} | Kind::SerdeJsonError { backtrace, .. } - | Kind::SerdeYamlError { backtrace, .. } => { + | Kind::SerdeYamlError { backtrace, .. } + | Kind::TokioJoinError { backtrace, .. } => { write!(f, "{}{}", self.kind, format_stack(backtrace.as_ref())) } } diff --git a/src/core/mod.rs b/src/core/mod.rs index 3d58aee..138a494 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -4,5 +4,9 @@ pub(crate) mod error; /// Components of the data model. pub mod model; pub(crate) mod orchestrator; +/// Components relating to pipelines +pub mod pipeline; +/// Components relating to pipeline runner +pub mod pipeline_runner; pub(crate) mod store; pub(crate) mod util; diff --git a/src/core/orchestrator/docker.rs b/src/core/orchestrator/docker.rs index 64e533b..eb873a8 100644 --- a/src/core/orchestrator/docker.rs +++ b/src/core/orchestrator/docker.rs @@ -7,8 +7,9 @@ use crate::{ }, }; use bollard::{ - container::{Config, CreateContainerOptions, ListContainersOptions}, + container::{Config, CreateContainerOptions, ListContainersOptions, RemoveContainerOptions}, models::{ContainerStateStatusEnum, HostConfig}, + secret::{ContainerInspectResponse, ContainerSummary}, }; use chrono::DateTime; use futures_util::future::join_all; @@ -34,6 +35,11 @@ pub static RE_IMAGE_TAG: LazyLock = LazyLock::new(|| { .expect("Invalid image tag regex.") }); +#[expect(clippy::expect_used, reason = "Valid static regex")] +static RE_FOR_CMD: LazyLock = LazyLock::new(|| { + Regex::new(r#"[^\s"']+|"[^"]*"|'[^']*'"#).expect("Invalid model metadata regex.") +}); + impl LocalDockerOrchestrator { fn prepare_mount_binds( namespace_lookup: &HashMap, @@ -57,7 +63,7 @@ impl LocalDockerOrchestrator { .try_fold::<_, _, Result<_>>( vec![], |mut flattened_binds, (stream_name, stream_info)| { - flattened_binds.extend(match get(&pod_job.input_stream, stream_name)? { + flattened_binds.extend(match get(&pod_job.input_map, stream_name)? { Input::Unary(blob) => { vec![format!( "{}:{}:{}", @@ -108,7 +114,11 @@ impl LocalDockerOrchestrator { - Pod commands will always have at least 1 element "# )] - pub(crate) fn prepare_container_start_inputs( + /// Prepare the inputs for starting a container. 
+ /// + /// # Errors + /// Will fail if pod job is invalid + pub fn prepare_container_start_inputs( namespace_lookup: &HashMap, pod_job: &PodJob, image: String, @@ -143,11 +153,15 @@ impl LocalDockerOrchestrator { serde_json::to_string(&pod_job)?, ), ]); - let command = pod_job - .pod - .command - .split_whitespace() - .map(String::from) + let command = RE_FOR_CMD + .captures_iter(&pod_job.pod.command) + .map(|capture| { + capture + .extract::<0>() + .0 + .to_owned() + .replace(['\'', '\"'], "") + }) .collect::>(); Ok(( @@ -178,10 +192,7 @@ impl LocalDockerOrchestrator { )) } #[expect( - clippy::cast_sign_loss, clippy::string_slice, - clippy::cast_precision_loss, - clippy::cast_possible_truncation, clippy::indexing_slicing, reason = r#" - Timestamp and memory should always have a value > 0 @@ -194,7 +205,7 @@ impl LocalDockerOrchestrator { pub(crate) async fn list_containers( &self, filters: HashMap>, // https://docs.rs/bollard/latest/bollard/container/struct.ListContainersOptions.html#structfield.filters - ) -> Result> { + ) -> Result>> { Ok(join_all( self.api .list_containers(Some(ListContainersOptions { @@ -218,70 +229,151 @@ impl LocalDockerOrchestrator { ) .await .into_iter() - .filter_map(|result: Result<_>| { - let (container_name, container_summary, container_spec) = result.ok()?; - let terminated_timestamp = - DateTime::parse_from_rfc3339(container_spec.state.as_ref()?.finished_at.as_ref()?) - .ok()? - .timestamp() as u64; - Some(( + .map(|result: Result<_>| { + let (container_name, container_summary, container_inspect_response) = match result { + Ok((container_name, container_summary, container_inspect_response)) => ( + container_name, + container_summary, + container_inspect_response, + ), + Err(error) => { + return Err(error); + } + }; + + Ok( + Self::extract_run_info(&container_summary, &container_inspect_response) + .map(|run_info| (container_name.clone(), run_info)) + .context(selector::FailedToExtractRunInfo { container_name })?, + ) + })) + } + pub(crate) async fn delete_container(&self, container_name: &str) -> Result<()> { + self.api + .remove_container( container_name, - RunInfo { - image: container_spec.config.as_ref()?.image.as_ref()?.clone(), - created: container_summary.created? as u64, - terminated: (terminated_timestamp > 0).then_some(terminated_timestamp), - env_vars: container_spec - .config - .as_ref()? - .env + Some(RemoveContainerOptions { + force: true, + ..Default::default() + }), + ) + .await?; + Ok(()) + } + + #[expect( + clippy::cast_sign_loss, + clippy::cast_precision_loss, + clippy::cast_possible_truncation, + reason = r#" + - Timestamp and memory should always have a value > 0 + - Container will always have a name with more than 1 character + - No issue in core casting if between 0 - 3.40e38(f32:MAX) + - No issue in exit code casting if between -3.27e4(i16:MIN) - 3.27e4(i16:MAX) + - Containers will always have at least 1 name with at least 2 characters + - This functions requires a lot of boilerplate code to extract the run info + "# + )] + fn extract_run_info( + container_summary: &ContainerSummary, + container_inspect_response: &ContainerInspectResponse, + ) -> Option { + let terminated_timestamp = DateTime::parse_from_rfc3339( + container_inspect_response + .state + .as_ref()? + .finished_at + .as_ref()?, + ) + .ok()? + .timestamp() as u64; + Some(RunInfo { + image: container_inspect_response + .config + .as_ref()? + .image + .as_ref()? + .clone(), + created: container_summary.created? 
as u64, + terminated: (terminated_timestamp > 0).then_some(terminated_timestamp), + env_vars: container_inspect_response + .config + .as_ref()? + .env + .as_ref()? + .iter() + .filter_map(|x| { + x.split_once('=') + .map(|(key, value)| (key.to_owned(), value.to_owned())) + }) + .collect(), + command: format!( + "{} {}", + container_inspect_response + .config + .as_ref()? + .entrypoint + .as_ref()? + .join(" "), + container_inspect_response + .config + .as_ref()? + .cmd + .as_ref()? + .join(" ") + ), + status: match ( + container_inspect_response.state.as_ref()?.status?, + container_inspect_response.state.as_ref()?.exit_code? as i16, + ) { + (ContainerStateStatusEnum::RUNNING, _) => Status::Running, + (ContainerStateStatusEnum::EXITED, 0) => Status::Completed, + (ContainerStateStatusEnum::EXITED | ContainerStateStatusEnum::DEAD, code) => { + Status::Failed(code) + } + ( + ContainerStateStatusEnum::CREATED | ContainerStateStatusEnum::RESTARTING, + code, + ) => { + if container_inspect_response + .state .as_ref()? - .iter() - .filter_map(|x| { - x.split_once('=') - .map(|(key, value)| (key.to_owned(), value.to_owned())) - }) - .collect(), - command: format!( - "{} {}", - container_spec - .config - .as_ref()? - .entrypoint - .as_ref()? - .join(" "), - container_spec.config.as_ref()?.cmd.as_ref()?.join(" ") - ), - status: match ( - container_spec.state.as_ref()?.status.as_ref()?, - container_spec.state.as_ref()?.exit_code? as i16, - ) { - (ContainerStateStatusEnum::RUNNING, _) => Status::Running, - (ContainerStateStatusEnum::EXITED, 0) => Status::Completed, - (ContainerStateStatusEnum::EXITED, code) => Status::Failed(code), - _ => todo!(), - }, - mounts: container_spec - .mounts + .error .as_ref()? - .iter() - .map(|mount_point| { - Some(format!( - "{}:{}{}", - mount_point.source.as_ref()?, - mount_point.destination.as_ref()?, - mount_point - .mode - .as_ref() - .map_or_else(String::new, |mode| format!(":{mode}")) - )) - }) - .collect::>>()?, - labels: container_spec.config.as_ref()?.labels.as_ref()?.clone(), - cpu_limit: container_spec.host_config.as_ref()?.nano_cpus? as f32 - / 10_f32.powi(9), // ncpu, ucores=3, mcores=6, cores=9 - memory_limit: container_spec.host_config.as_ref()?.memory? as u64, - }, - )) - })) + .is_empty() + { + Status::Queued + } else { + Status::Failed(code) + } + } + _ => Status::Unknown, + }, + mounts: container_inspect_response + .mounts + .as_ref()? + .iter() + .map(|mount_point| { + Some(format!( + "{}:{}{}", + mount_point.source.as_ref()?, + mount_point.destination.as_ref()?, + mount_point + .mode + .as_ref() + .map_or_else(String::new, |mode| format!(":{mode}")) + )) + }) + .collect::>>()?, + labels: container_inspect_response + .config + .as_ref()? + .labels + .as_ref()? + .clone(), + cpu_limit: container_inspect_response.host_config.as_ref()?.nano_cpus? as f32 + / 10_f32.powi(9), // ncpu, ucores=3, mcores=6, cores=9 + memory_limit: container_inspect_response.host_config.as_ref()?.memory? 
as u64, + }) } } diff --git a/src/core/pipeline.rs b/src/core/pipeline.rs new file mode 100644 index 0000000..ead6ce4 --- /dev/null +++ b/src/core/pipeline.rs @@ -0,0 +1,359 @@ +use serde::Serialize; +use std::{ + backtrace::Backtrace, + collections::{HashMap, HashSet}, +}; + +use crate::uniffi::{ + error::{Kind, OrcaError, Result}, + model::{Annotation, Input, Mapper, Pod}, +}; +use petgraph::prelude::NodeIndex; +use petgraph::{ + Directed, + Direction::{Incoming, Outgoing}, + Graph, +}; + +use crate::core::model::serialize_hashmap; + +use super::util::get; + +#[derive(Serialize, PartialEq, Debug, Clone)] +/// Enum to store different types of nodes explicitly +pub enum Node { + /// Pod node + Pod(Box), + /// Mapper node + Mapper(Mapper), +} + +impl Node { + /// Get the hash of the node + pub fn get_hash(&self) -> String { + match self { + Self::Pod(pod) => pod.hash.clone(), + Self::Mapper(mapper) => mapper.hash.clone(), + } + } + + /// # Errors + /// Error out if fail to join all parents futures + pub fn process(&self, input_map: &HashMap) -> Result> { + match self { + Self::Pod(pod) => { + // Print out pod hash for now + println!("Processing pod: {}", pod.hash); + // TODO: Actual pod job creation and submission to orchestrator + Ok(input_map.clone()) + } + Self::Mapper(mapper) => { + // Print out mapper hash for now + println!("Processing mapper: {}", mapper.hash); + Ok(input_map.clone()) + } + } + } +} + +impl From for Node { + fn from(pod: Pod) -> Self { + Self::Pod(Box::new(pod)) + } +} +impl From for Node { + fn from(mapper: Mapper) -> Self { + Self::Mapper(mapper) + } +} + +/// Pipeline struct +#[derive(Serialize, Debug, Default, Clone)] +pub struct Pipeline { + hash: String, + #[serde(skip)] + annotation: Option, + /// String are hashes of the nodes without the _{`num_matches`} + pub nodes: HashMap, + /// Strings are unique hashes of the nodes with the _{`num_matches`} + pub graph: Graph, + output_nodes: HashSet, +} + +impl Pipeline { + /// Creates a new `Pipeline` instance. 
+ pub const fn new( + annotation: Option, + nodes: HashMap, + graph: Graph, + output_nodes: HashSet, + ) -> Self { + Self { + hash: String::new(), // TODO: Need to implement to yaml then hash that + annotation, + nodes, + graph, + output_nodes, + } + } + + /// # Errors + /// Error out if the `node_key` is not found in the pipeline.nodes + #[expect(clippy::string_slice, reason = "Should never fail as we are in")] + pub fn get_node(&self, node_key: &str) -> Result<&Node> { + let char_to_cut_at = '_'; + + let key = node_key + .rfind(char_to_cut_at) + .map_or(node_key, |index| &node_key[..index]); + get(&self.nodes, key) + } + + /// Function to get the root nodes of the pipeline + pub fn get_root_nodes(&self) -> impl Iterator { + self.graph + .node_indices() + .filter(|&node_index| self.graph.neighbors_directed(node_index, Incoming).count() == 0) + .map(|node_index| &self.graph[node_index]) + } + + pub fn get_root_nodes_idx(&self) -> impl Iterator { + self.graph.node_indices().filter(|&node_index| { + (self.graph.neighbors_directed(node_index, Incoming).count() == 0) + }) + } + + /// Function to get the leaf nodes of the pipeline + /// Mainly used to get the output nodes when user does not specify them + pub fn get_leaf_nodes(&self) -> impl Iterator { + // Leaf nodes are those that are not keys in the edges map (i.e., not parents of any node) + self.graph + .node_indices() + .filter(|&node_index| { + self.graph + .neighbors_directed(node_index, Outgoing) + .next() + .is_none() + }) + .map(|node_index| &self.graph[node_index]) + } + + /// Function to get the parents of a node + pub fn get_parents_key_for_node(&self, node_key: &str) -> impl Iterator { + // Find the NodeIndex for the given node_key + let node_index = self + .graph + .node_indices() + .find(|&idx| self.graph[idx] == node_key); + node_index.into_iter().flat_map(move |idx| { + self.graph + .neighbors_directed(idx, Incoming) + .map(move |parent_idx| &self.graph[parent_idx]) + }) + } +} + +impl PartialEq for Pipeline { + fn eq(&self, other: &Self) -> bool { + self.hash == other.hash + && self.nodes == other.nodes + && self.output_nodes == other.output_nodes + } +} + +impl From for Pipeline { + fn from(val: PipelineBuilder) -> Self { + let mut pipeline = val.pipeline; + if pipeline.output_nodes.is_empty() { + // If there are no output nodes, then we need to set the output nodes to the leaf nodes + pipeline.output_nodes = pipeline.get_leaf_nodes().cloned().collect(); + } + + pipeline + } +} + +#[derive(Serialize, Debug, Clone)] +/// `PipelineJob` struct +/// This struct is used to store the pipeline and the input map +pub struct PipelineJob { + /// Hash of the pipeline job + pub hash: String, + /// Pipeline struct + pub pipeline: Pipeline, + #[serde(serialize_with = "serialize_hashmap")] + /// Mapping of outside input to keys to be match with the pipeline `input_map` + pub input_map: HashMap, + annotation: Option, +} + +impl PipelineJob { + /// New function for pipeline job + /// # Errors + /// Error out if there are missing keys or failed to convert to yaml + pub fn new( + pipeline: Pipeline, + input_map: HashMap, + annotation: Option, + ) -> Result { + // Check if input_map has all the requires keys + let missing_keys = pipeline + .get_root_nodes() + .map(|node_id| match pipeline.get_node(node_id)? { + Node::Pod(pod) => Ok(find_missing_keys(&input_map, pod.input_stream.keys())), + Node::Mapper(mapper) => Ok(find_missing_keys(&input_map, mapper.mapping.keys())), + }) + .collect::>>>()? 
+ .into_iter() + .flatten() + .collect::>(); + + if !missing_keys.is_empty() { + return Err(OrcaError { + kind: Kind::MissingStreamKey { + input_map, + missing_keys, + backtrace: Some(Backtrace::capture()), + }, + }); + } + + Ok(Self { + pipeline, + input_map, + annotation, + hash: String::new(), + }) + } +} + +fn find_missing_keys<'a>( + input_map: &HashMap, + keys_to_check: impl Iterator, +) -> Vec { + keys_to_check + .filter_map(|key| { + if input_map.contains_key(key) { + None + } else { + Some(key.clone()) + } + }) + .collect() +} + +/// Helper struct to assist in defining a pipeline in Rust +pub struct PipelineBuilder { + pipeline: Pipeline, +} + +impl Default for PipelineBuilder { + fn default() -> Self { + Self { + pipeline: Pipeline { + hash: String::new(), + annotation: None, + nodes: HashMap::new(), + graph: Graph::new(), + output_nodes: HashSet::new(), + }, + } + } +} + +impl PipelineBuilder { + /// Creates a new `PipelineBuilder` instance. + pub fn new() -> Self { + Self::default() + } + + /// Add nodes to the pipeline and return the key to put in edges + /// + /// Cases: + /// 1. If the node is not in the pipeline.nodes, then it is added to the `hash_map` and the key is the node hash + /// 2. If the node is already in the pipeline.nodes, then the key is the hash + _{`num_matches`} to prevent collision + pub fn add_node(&mut self, node: impl Into) -> NodeHandle<'_> { + let node = node.into(); + let hash = node.get_hash(); + + // Get the node_key to add to the edge + let node_key = self.compute_node_key(&hash); + + // Insert into node hash_map if it does not exist, else skip + self.pipeline.nodes.entry(node.get_hash()).or_insert(node); + + // Add it to the graph + self.pipeline.graph.add_node(node_key.clone()); + + NodeHandle { + node_key, + pipeline_builder: self, + } + } + + fn add_edge_from_node(&mut self, from: &str, node: impl Into) -> Result { + // Check if node exists in the pipeline.nodes + let node = node.into(); + let hash = node.get_hash(); + + // Get the node_key to add to the graph + let node_key = self.compute_node_key(&hash); + let new_node_idx = self.pipeline.graph.add_node(node_key.clone()); + + // Insert node into the pipeline.nodes lut if it does not exist + self.pipeline.nodes.entry(hash).or_insert(node); + + self.pipeline.graph.add_edge( + self.pipeline + .graph + .node_indices() + .find(|&idx| self.pipeline.graph[idx] == from) + .ok_or(OrcaError { + kind: Kind::NodeNotFound { + parent_node_key: from.to_owned(), + backtrace: Some(Backtrace::capture()), + }, + })?, + new_node_idx, + (), + ); + + Ok(NodeHandle { + node_key, + pipeline_builder: self, + }) + } + + fn compute_node_key(&self, node_hash: &str) -> String { + // Check if the node is already in the pipeline; if so, we need to add a numeric suffix to the hash + let num_matches = self + .pipeline + .nodes + .iter() + .filter(|(key, _)| *key == node_hash) + .count(); + + if num_matches > 0 { + // Node already exists, thus we need to add a numeric suffix to the hash + format!("{node_hash}_{num_matches}") + } else { + node_hash.to_owned() + } + } +} + +/// Handle to store the `node_key` for the user to add children to it +pub struct NodeHandle<'a> { + node_key: String, + pipeline_builder: &'a mut PipelineBuilder, +} + +impl NodeHandle<'_> { + /// Add a node as a child to the current `node_key` + /// # Errors + /// Shouldn't error as long as `self` is in the graph + pub fn add_child(&mut self, node: impl Into) -> Result> { + self.pipeline_builder + .add_edge_from_node(&self.node_key, node) + } +} diff --git
a/src/core/pipeline_runner/docker.rs b/src/core/pipeline_runner/docker.rs new file mode 100644 index 0000000..2e71342 --- /dev/null +++ b/src/core/pipeline_runner/docker.rs @@ -0,0 +1,97 @@ +use petgraph::visit::Bfs; +use snafu::OptionExt as _; +use tokio::{ + sync::broadcast::{self, Receiver}, + task::JoinSet, +}; + +use super::PipelineRun; +use crate::{ + core::pipeline::PipelineJob, + uniffi::{ + error::{Result, selector}, + model::Input, + }, +}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; + +/// Docker-based pipeline runner meant to execute on a single machine +#[derive(Default)] +pub struct DockerPipelineRunner { + pipeline_runs: HashMap, HashMap>>, // For each pipeline run, we have a join set to track the tasks and wait on them +} + +impl DockerPipelineRunner { + /// Create a new Docker pipeline runner + pub fn new() -> Self { + Self::default() + } + + /// # Errors + /// Will error out if the pipeline job fails to start + pub async fn start(&mut self, pipeline_job: PipelineJob) -> Result<()> { + // Create a new pipeline run + let pipeline_run = Arc::new(PipelineRun::new(pipeline_job)); + + self.pipeline_runs + .insert(pipeline_run.clone(), HashMap::new()); + + // Run the pipeline runner + self.start_pipeline_run_task(pipeline_run).await?; + + Ok(()) + } + + /// # Errors + /// Will error out if any of the tasks fails + pub async fn start_pipeline_run_task(&mut self, pipeline_run: Arc) -> Result<()> { + // TODO: Batch implementation + + // Create source channel queue + let (tx, mut rx) = broadcast::channel::>(1); + + let graph = &pipeline_run.pipeline_job.pipeline.graph; + + // Go through the graph from the leaf node and create all the tasks and channels + Ok(()) + } + + fn create_task_for_node( + &mut self, + node_key: String, + pipeline_run: &PipelineRun, + ) -> Receiver> { + // Get parents for the node + pipeline_run + .pipeline_job + .pipeline + .get_parents_key_for_node(node_key) + .map(|parent_node_key| { + // Check if it exists in the pipeline_runs hashmap + match self + .pipeline_runs + .get(pipeline_run) + .unwrap() + .get(&parent_node_key) + { + Some(rx) => rx, + None => { + // Missing parent node, thus recursively create the task for the parent node + self.create_task_for_node(parent_node_key, pipeline_run) + } + } + }) + } + + fn start_node_task_manager( + &mut self, + node_key: String, + input_map: HashMap, + rx: Receiver>, + ) -> Result<()> { + Ok(()) + } +} diff --git a/src/core/pipeline_runner/mod.rs b/src/core/pipeline_runner/mod.rs new file mode 100644 index 0000000..0b9043f --- /dev/null +++ b/src/core/pipeline_runner/mod.rs @@ -0,0 +1,98 @@ +use std::{collections::HashMap, sync::Arc}; + +use tokio::sync::RwLock; + +use crate::uniffi::{error::Result, model::Input}; + +use super::pipeline::PipelineJob; +use std::fmt; +use std::hash::{Hash, Hasher}; + +/// # Errors +/// Error out if the pipeline job fails to start +pub trait PipelineRunner { + /// Starts the given pipeline job. + /// + /// # Errors + /// Returns an error if the pipeline job fails to start. + fn start(&self, pipeline_job: PipelineJob) -> Result<()>; +} + +#[derive(Debug)] +/// Struct to store the active pipeline run. +/// Currently only stores the `node_outputs` as a form of memory cache.
+pub struct PipelineRun { + pipeline_job: PipelineJob, + node_outputs: Arc>>>, +} + +impl fmt::Display for PipelineRun { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "PipelineRun {{ pipeline_job: {} }}", + self.pipeline_job.hash + ) + } +} + +impl PipelineRun { + /// New function to initialize the pipeline run + pub fn new(pipeline_job: PipelineJob) -> Self { + Self { + pipeline_job, + node_outputs: Arc::new(RwLock::new(HashMap::new())), + } + } + + async fn process_node( + self: Arc, + node_key: String, + input_map: HashMap, + ) -> Result { + // Get the node from the pipeline + let node = self.pipeline_job.pipeline.get_node(&node_key)?.to_owned(); + // Process the node + let output_map = node.process(&input_map)?; + + // Insert the output map into the pipeline run + self.node_outputs + .write() + .await + .insert(node_key.clone(), output_map); + + Ok(node_key) + } + + /// Find all children that depends on the `node_key` and find out which one can be started + /// by checking if all parents have an output stored in the `node_outputs` + /// returns a `HashMap` of the `node_keys` and their parents + #[expect( + clippy::unwrap_used, + reason = "The iterator already checks for the key before calling unwrap. Should never panic" + )] + async fn get_ready_to_start_children( + &self, + node_key: &str, + ) -> HashMap> { + let node_outputs = self.node_outputs.read().await; + todo!(); + } +} + +impl PartialEq for PipelineRun { + fn eq(&self, other: &Self) -> bool { + self.pipeline_job.hash == other.pipeline_job.hash + } +} + +impl Eq for PipelineRun {} + +impl Hash for PipelineRun { + fn hash(&self, state: &mut H) { + self.pipeline_job.hash.hash(state); + } +} + +/// Docker pipeline runner +pub mod docker; diff --git a/src/core/store/filestore.rs b/src/core/store/filestore.rs index 9f16ac6..3526f59 100644 --- a/src/core/store/filestore.rs +++ b/src/core/store/filestore.rs @@ -34,7 +34,7 @@ static RE_MODEL_METADATA: LazyLock = LazyLock::new(|| { (?[0-9a-f]+)\/ ( annotation\/ - (?[0-9a-zA-Z\-]+) + (?[0-9a-zA-Z\-_ ]+) - (?[0-9]+\.[0-9]+\.[0-9]+) \.yaml diff --git a/src/uniffi/error.rs b/src/uniffi/error.rs index 3ef9daf..a9ba8fd 100644 --- a/src/uniffi/error.rs +++ b/src/uniffi/error.rs @@ -10,11 +10,15 @@ use serde_yaml; use snafu::prelude::Snafu; use std::{ backtrace::Backtrace, + collections::HashMap, io, path::{self, PathBuf}, result, }; +use tokio::task::JoinError; use uniffi; + +use super::model::Input; /// Shorthand for a Result that returns an `OrcaError`. pub type Result = result::Result; /// Possible errors you may encounter. @@ -29,10 +33,25 @@ pub(crate) enum Kind { path: PathBuf, backtrace: Option, }, + #[snafu(display( + "Failed to extract run info from the container image file: {container_name}." 
+ ))] + FailedToExtractRunInfo { + container_name: String, + backtrace: Option, + }, + #[snafu(display( + "Failed to start pod with container_name: {container_name} with error: {source}" + ))] + FailedToStartPod { + container_name: String, + source: BollardError, + backtrace: Option, + }, #[snafu(display("Out of generated random names."))] GeneratedNamesOverflow { backtrace: Option }, #[snafu(display("{source} ({path:?})."))] - InvalidFilepath { + InvalidFileOrDirPath { path: PathBuf, source: io::Error, backtrace: Option, @@ -58,6 +77,11 @@ pub(crate) enum Kind { }, #[snafu(display("No known container names."))] NoContainerNames { backtrace: Option }, + #[snafu(display("Invalid parent node key: {parent_node_key}."))] + NodeNotFound { + parent_node_key: String, + backtrace: Option, + }, #[snafu(display("Missing file or directory name ({path:?})."))] NoFileName { path: PathBuf, @@ -73,6 +97,12 @@ pub(crate) enum Kind { path: PathBuf, backtrace: Option, }, + #[snafu(display("Input map {input_map:?} missing required stream_keys {missing_keys:?}"))] + MissingStreamKey { + input_map: HashMap, + missing_keys: Vec, + backtrace: Option, + }, #[snafu(transparent)] BollardError { source: BollardError, @@ -103,6 +133,11 @@ pub(crate) enum Kind { source: serde_yaml::Error, backtrace: Option, }, + #[snafu(transparent)] + TokioJoinError { + source: JoinError, + backtrace: Option, + }, } /// A stable error API interface. #[derive(Snafu, uniffi::Object)] @@ -122,4 +157,8 @@ impl OrcaError { pub const fn is_purged_pod_run(&self) -> bool { matches!(self.kind, Kind::NoMatchingPodRun { .. }) } + /// Returns `true` if the error was caused by a failure to start the pod. + pub const fn is_failed_to_start_pod(&self) -> bool { + matches!(self.kind, Kind::FailedToStartPod { .. }) + } } diff --git a/src/uniffi/model.rs b/src/uniffi/model.rs index b2aeb24..3e73181 100644 --- a/src/uniffi/model.rs +++ b/src/uniffi/model.rs @@ -11,9 +11,11 @@ use crate::{ use derive_more::Display; use getset::CloneGetters; use serde::{Deserialize, Serialize}; -use std::{collections::HashMap, path::PathBuf, sync::Arc}; +use std::{backtrace::Backtrace, collections::HashMap, path::PathBuf, sync::Arc}; use uniffi; +use super::error::{Kind, OrcaError}; + /// Available models. #[derive(uniffi::Enum, Debug)] pub enum ModelType { @@ -122,7 +124,7 @@ pub struct PodJob { pub pod: Arc, /// Attached, external input streams. #[serde(serialize_with = "serialize_hashmap")] - pub input_stream: HashMap, + pub input_map: HashMap, /// Attached, external output directory. pub output_dir: OrcaPath, /// Maximum allowable cores in fractional cores for the computation.
@@ -145,14 +147,37 @@ impl PodJob { pub fn new( annotation: Option, pod: Arc, - mut input_stream: HashMap, + mut input_map: HashMap, output_dir: OrcaPath, cpu_limit: f32, memory_limit: u64, env_vars: Option>, namespace_lookup: &HashMap, ) -> Result { - input_stream = input_stream + // Check if input_map has all the required stream_keys + let missing_keys = pod + .input_stream + .keys() + .filter_map(|key| { + if input_map.contains_key(key) { + None + } else { + Some(key.to_owned()) + } + }) + .collect::>(); + + if !missing_keys.is_empty() { + return Err(OrcaError { + kind: Kind::MissingStreamKey { + input_map, + missing_keys, + backtrace: Some(Backtrace::capture()), + }, + }); + } + // Hash all the input_map blobs + input_map = input_map .into_iter() .map(|(stream_name, stream_input)| match stream_input { Input::Unary(blob) => Ok(( @@ -174,7 +199,7 @@ impl PodJob { annotation, hash: String::new(), pod, - input_stream, + input_map, output_dir, cpu_limit, memory_limit, @@ -210,6 +235,8 @@ pub struct PodResult { pub created: u64, /// Time in epoch when terminated in seconds. pub terminated: u64, + /// Output logs of container + pub logs: String, } impl PodResult { @@ -225,6 +252,7 @@ impl PodResult { status: Status, created: u64, terminated: u64, + logs: String, ) -> Result { let pod_result_no_hash = Self { annotation, @@ -234,6 +262,7 @@ impl PodResult { status, created, terminated, + logs, }; Ok(Self { hash: hash_buffer(to_yaml(&pod_result_no_hash)?), @@ -309,6 +338,18 @@ pub struct Blob { /// BLOB contents checksum. pub checksum: String, } + +impl Blob { + /// Constructor for Blob class with an empty checksum that will be computed when is used in `PodJob` + pub const fn new(kind: BlobKind, location: OrcaPath) -> Self { + Self { + kind, + location, + checksum: String::new(), + } + } +} + /// File or directory options for BLOBs. 
#[derive(uniffi::Enum, Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Default)] pub enum BlobKind { @@ -319,6 +360,35 @@ pub enum BlobKind { Directory, } +/// Mapper +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] +pub struct Mapper { + /// Hash of the Mapper + pub hash: String, + #[serde(serialize_with = "serialize_hashmap")] + /** + Mapping of `input_stream_keys` to `output_stream_keys` of the mapper + */ + pub mapping: HashMap, +} + +impl Mapper { + /// New function for mapping that computes the hash for + /// # Errors + /// Will error if it fails to convert to yaml + pub fn new(mapping: HashMap) -> Result { + let no_hash = Self { + hash: String::new(), + mapping, + }; + + Ok(Self { + hash: hash_buffer(to_yaml(&no_hash)?), + ..no_hash + }) + } +} + // --- utils ---- uniffi::custom_type!(PathBuf, String, { diff --git a/src/uniffi/orchestrator/docker.rs b/src/uniffi/orchestrator/docker.rs index 7a2e75e..6a319c5 100644 --- a/src/uniffi/orchestrator/docker.rs +++ b/src/uniffi/orchestrator/docker.rs @@ -9,12 +9,13 @@ use crate::{ use async_trait; use bollard::{ Docker, - container::{RemoveContainerOptions, StartContainerOptions, WaitContainerOptions}, + container::{LogOutput, LogsOptions, StartContainerOptions, WaitContainerOptions}, image::{CreateImageOptions, ImportImageOptions}, }; +use colored::Colorize as _; use derive_more::Display; use futures_util::stream::{StreamExt as _, TryStreamExt as _}; -use snafu::{OptionExt as _, futures::TryFutureExt as _}; +use snafu::{OptionExt as _, ResultExt as _, futures::TryFutureExt as _}; use std::{collections::HashMap, path::PathBuf, sync::Arc}; use tokio::{fs::File, runtime::Runtime}; use tokio_util::{ @@ -89,7 +90,7 @@ impl Orchestrator for LocalDockerOrchestrator { let location = namespace_lookup[&image_info.namespace].join(&image_info.path); let byte_stream = FramedRead::new( File::open(&location) - .context(selector::InvalidFilepath { path: &location }) + .context(selector::InvalidFileOrDirPath { path: &location }) .await?, BytesCodec::new(), ) @@ -126,7 +127,11 @@ impl Orchestrator for LocalDockerOrchestrator { .await?; self.api .start_container(&assigned_name, None::>) - .await?; + .await + .context(selector::FailedToStartPod { + container_name: assigned_name.clone(), + })?; + Ok(PodRun::new::(pod_job, assigned_name)) } async fn start( @@ -155,7 +160,8 @@ impl Orchestrator for LocalDockerOrchestrator { vec!["org.orcapod=true".to_owned()], )])) .await? 
- .map(|(assigned_name, run_info)| { + .map(|result| { + let (assigned_name, run_info) = result?; let mut pod: Pod = serde_json::from_str(get(&run_info.labels, "org.orcapod.pod")?)?; pod.annotation = serde_json::from_str(get(&run_info.labels, "org.orcapod.pod.annotation")?)?; @@ -174,16 +180,7 @@ impl Orchestrator for LocalDockerOrchestrator { .collect() } async fn delete(&self, pod_run: &PodRun) -> Result<()> { - self.api - .remove_container( - &pod_run.assigned_name, - Some(RemoveContainerOptions { - force: true, - ..Default::default() - }), - ) - .await?; - Ok(()) + self.delete_container(&pod_run.assigned_name).await } async fn get_info(&self, pod_run: &PodRun) -> Result { let labels = vec![ @@ -194,21 +191,91 @@ impl Orchestrator for LocalDockerOrchestrator { ), format!("org.orcapod.pod_job.hash={}", pod_run.pod_job.hash), ]; + + // Add names to the filters + let container_filters = HashMap::from([ + ("label".to_owned(), labels), + ( + "name".to_owned(), + Vec::from([pod_run.assigned_name.clone()]), + ), + ]); + let (_, run_info) = self - .list_containers(HashMap::from([("label".to_owned(), labels)])) + .list_containers(container_filters) .await? .next() .context(selector::NoMatchingPodRun { pod_job_hash: pod_run.pod_job.hash.clone(), - })?; + })??; Ok(run_info) } async fn get_result(&self, pod_run: &PodRun) -> Result { - self.api + match self + .api .wait_container(&pod_run.assigned_name, None::>) .try_collect::>() - .await?; + .await + { + Ok(_) => {} + Err(error) => println!( + "{}{}", + "Warning: ".bright_yellow(), + error.to_string().bright_cyan() + ), + } let result_info = self.get_info(pod_run).await?; + + let mut std_out = Vec::new(); + let mut std_err = Vec::new(); + + self.api + .logs::( + &pod_run.assigned_name, + Some(LogsOptions { + stdout: true, + stderr: true, + ..Default::default() + }), + ) + .try_collect::>() + .await? + .iter() + .for_each(|log_output| match log_output { + LogOutput::StdOut { message } => { + std_out.extend(message.to_vec()); + } + LogOutput::StdErr { message } => { + std_err.extend(message.to_vec()); + } + LogOutput::StdIn { .. } => todo!(), + LogOutput::Console { .. } => todo!(), + }); + + let mut logs = String::from_utf8_lossy(&std_out).to_string(); + if !std_err.is_empty() { + logs.push_str("\nSTDERR:\n"); + logs.push_str(&String::from_utf8_lossy(&std_err)); + } + + // Check for errors, if exist, attach it to logs + let error = self + .api + .inspect_container(&pod_run.assigned_name, None) + .await? + .state + .context(selector::FailedToExtractRunInfo { + container_name: &pod_run.assigned_name, + })? + .error + .context(selector::FailedToExtractRunInfo { + container_name: &pod_run.assigned_name, + })?; + + if !error.is_empty() { + logs.push_str(&error); + } + PodResult::new( None, Arc::clone(&pod_run.pod_job), @@ -220,6 +287,7 @@ impl Orchestrator for LocalDockerOrchestrator { .context(selector::InvalidPodResultTerminatedDatetime { pod_job_hash: pod_run.pod_job.hash.clone(), })?, + logs, ) } } diff --git a/src/uniffi/orchestrator/mod.rs b/src/uniffi/orchestrator/mod.rs index 8243190..daee7e9 100644 --- a/src/uniffi/orchestrator/mod.rs +++ b/src/uniffi/orchestrator/mod.rs @@ -17,12 +17,16 @@ pub enum ImageKind { /// Status of a particular compute run. #[derive(uniffi::Enum, Serialize, Deserialize, Debug, PartialEq, Eq, Clone, Default)] pub enum Status { + /// Container is created and is pending execution + Queued, /// Run is ongoing. Running, /// Run has completed successfully. Completed, /// Run failed with the provided error code. 
Failed(i16), + /// Catch all for all undefine behavior + Unknown, /// No status set. #[default] Unset, @@ -52,7 +56,7 @@ pub struct RunInfo { pub memory_limit: u64, } /// Current computation managed by orchestrator. -#[derive(uniffi::Record, Debug)] +#[derive(uniffi::Record, Debug, PartialEq)] pub struct PodRun { /// Original compute request. pub pod_job: Arc, diff --git a/src/your_module/mod.rs b/src/your_module/mod.rs new file mode 100644 index 0000000..87182d8 --- /dev/null +++ b/src/your_module/mod.rs @@ -0,0 +1,45 @@ +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::RwLock; + +#[derive(Debug)] +pub struct StreamInfo { + // Fields for StreamInfo +} + +#[derive(Debug)] +pub struct PipelineRun { + node_outputs: RwLock>, + // Other fields for PipelineRun +} + +impl PipelineRun { + pub async fn process_node( + self: Arc, + node_key: String, + input_map: HashMap, + ) -> Result<(), Box> { + // ...existing code... + + // When spawning, clone Arc for each child + for node in children_node_to_start { + let input_map = self + .node_outputs + .read() + .await + .get(&node_key) + .unwrap() + .clone(); + + let self_clone = Arc::clone(&self); + tokio::spawn(async move { + let _ = self_clone.process_node(node, input_map).await; + }); + } + + Ok(()) + } +} + +// Result type alias for convenience +pub type Result = std::result::Result>; \ No newline at end of file diff --git a/tests/crypto.rs b/tests/crypto.rs index cbb1f54..28a28cd 100644 --- a/tests/crypto.rs +++ b/tests/crypto.rs @@ -1,10 +1,21 @@ -#![expect(missing_docs, clippy::panic_in_result_fn, reason = "OK in tests.")] +#![expect( + missing_docs, + clippy::panic_in_result_fn, + clippy::indexing_slicing, + clippy::panic, + reason = "OK in tests." +)] +pub mod fixture; +use fixture::pod_style; use orcapod::{ core::crypto::{hash_buffer, hash_dir, hash_file}, - uniffi::error::Result, + uniffi::{ + error::Result, + model::{Annotation, Blob, BlobKind, Input, OrcaPath, PodJob, StreamInfo}, + }, }; -use std::fs::read; +use std::{collections::HashMap, fs::read, path::PathBuf}; #[test] fn consistent_hash() -> Result<()> { @@ -27,3 +38,61 @@ fn complex_hash() -> Result<()> { ); Ok(()) } + +#[test] +fn nested_dir_hash() -> Result<()> { + let namespace_lookup = HashMap::from([("default".to_owned(), PathBuf::from("./tests"))]); + + let mut pod_style = pod_style()?; + pod_style.input_stream = HashMap::from([( + "nested_dir".into(), + StreamInfo { + path: PathBuf::from("/input"), + match_pattern: r"\/".to_owned(), + }, + )]); + + let pod_job = PodJob::new( + Some(Annotation { + name: "style-transfer".to_owned(), + description: "This is an example pod job.".to_owned(), + version: "0.1.0".to_owned(), + }), + pod_style.into(), + HashMap::from([( + "nested_dir".to_owned(), + Input::Unary(Blob { + kind: BlobKind::Directory, + location: OrcaPath { + namespace: "default".to_owned(), + path: "extra".into(), + }, + checksum: String::new(), + }), + )]), + OrcaPath { + namespace: "default".to_owned(), + path: PathBuf::from("output"), + }, + 0.5, // 500 millicores as frac cores + 2_u64 << 30, // 2GiB in bytes, KiB=<<10, MiB=<<20, GiB=<<30 + Some(HashMap::from([ + ("ZZZ".to_owned(), "PLEASE".to_owned()), + ("AAA".to_owned(), "SORT".to_owned()), + ])), + &namespace_lookup, + )?; + + match &pod_job.input_map["nested_dir"] { + Input::Unary(blob) => { + assert_eq!( + blob.checksum, + hash_dir("./tests/extra")?, + "Checksum didn't match." 
+ ); + } + Input::Collection(_) => panic!("Expected a Unary input."), + } + + Ok(()) +} diff --git a/tests/error.rs b/tests/error.rs index 3c9f82e..996d712 100644 --- a/tests/error.rs +++ b/tests/error.rs @@ -1,12 +1,13 @@ #![expect(missing_docs, clippy::panic_in_result_fn, reason = "OK in tests.")] pub mod fixture; -use fixture::{NAMESPACE_LOOKUP_READ_ONLY, pod_job_style}; +use fixture::{NAMESPACE_LOOKUP_READ_ONLY, pipeline, pod_job_style, pod_style}; use glob::glob; use orcapod::{ - core::crypto::hash_file, + core::{crypto::hash_file, pipeline::PipelineJob}, uniffi::{ error::{OrcaError, Result}, + model::{Annotation, Blob, BlobKind, Input, OrcaPath, PodJob}, orchestrator::{Orchestrator as _, docker::LocalDockerOrchestrator}, }, }; @@ -90,3 +91,79 @@ fn internal_key_missing() { "Did not raise a key missing error." ); } + +#[test] +fn invalid_pod_job_input_map() -> Result<()> { + let pod_job = PodJob::new( + Some(Annotation { + name: "style-transfer".to_owned(), + description: "This is an example pod job.".to_owned(), + version: "0.1.0".to_owned(), + }), + pod_style()?.into(), + HashMap::from([ + ( + "wrong_key".to_owned(), + Input::Unary(Blob { + kind: BlobKind::File, + location: OrcaPath { + namespace: "default".to_owned(), + path: PathBuf::from("styles/mosaic.t7"), + }, + checksum: String::new(), + }), + ), + ( + "base-input".to_owned(), + Input::Collection(vec![ + Blob { + kind: BlobKind::File, + location: OrcaPath { + namespace: "default".to_owned(), + path: PathBuf::from("styles/style1.t7"), + }, + checksum: String::new(), + }, + Blob { + kind: BlobKind::File, + location: OrcaPath { + namespace: "default".to_owned(), + path: PathBuf::from("images/subject.jpeg"), + }, + checksum: String::new(), + }, + ]), + ), + ]), + OrcaPath { + namespace: "default".to_owned(), + path: PathBuf::from("output"), + }, + 0.5, // 500 millicores as frac cores + 2_u64 << 30, // 2GiB in bytes, KiB=<<10, MiB=<<20, GiB=<<30 + Some(HashMap::from([ + ("ZZZ".to_owned(), "PLEASE".to_owned()), + ("AAA".to_owned(), "SORT".to_owned()), + ])), + &NAMESPACE_LOOKUP_READ_ONLY, + ); + + assert!( + pod_job.is_err_and(contains_debug), + "Did not raise a pod job error." + ); + + Ok(()) +} + +#[test] +fn invalid_pipeline_job_input_map() -> Result<()> { + let pipeline_job = PipelineJob::new(pipeline()?, HashMap::new(), None); + + assert!( + pipeline_job.is_err_and(contains_debug), + "Did not raise a pipeline job error." 
+ ); + + Ok(()) +} diff --git a/tests/extra/data/blank.txt b/tests/extra/data/blank.txt new file mode 100644 index 0000000..e69de29 diff --git a/tests/fixture/mod.rs b/tests/fixture/mod.rs index 7ffab24..1f7b2bc 100644 --- a/tests/fixture/mod.rs +++ b/tests/fixture/mod.rs @@ -8,11 +8,16 @@ )] use names::{Generator, Name}; -use orcapod::uniffi::{ - error::Result, - model::{Annotation, Blob, BlobKind, Input, OrcaPath, Pod, PodJob, PodResult, StreamInfo}, - orchestrator::Status, - store::{ModelID, ModelInfo, Store}, +use orcapod::{ + core::pipeline::{Pipeline, PipelineBuilder, PipelineJob}, + uniffi::{ + error::Result, + model::{ + Annotation, Blob, BlobKind, Input, Mapper, OrcaPath, Pod, PodJob, PodResult, StreamInfo, + }, + orchestrator::Status, + store::{ModelID, ModelInfo, Store}, + }, }; use std::{ collections::HashMap, @@ -139,6 +144,7 @@ pub fn pod_result_style( Status::Completed, 1_737_922_307, 1_737_925_907, + String::from("Example logs"), ) } @@ -187,8 +193,93 @@ pub fn container_image_style(binary_location: impl AsRef) -> Result Result { + Pod::new( + Some(Annotation { + name: pod_name.to_owned(), + description: "Pod appends its own name to the end of the file.".to_owned(), + version: "1.0.0".to_owned(), + }), + "alpine:3.14".to_owned(), + format!( + "cp /input/input.txt /output/input.txt && echo \"Touch by Pod: {pod_name}\" >> /output/input.txt" + ), + HashMap::from([( + "input_text_file".to_owned(), + StreamInfo { + path: PathBuf::from("/input/input.txt"), + match_pattern: r".*\.txt".to_owned(), + }, + )]), + PathBuf::from("/output"), + HashMap::from([( + "output_txt_file".to_owned(), + StreamInfo { + path: PathBuf::from("/output/input.txt"), + match_pattern: r".*\.txt".to_owned(), + }, + )]), + "N/A".to_owned(), + 0.25, // 250 millicores as frac cores + 1_u64 << 30, // 1GiB in bytes + None, + ) +} + +pub fn pipeline() -> Result { + // Create a simple pipeline where each pod's job is to append its own name to the input file + // Structure: A -> B -> C + + // Create the components of the pipeline + let pod_a = pod_append_name("A")?; + let pod_b = pod_append_name("B")?; + let pod_c = pod_append_name("C")?; + + let file_mapper = Mapper::new(HashMap::from([( + "input_text_file".to_owned(), + "output_txt_file".to_owned(), + )]))?; + + // Use the builder to create the pipeline + let mut pipeline_builder = PipelineBuilder::new(); + // Add the first node then chain the rest + pipeline_builder + .add_node(pod_a) + .add_child(file_mapper.clone())? + .add_child(pod_b)? + .add_child(file_mapper)?
+ .add_child(pod_c)?; + + // Convert it into the actual pipeline object + // NOTE: Since we didn't set the output_nodes, all the leaf nodes will be the output nodes + Ok(pipeline_builder.into()) +} + +pub fn pipeline_job() -> Result { + // Create a simple pipeline_job + PipelineJob::new( + pipeline()?, + HashMap::from([( + "input_text_file".to_owned(), + Input::Unary(Blob::new( + BlobKind::File, + OrcaPath { + namespace: "default".to_owned(), + path: PathBuf::from("data/input.txt"), + }, + )), + )]), + Some(Annotation { + name: "Pipeline Job".to_owned(), + description: "Example pipeline_job".to_owned(), + version: "1.0.0".to_owned(), + }), + ) +} + +// --- util --- pub struct TestDirs(pub HashMap); impl TestDirs { diff --git a/tests/model.rs b/tests/model.rs index 2cc90f0..0f4b2ad 100644 --- a/tests/model.rs +++ b/tests/model.rs @@ -49,7 +49,7 @@ fn pod_to_yaml() -> Result<()> { fn hash_pod_job() -> Result<()> { assert_eq!( pod_job_style(&NAMESPACE_LOOKUP_READ_ONLY)?.hash, - "b1b332a66917a7f561206ff8359a65066874727c16c57fe9a3b98d95eac7975b", + "fe5e2c22d00bdf6423a005676140e7ecefbf3fb835380cb5faa36b756bab14e3", "Hash didn't match." ); Ok(()) @@ -62,7 +62,7 @@ fn pod_job_to_yaml() -> Result<()> { indoc! {" class: pod_job pod: 7cc9db247fdbe214520140ef610fc6c23a1f1c5a56e0a6868c72ead03f0be968 - input_stream: + input_map: base-input: - kind: File location: @@ -98,7 +98,7 @@ fn pod_job_to_yaml() -> Result<()> { fn hash_pod_result() -> Result<()> { assert_eq!( pod_result_style(&NAMESPACE_LOOKUP_READ_ONLY)?.hash, - "6c79ca22bd6f32e1612357b2a0dd46ca6a8abe07d7d462787ef99897c176a293", + "3be623128267a2e457b0456473db5206c1ec5815229c98df03bfb9f759f74824", "Hash didn't match." ); Ok(()) @@ -110,11 +110,12 @@ fn pod_result_to_yaml() -> Result<()> { to_yaml(&pod_result_style(&NAMESPACE_LOOKUP_READ_ONLY)?)?, indoc! {" class: pod_result - pod_job: b1b332a66917a7f561206ff8359a65066874727c16c57fe9a3b98d95eac7975b + pod_job: fe5e2c22d00bdf6423a005676140e7ecefbf3fb835380cb5faa36b756bab14e3 assigned_name: simple-endeavour status: Completed created: 1737922307 terminated: 1737925907 + logs: Example logs "}, "YAML serialization didn't match." ); diff --git a/tests/orchestrator.rs b/tests/orchestrator.rs index 193a8c8..c1a64ff 100644 --- a/tests/orchestrator.rs +++ b/tests/orchestrator.rs @@ -1,13 +1,23 @@ -#![expect(missing_docs, clippy::panic_in_result_fn, reason = "OK in tests.")] +#![expect( + missing_docs, + clippy::panic_in_result_fn, + clippy::panic, + clippy::unwrap_used, + clippy::indexing_slicing, + reason = "OK in tests." +)] pub mod fixture; +use bollard::image::CreateImageOptions; use fixture::{TestContainerImage, TestDirs, container_image_style, pod_job_style}; +use futures_util::StreamExt as _; use orcapod::uniffi::{ error::Result, model::OrcaPath, orchestrator::{ImageKind, Orchestrator as _, PodRun, Status, docker::LocalDockerOrchestrator}, }; use std::{collections::HashMap, ops::Deref as _, path::PathBuf, sync::Arc}; +use tokio::runtime::Runtime; fn basic_test(start: T) -> Result<()> where @@ -32,6 +42,7 @@ where orchestrator .list_blocking()? .iter() + .filter(|run| **run == pod_run) .map(|run| Ok(orchestrator.get_info_blocking(run)?.command)) .collect::>>()?, vec![expected_command.clone()], @@ -48,6 +59,7 @@ where orchestrator .list_blocking()? 
.iter() + .filter(|run| **run == pod_run) .map(|run| Ok(orchestrator.get_info_blocking(run)?.command)) .collect::>>()?, vec![expected_command], @@ -63,7 +75,7 @@ where // test delete orchestrator.delete_blocking(&pod_run)?; assert!( - orchestrator.list_blocking()?.is_empty(), + !orchestrator.list_blocking()?.contains(&pod_run), "Unexpected container remains." ); // try getting info of a purged pod run @@ -88,7 +100,7 @@ fn offline_container_image_basic() -> Result<()> { namespace_lookup["default"].join(container_image_relative_location), )?; let mut pod_job = pod_job_style(namespace_lookup)?; - pod_job.env_vars = Some(HashMap::from([("DELAY".to_owned(), "5".to_owned())])); + pod_job.env_vars = Some(HashMap::from([("DELAY".to_owned(), "1".to_owned())])); Ok(( orchestrator.start_with_altimage_blocking( namespace_lookup, @@ -110,7 +122,7 @@ fn remote_container_image_basic() -> Result<()> { pod.command = "sleep 5".to_owned(); pod.input_stream = HashMap::new(); pod_job.pod = Arc::new(pod); - pod_job.input_stream = HashMap::new(); + pod_job.input_map = HashMap::new(); Ok(( orchestrator.start_blocking(namespace_lookup, &pod_job)?, pod_job.pod.command.clone(), @@ -118,3 +130,222 @@ fn remote_container_image_basic() -> Result<()> { )) }) } + +fn execute_wrapper(test_fn: T) -> Result<()> +where + T: Fn(&HashMap, &LocalDockerOrchestrator) -> Result<()>, +{ + let test_dirs = TestDirs::new(&HashMap::from([( + "default".to_owned(), + Some("./tests/extra/data/"), + )]))?; + let namespace_lookup = test_dirs.namespace_lookup(); + let orchestrator = LocalDockerOrchestrator::new()?; + + test_fn(&namespace_lookup, &orchestrator) +} + +#[test] +fn command_parse() -> Result<()> { + execute_wrapper(|namespace_lookup, orchestrator| { + let mut pod_job = pod_job_style(namespace_lookup)?; + + let mut pod = pod_job.pod.deref().clone(); + pod.image = "alpine:3.14".to_owned(); + pod.command = r#"echo 'hi 1' && echo "hi 2""#.to_owned(); + pod.input_stream = HashMap::new(); + pod_job.pod = pod.into(); + + pod_job.input_map = HashMap::new(); + + let pod_run = orchestrator.start_blocking(namespace_lookup, &pod_job)?; + let pod_result = orchestrator.get_result_blocking(&pod_run)?; + + assert_eq!( + pod_result.status, + Status::Completed, + "Pod status is not completed" + ); + + assert_eq!( + pod_result.logs, "hi 1 && echo hi 2\n", + "Logs do not match error" + ); + + orchestrator.delete_blocking(&pod_run)?; + + assert!( + !orchestrator.list_blocking()?.contains(&pod_run), + "Unexpected container remains." + ); + + Ok(()) + }) +} + +#[test] +/// Expect pod to fail due to bad command, where the expected behavior should auto delete the container and return an error +fn fail_at_start() -> Result<()> { + execute_wrapper(|namespace_lookup, orchestrator| { + let mut pod_job = pod_job_style(namespace_lookup)?; + let mut pod = pod_job.pod.deref().clone(); + pod.image = "alpine:3.14".to_owned(); + pod.command = "python file_does_not_exist.py".to_owned(); + pod_job.pod = pod.into(); + + let container_name = match orchestrator.start_blocking(namespace_lookup, &pod_job) { + Ok(_) => panic!("Pod was launched successfully when it should have failed."), + Err(err) => { + assert!(err.is_failed_to_start_pod()); + err.to_string().split(' ').collect::>()[6].to_owned() + } + }; + + // Make sure the pod has been deleted after failing to start + let list_result = orchestrator + .list_blocking()? 
+ .into_iter() + .filter(|pod_run| pod_run.assigned_name == *container_name) + .collect::>(); + + assert!( + list_result.len() == 1, + "List didn't return just the fail pod." + ); + + let pod_run = list_result.first().unwrap(); + + // Get the pod result and make sure it is in failed state + let pod_result = orchestrator.get_result_blocking(pod_run)?; + + assert_eq!( + pod_result.status, + Status::Failed(127), + "Pod status is not failed" + ); + + assert_eq!( + pod_result.logs, + "failed to create task for container: failed to create shim task: OCI runtime create failed: runc create failed: unable to start container process: error during container init: exec: \"python\": executable file not found in $PATH: unknown", + "Logs do not match" + ); + + // Clean up the pod + orchestrator.delete_blocking(pod_run)?; + + assert!( + !orchestrator.list_blocking()?.contains(pod_run), + "Unexpected container remains." + ); + + Ok(()) + }) +} + +#[test] +fn fail_during_execution() -> Result<()> { + execute_wrapper(|namespace_lookup, orchestrator| { + let mut pod_job = pod_job_style(namespace_lookup)?; + let mut pod = pod_job.pod.deref().clone(); + + pod.image = "alpine:3.14".to_owned(); + pod.command = "python file_does_not_exist.py".to_owned(); + + pod.image = "alpine:3.14".to_owned(); + pod.command = r#"bin/sh -c 'echo "hi" && bad_command'"#.to_owned(); + pod.input_stream = HashMap::new(); + pod_job.pod = pod.into(); + pod_job.input_map = HashMap::new(); + + // Start job and wait for completion + let pod_run = orchestrator.start_blocking(namespace_lookup, &pod_job)?; + let pod_result = orchestrator.get_result_blocking(&pod_run)?; + + assert_eq!( + pod_result.status, + Status::Failed(127), + "Should be in failed state" + ); + + assert_eq!( + pod_result.logs, "hi\n\nSTDERR:\nbin/sh: bad_command: not found\n", + "Logs do not match error" + ); + + // Clean up the pod + orchestrator.delete_blocking(&pod_run)?; + + assert!( + !orchestrator.list_blocking()?.contains(&pod_run), + "Unexpected container remains." + ); + + Ok(()) + }) +} + +#[test] +fn test_queued_status_container() -> Result<()> { + execute_wrapper(|namespace_lookup, orchestrator| { + let mut pod_job = pod_job_style(namespace_lookup)?; + let mut pod = pod_job.pod.deref().clone(); + + pod.image = "alpine:3.14".to_owned(); + pod.command = "python file_does_not_exist.py".to_owned(); + pod.input_stream = HashMap::new(); + pod_job.pod = pod.into(); + pod_job.input_map = HashMap::new(); + + let runtime = Runtime::new()?; + + let image_options = Some(CreateImageOptions { + from_image: pod_job.pod.image.clone(), + ..Default::default() + }); + runtime.block_on( + orchestrator + .api + .create_image(image_options, None, None) + .collect::>(), + ); + + // Start job and wait for completion + let (container_name, options, config) = + LocalDockerOrchestrator::prepare_container_start_inputs( + namespace_lookup, + &pod_job, + pod_job.pod.image.clone(), + )?; + + runtime.block_on(orchestrator.api.create_container(options, config))?; + + // List all containers and check if the queued_container is in the list + let pod_runs = orchestrator + .list_blocking()? + .into_iter() + .filter(|pod_run| pod_run.assigned_name == container_name) + .collect::>(); + + assert!( + pod_runs.len() == 1, + "List didn't return just the queued pod." 
+ ); + + let pod_run = pod_runs.first().unwrap(); + + // Check that the status is queued + assert!( + orchestrator.get_info_blocking(pod_run)?.status == Status::Queued, + "Status is not queued" + ); + + // Clean up container + orchestrator.delete_blocking(pod_run)?; + assert!( + !orchestrator.list_blocking()?.contains(pod_run), + "Unexpected container remains." + ); + + Ok(()) + }) +} diff --git a/tests/pipeline.rs b/tests/pipeline.rs new file mode 100644 index 0000000..0788441 --- /dev/null +++ b/tests/pipeline.rs @@ -0,0 +1,62 @@ +#![allow( + clippy::missing_docs_in_private_items, + clippy::panic_in_result_fn, + clippy::unwrap_used, + reason = "test code" +)] +//! Tests for pipeline creation functionality. +//! +//! This module contains tests that verify the correct creation of pipelines +//! using the `pipeline` fixture. The tests ensure that the pipeline creation +//! process completes successfully and outputs the expected results. +mod fixture; + +use fixture::{pipeline, pipeline_job}; +use orcapod::core::pipeline_runner::docker::DockerPipelineRunner; +use orcapod::uniffi::error::Result; +use tokio::runtime::Runtime; + +#[test] +fn root_nodes() -> Result<()> { + let pipeline = pipeline()?; + + assert_eq!(pipeline.get_root_nodes().count(), 1); + Ok(()) +} + +#[test] +fn get_leaf_nodes() -> Result<()> { + let pipeline = pipeline()?; + + assert_eq!(pipeline.get_leaf_nodes().count(), 1); + Ok(()) +} + +#[test] +fn get_parents_key_for_node() -> Result<()> { + let pipeline = pipeline()?; + let node_key = pipeline.get_root_nodes().next().unwrap(); + println!("{:?}", pipeline.graph); + println!("node_key: {}", node_key); + + assert_eq!(pipeline.get_parents_key_for_node(node_key).count(), 0); + Ok(()) +} + +#[test] +fn pipeline_job_creation() -> Result<()> { + let pipeline_job = pipeline_job()?; + Ok(()) +} + +/// Pipeline Runner Tests +/// This module contains tests for the pipeline runner functionality. +#[test] +fn pipeline_run() -> Result<()> { + let pipeline_job = pipeline_job()?; + + let mut docker_pipeline_runner = DockerPipelineRunner::new(); + Runtime::new()?.block_on(docker_pipeline_runner.start(pipeline_job))?; + + Ok(()) +}
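Reviewer note (not part of the patch): a minimal standalone sketch of how the new RE_FOR_CMD tokenizer in src/core/orchestrator/docker.rs is expected to split a pod command while keeping quoted arguments intact. It mirrors the command used in the command_parse test above, assumes only the regex crate, and uses find_iter for brevity where the patch itself uses captures_iter.

use regex::Regex;

fn main() {
    // Same pattern as RE_FOR_CMD in src/core/orchestrator/docker.rs.
    let re = Regex::new(r#"[^\s"']+|"[^"]*"|'[^']*'"#).expect("valid regex");
    let command = r#"echo 'hi 1' && echo "hi 2""#;

    // Tokenize on regex matches, then strip the surrounding quotes,
    // analogous to what prepare_container_start_inputs does with each capture.
    let tokens: Vec<String> = re
        .find_iter(command)
        .map(|token| token.as_str().replace(['\'', '"'], ""))
        .collect();

    // Quoted arguments stay as single tokens; the quotes themselves are removed.
    assert_eq!(tokens, vec!["echo", "hi 1", "&&", "echo", "hi 2"]);
}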