diff --git a/crates/worker/src/cli/command.rs b/crates/worker/src/cli/command.rs index ea98e6f3..2ea4000a 100644 --- a/crates/worker/src/cli/command.rs +++ b/crates/worker/src/cli/command.rs @@ -162,6 +162,25 @@ pub enum Commands { #[arg(long)] compute_pool_id: u64, }, + + /// Clean up worker containers and task directories + Cleanup { + /// Force cleanup without prompting + #[arg(long, default_value = "false")] + force: bool, + + /// Clean up containers only + #[arg(long, default_value = "false")] + containers_only: bool, + + /// Clean up directories only + #[arg(long, default_value = "false")] + directories_only: bool, + + /// Optional state storage directory overwrite + #[arg(long)] + state_dir_overwrite: Option, + }, } pub async fn execute_command( @@ -219,9 +238,6 @@ pub async fn execute_command( let version = option_env!("WORKER_VERSION").unwrap_or(env!("CARGO_PKG_VERSION")); Console::section("๐Ÿš€ PRIME WORKER INITIALIZATION - beta"); Console::info("Version", version); - /* - Initialize Wallet instances - */ let provider_wallet_instance = match Wallet::new(&private_key_provider, Url::parse(rpc_url).unwrap()) { Ok(wallet) => wallet, @@ -240,9 +256,6 @@ pub async fn execute_command( } }; - /* - Initialize dependencies - services, contracts, operations - */ let contracts = ContractBuilder::new(provider_wallet_instance.provider()) .with_compute_registry() .with_ai_token() @@ -954,9 +967,6 @@ pub async fn execute_command( } }; let state = Arc::new(SystemState::new(None, true, None)); - /* - Initialize dependencies - services, contracts, operations - */ let contracts = ContractBuilder::new(provider_wallet_instance.provider()) .with_compute_registry() @@ -1030,6 +1040,179 @@ pub async fn execute_command( Console::success("Compute node is not registered"); } + Ok(()) + } + Commands::Cleanup { + force, + containers_only, + directories_only, + state_dir_overwrite, + } => { + use crate::docker::docker_manager::DockerManager; + use crate::state::system_state::SystemState; + + Console::section("๐Ÿงน PRIME WORKER CLEANUP"); + + // Load system state to get storage path configuration + let _state = Arc::new(SystemState::new( + state_dir_overwrite.clone(), + false, // Don't disable state storing for cleanup + None, + )); + + // For now, we'll ask the docker manager to cleanup what it can find + // The storage path will be determined by examining existing containers and volumes + let storage_path: Option = None; + + // Use default storage path when none is provided + let effective_storage_path = match &storage_path { + Some(path) => path.clone(), + None => { + // Default to user home directory or current directory + const APP_DIR_NAME: &str = "prime-worker"; + if let Ok(home) = std::env::var("HOME") { + format!("{}/{}", home, APP_DIR_NAME) + } else { + std::env::current_dir() + .map(|p| { + p.join(format!("{}-data", APP_DIR_NAME)) + .to_string_lossy() + .to_string() + }) + .unwrap_or_else(|_| format!("./{}-data", APP_DIR_NAME)) + } + } + }; + + let docker_manager = match DockerManager::new(effective_storage_path) { + Ok(manager) => manager, + Err(e) => { + Console::user_error(&format!("โŒ Failed to initialize Docker manager: {}", e)); + std::process::exit(1); + } + }; + + // Show what will be cleaned up + let will_clean_containers = !directories_only; + let will_clean_directories = !containers_only; + + if !force { + Console::warning("This will clean up:"); + if will_clean_containers { + Console::info("", "โ€ข All worker containers (prime-task-*)"); + Console::info("", "โ€ข Associated Docker volumes"); + } + if will_clean_directories { + Console::info("", "โ€ข Task directories"); + if let Some(path) = &storage_path { + Console::info("", &format!("โ€ข Storage path: {}", path)); + } + } + + print!("\nDo you want to continue? [y/N]: "); + use std::io::{self, Write}; + io::stdout().flush().unwrap(); + + let mut input = String::new(); + io::stdin().read_line(&mut input).unwrap(); + let input = input.trim().to_lowercase(); + + if input != "y" && input != "yes" { + Console::info("", "Cleanup cancelled."); + return Ok(()); + } + } + + let mut cleanup_results = Vec::new(); + + // Clean up containers + if will_clean_containers { + Console::title("๐Ÿณ Cleaning up containers"); + match docker_manager.list_prime_containers().await { + Ok(containers) => { + if containers.is_empty() { + Console::info("", "No prime worker containers found"); + } else { + Console::info( + "", + &format!("Found {} prime worker containers", containers.len()), + ); + for container in containers { + Console::info( + "", + &format!("Removing container: {}", container.names.join(", ")), + ); + match docker_manager.remove_container(&container.id).await { + Ok(_) => { + Console::success(&format!( + "โœ… Removed container: {}", + container.id + )); + cleanup_results + .push(format!("Container {} removed", container.id)); + } + Err(e) => { + Console::user_error(&format!( + "โŒ Failed to remove container {}: {}", + container.id, e + )); + cleanup_results.push(format!( + "Failed to remove container {}: {}", + container.id, e + )); + } + } + } + } + } + Err(e) => { + Console::user_error(&format!("โŒ Failed to list containers: {}", e)); + cleanup_results.push(format!("Failed to list containers: {}", e)); + } + } + } + + // Clean up directories + if will_clean_directories { + Console::title("๐Ÿ“ Cleaning up directories"); + if let Some(_storage_path) = &storage_path { + match docker_manager.cleanup_task_directories().await { + Ok(cleaned_dirs) => { + if cleaned_dirs.is_empty() { + Console::info("", "No task directories found to clean"); + } else { + Console::success(&format!( + "โœ… Cleaned {} task directories", + cleaned_dirs.len() + )); + for dir in cleaned_dirs { + cleanup_results.push(format!("Directory {} cleaned", dir)); + } + } + } + Err(e) => { + Console::user_error(&format!("โŒ Failed to clean directories: {}", e)); + cleanup_results.push(format!("Failed to clean directories: {}", e)); + } + } + } else { + Console::warning("No storage path configured - skipping directory cleanup"); + cleanup_results + .push("No storage path configured for directory cleanup".to_string()); + } + } + + // Summary + Console::section("๐Ÿ“‹ Cleanup Summary"); + if cleanup_results.is_empty() { + Console::info("", "No cleanup actions were performed"); + } else { + for result in cleanup_results { + Console::info("", &format!("โ€ข {}", result)); + } + Console::success("๐ŸŽ‰ Cleanup completed"); + } + Ok(()) } } diff --git a/crates/worker/src/docker/docker_manager.rs b/crates/worker/src/docker/docker_manager.rs index 1996977b..aaee1666 100644 --- a/crates/worker/src/docker/docker_manager.rs +++ b/crates/worker/src/docker/docker_manager.rs @@ -809,4 +809,125 @@ impl DockerManager { debug!("Successfully retrieved logs for container {}", container_id); Ok(logs) } + + /// List all containers with names matching the prime worker pattern + pub async fn list_prime_containers(&self) -> Result, DockerError> { + debug!("Listing prime worker containers"); + let containers = self.list_containers(true).await?; + + // Filter containers that match the prime worker naming pattern + let prime_containers: Vec = containers + .into_iter() + .filter(|container| { + container.names.iter().any(|name| { + let trimmed_name = name.trim_start_matches('/'); + trimmed_name.starts_with("prime-task-") + }) + }) + .collect(); + + debug!("Found {} prime worker containers", prime_containers.len()); + Ok(prime_containers) + } + + /// Clean up task directories in common locations + pub async fn cleanup_task_directories(&self) -> Result, DockerError> { + debug!("Cleaning up task directories"); + let mut cleaned_dirs = Vec::new(); + + // Directories to search for task directories + let mut search_paths = Vec::new(); + + // Add configured storage path + search_paths.push(std::path::PathBuf::from(&self.storage_path)); + + // Add common locations where task directories might be created + if let Ok(current_dir) = std::env::current_dir() { + search_paths.push(current_dir); + } + + // Also check /tmp for temporary task directories + search_paths.push(std::path::PathBuf::from("/tmp")); + + // Check home directory + if let Some(home_dir) = std::env::var_os("HOME") { + search_paths.push(std::path::PathBuf::from(home_dir)); + } + + for storage_path in search_paths { + if !storage_path.exists() { + debug!("Search path does not exist: {}", storage_path.display()); + continue; + } + + debug!( + "Searching for task directories in: {}", + storage_path.display() + ); + + // Read the directory and find prime task directories + let entries = match std::fs::read_dir(&storage_path) { + Ok(entries) => entries, + Err(e) => { + debug!("Failed to read directory {}: {}", storage_path.display(), e); + continue; + } + }; + + for entry in entries { + let entry = match entry { + Ok(entry) => entry, + Err(e) => { + debug!("Failed to read directory entry: {}", e); + continue; + } + }; + + let path = entry.path(); + if path.is_dir() { + if let Some(dir_name) = path.file_name().and_then(|n| n.to_str()) { + // Check if this is a prime task directory + if dir_name.starts_with("prime-task-") || dir_name == "shared" { + debug!("Found task directory: {}", path.display()); + + // Attempt to remove the directory + match std::fs::remove_dir_all(&path) { + Ok(_) => { + info!("Successfully removed directory: {}", path.display()); + cleaned_dirs.push(path.display().to_string()); + } + Err(e) => { + debug!("Failed to remove directory {}: {}", path.display(), e); + + // Try fallback with rm -rf + match std::process::Command::new("rm") + .arg("-rf") + .arg(&path) + .status() + { + Ok(status) if status.success() => { + info!( + "Fallback removal of {} succeeded", + path.display() + ); + cleaned_dirs.push(path.display().to_string()); + } + Ok(status) => { + debug!("Fallback rm -rf failed with status {}", status); + } + Err(e) => { + debug!("Failed to execute fallback rm -rf: {}", e); + } + } + } + } + } + } + } + } + } + + info!("Cleaned up {} task directories", cleaned_dirs.len()); + Ok(cleaned_dirs) + } } diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index c8e53fe6..d8472782 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -110,5 +110,13 @@ async fn main() -> Result<(), Box> { Ok(_) => (), Err(_) => log::warn!("Timeout waiting for tasks to cleanup"), } + + // Prompt for cleanup after graceful shutdown + use crate::utils::cleanup::CleanupManager; + let cleanup_manager = CleanupManager::new(None); + if !cleanup_manager.prompt_and_cleanup().await { + log::warn!("Cleanup failed or was cancelled"); + } + Ok(()) } diff --git a/crates/worker/src/utils/cleanup.rs b/crates/worker/src/utils/cleanup.rs new file mode 100644 index 00000000..83a5c939 --- /dev/null +++ b/crates/worker/src/utils/cleanup.rs @@ -0,0 +1,198 @@ +use crate::console::Console; +use crate::docker::docker_manager::DockerManager; +use log::{debug, error, info}; +use std::io::{self, Write}; + +pub struct CleanupManager { + docker_manager: Option, +} + +impl CleanupManager { + pub fn new(storage_path: Option) -> Self { + let docker_manager = match storage_path { + Some(path) => match DockerManager::new(path) { + Ok(manager) => Some(manager), + Err(e) => { + error!("Failed to initialize Docker manager for cleanup: {}", e); + None + } + }, + None => { + // Use a default storage path when none is provided + let default_path = Self::get_default_storage_path(); + match DockerManager::new(default_path) { + Ok(manager) => Some(manager), + Err(e) => { + error!("Failed to initialize Docker manager for cleanup: {}", e); + None + } + } + } + }; + + Self { docker_manager } + } + + fn get_default_storage_path() -> String { + const APP_DIR_NAME: &str = "prime-worker"; + + // Try user home directory first + if let Ok(home) = std::env::var("HOME") { + return format!("{}/{}", home, APP_DIR_NAME); + } + + // Fallback to current directory + std::env::current_dir() + .map(|p| { + p.join(format!("{}-data", APP_DIR_NAME)) + .to_string_lossy() + .to_string() + }) + .unwrap_or_else(|_| format!("./{}-data", APP_DIR_NAME)) + } + + /// Prompt user for cleanup and perform it if confirmed + pub async fn prompt_and_cleanup(&self) -> bool { + if self.docker_manager.is_none() { + debug!("Docker manager not available, skipping cleanup"); + return false; + } + + let docker_manager = self.docker_manager.as_ref().unwrap(); + + // Check if there's anything to clean up + let containers_result = docker_manager.list_prime_containers().await; + let has_containers = match &containers_result { + Ok(containers) => !containers.is_empty(), + Err(_) => false, + }; + + if !has_containers { + debug!("No prime worker containers found to clean up"); + return true; + } + + // Show what will be cleaned up + Console::section("๐Ÿงน Cleanup Options"); + Console::warning("The following resources were found:"); + + if let Ok(containers) = &containers_result { + Console::info("", &format!("โ€ข {} worker containers", containers.len())); + for container in containers { + Console::info("", &format!(" - {}", container.names.join(", "))); + } + } + Console::info("", "โ€ข Associated Docker volumes"); + Console::info("", "โ€ข Task directories"); + + print!("\nDo you want to clean up these resources? [y/N]: "); + io::stdout().flush().unwrap(); + + let mut input = String::new(); + if io::stdin().read_line(&mut input).is_err() { + error!("Failed to read user input"); + return false; + } + + let input = input.trim().to_lowercase(); + if input != "y" && input != "yes" { + Console::info("", "Cleanup skipped."); + return true; + } + + // Perform cleanup + Console::title("๐Ÿณ Cleaning up containers"); + if let Ok(containers) = containers_result { + for container in containers { + Console::info( + "", + &format!("Removing container: {}", container.names.join(", ")), + ); + match docker_manager.remove_container(&container.id).await { + Ok(_) => { + Console::success(&format!("โœ… Removed container: {}", container.id)); + } + Err(e) => { + Console::user_error(&format!( + "โŒ Failed to remove container {}: {}", + container.id, e + )); + } + } + } + } + + // Clean up directories + Console::title("๐Ÿ“ Cleaning up directories"); + match docker_manager.cleanup_task_directories().await { + Ok(cleaned_dirs) => { + if cleaned_dirs.is_empty() { + Console::info("", "No task directories found to clean"); + } else { + Console::success(&format!( + "โœ… Cleaned {} task directories", + cleaned_dirs.len() + )); + } + } + Err(e) => { + Console::user_error(&format!("โŒ Failed to clean directories: {}", e)); + } + } + + Console::success("๐ŸŽ‰ Cleanup completed"); + true + } + + /// Perform cleanup without prompting (for --force flag) + #[allow(dead_code)] + pub async fn force_cleanup(&self) -> bool { + if self.docker_manager.is_none() { + debug!("Docker manager not available, skipping cleanup"); + return false; + } + + let docker_manager = self.docker_manager.as_ref().unwrap(); + + Console::section("๐Ÿงน Force Cleanup"); + + // Clean up containers + Console::title("๐Ÿณ Cleaning up containers"); + match docker_manager.list_prime_containers().await { + Ok(containers) => { + if containers.is_empty() { + Console::info("", "No prime worker containers found"); + } else { + for container in containers { + match docker_manager.remove_container(&container.id).await { + Ok(_) => { + info!("Removed container: {}", container.id); + } + Err(e) => { + error!("Failed to remove container {}: {}", container.id, e); + } + } + } + } + } + Err(e) => { + error!("Failed to list containers: {}", e); + } + } + + // Clean up directories + Console::title("๐Ÿ“ Cleaning up directories"); + match docker_manager.cleanup_task_directories().await { + Ok(cleaned_dirs) => { + if !cleaned_dirs.is_empty() { + info!("Cleaned {} task directories", cleaned_dirs.len()); + } + } + Err(e) => { + error!("Failed to clean directories: {}", e); + } + } + + true + } +} diff --git a/crates/worker/src/utils/mod.rs b/crates/worker/src/utils/mod.rs index cf5eb61e..9267bbe1 100644 --- a/crates/worker/src/utils/mod.rs +++ b/crates/worker/src/utils/mod.rs @@ -1,2 +1,3 @@ +pub mod cleanup; pub mod logging; pub mod p2p;