Skip to content

feat(worker): introduce cleanup cmd to remove folders + containers #470

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 192 additions & 9 deletions crates/worker/src/cli/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,25 @@ pub enum Commands {
#[arg(long)]
compute_pool_id: u64,
},

/// Clean up worker containers and task directories
Cleanup {
/// Force cleanup without prompting
#[arg(long, default_value = "false")]
force: bool,

/// Clean up containers only
#[arg(long, default_value = "false")]
containers_only: bool,

/// Clean up directories only
#[arg(long, default_value = "false")]
directories_only: bool,

/// Optional state storage directory overwrite
#[arg(long)]
state_dir_overwrite: Option<String>,
},
}

pub async fn execute_command(
Expand Down Expand Up @@ -219,9 +238,6 @@ pub async fn execute_command(
let version = option_env!("WORKER_VERSION").unwrap_or(env!("CARGO_PKG_VERSION"));
Console::section("🚀 PRIME WORKER INITIALIZATION - beta");
Console::info("Version", version);
/*
Initialize Wallet instances
*/
let provider_wallet_instance =
match Wallet::new(&private_key_provider, Url::parse(rpc_url).unwrap()) {
Ok(wallet) => wallet,
Expand All @@ -240,9 +256,6 @@ pub async fn execute_command(
}
};

/*
Initialize dependencies - services, contracts, operations
*/
let contracts = ContractBuilder::new(provider_wallet_instance.provider())
.with_compute_registry()
.with_ai_token()
Expand Down Expand Up @@ -954,9 +967,6 @@ pub async fn execute_command(
}
};
let state = Arc::new(SystemState::new(None, true, None));
/*
Initialize dependencies - services, contracts, operations
*/

let contracts = ContractBuilder::new(provider_wallet_instance.provider())
.with_compute_registry()
Expand Down Expand Up @@ -1030,6 +1040,179 @@ pub async fn execute_command(
Console::success("Compute node is not registered");
}

Ok(())
}
Commands::Cleanup {
force,
containers_only,
directories_only,
state_dir_overwrite,
} => {
use crate::docker::docker_manager::DockerManager;
use crate::state::system_state::SystemState;

Console::section("🧹 PRIME WORKER CLEANUP");

// Load system state to get storage path configuration
let _state = Arc::new(SystemState::new(
state_dir_overwrite.clone(),
false, // Don't disable state storing for cleanup
None,
));

// For now, we'll ask the docker manager to cleanup what it can find
// The storage path will be determined by examining existing containers and volumes
let storage_path: Option<String> = None;

// Use default storage path when none is provided
let effective_storage_path = match &storage_path {
Some(path) => path.clone(),
None => {
// Default to user home directory or current directory
const APP_DIR_NAME: &str = "prime-worker";
if let Ok(home) = std::env::var("HOME") {
Copy link
Preview

Copilot AI Jun 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Default storage path logic is duplicated here and in CleanupManager::get_default_storage_path. Consider extracting this into a shared helper to reduce duplication.

Copilot uses AI. Check for mistakes.

format!("{}/{}", home, APP_DIR_NAME)
} else {
std::env::current_dir()
.map(|p| {
p.join(format!("{}-data", APP_DIR_NAME))
.to_string_lossy()
.to_string()
})
.unwrap_or_else(|_| format!("./{}-data", APP_DIR_NAME))
}
}
};

let docker_manager = match DockerManager::new(effective_storage_path) {
Ok(manager) => manager,
Err(e) => {
Console::user_error(&format!("❌ Failed to initialize Docker manager: {}", e));
std::process::exit(1);
}
};

// Show what will be cleaned up
let will_clean_containers = !directories_only;
let will_clean_directories = !containers_only;

if !force {
Console::warning("This will clean up:");
if will_clean_containers {
Console::info("", "• All worker containers (prime-task-*)");
Console::info("", "• Associated Docker volumes");
}
if will_clean_directories {
Console::info("", "• Task directories");
if let Some(path) = &storage_path {
Console::info("", &format!("• Storage path: {}", path));
}
}

print!("\nDo you want to continue? [y/N]: ");
use std::io::{self, Write};
io::stdout().flush().unwrap();

let mut input = String::new();
io::stdin().read_line(&mut input).unwrap();
let input = input.trim().to_lowercase();

if input != "y" && input != "yes" {
Console::info("", "Cleanup cancelled.");
return Ok(());
}
}

let mut cleanup_results = Vec::new();

// Clean up containers
if will_clean_containers {
Console::title("🐳 Cleaning up containers");
match docker_manager.list_prime_containers().await {
Ok(containers) => {
if containers.is_empty() {
Console::info("", "No prime worker containers found");
} else {
Console::info(
"",
&format!("Found {} prime worker containers", containers.len()),
);
for container in containers {
Console::info(
"",
&format!("Removing container: {}", container.names.join(", ")),
);
match docker_manager.remove_container(&container.id).await {
Ok(_) => {
Console::success(&format!(
"✅ Removed container: {}",
container.id
));
cleanup_results
.push(format!("Container {} removed", container.id));
}
Err(e) => {
Console::user_error(&format!(
"❌ Failed to remove container {}: {}",
container.id, e
));
cleanup_results.push(format!(
"Failed to remove container {}: {}",
container.id, e
));
}
}
}
}
}
Err(e) => {
Console::user_error(&format!("❌ Failed to list containers: {}", e));
cleanup_results.push(format!("Failed to list containers: {}", e));
}
}
}

// Clean up directories
if will_clean_directories {
Console::title("📁 Cleaning up directories");
if let Some(_storage_path) = &storage_path {
match docker_manager.cleanup_task_directories().await {
Ok(cleaned_dirs) => {
if cleaned_dirs.is_empty() {
Console::info("", "No task directories found to clean");
} else {
Console::success(&format!(
"✅ Cleaned {} task directories",
cleaned_dirs.len()
));
for dir in cleaned_dirs {
cleanup_results.push(format!("Directory {} cleaned", dir));
}
}
}
Err(e) => {
Console::user_error(&format!("❌ Failed to clean directories: {}", e));
cleanup_results.push(format!("Failed to clean directories: {}", e));
}
}
} else {
Console::warning("No storage path configured - skipping directory cleanup");
cleanup_results
.push("No storage path configured for directory cleanup".to_string());
}
}

// Summary
Console::section("📋 Cleanup Summary");
if cleanup_results.is_empty() {
Console::info("", "No cleanup actions were performed");
} else {
for result in cleanup_results {
Console::info("", &format!("• {}", result));
}
Console::success("🎉 Cleanup completed");
}

Ok(())
}
}
Expand Down
121 changes: 121 additions & 0 deletions crates/worker/src/docker/docker_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -809,4 +809,125 @@ impl DockerManager {
debug!("Successfully retrieved logs for container {}", container_id);
Ok(logs)
}

/// List all containers with names matching the prime worker pattern
pub async fn list_prime_containers(&self) -> Result<Vec<ContainerInfo>, DockerError> {
debug!("Listing prime worker containers");
let containers = self.list_containers(true).await?;

// Filter containers that match the prime worker naming pattern
let prime_containers: Vec<ContainerInfo> = containers
.into_iter()
.filter(|container| {
container.names.iter().any(|name| {
let trimmed_name = name.trim_start_matches('/');
trimmed_name.starts_with("prime-task-")
})
})
.collect();

debug!("Found {} prime worker containers", prime_containers.len());
Ok(prime_containers)
}

/// Clean up task directories in common locations
pub async fn cleanup_task_directories(&self) -> Result<Vec<String>, DockerError> {
debug!("Cleaning up task directories");
let mut cleaned_dirs = Vec::new();

// Directories to search for task directories
let mut search_paths = Vec::new();

// Add configured storage path
search_paths.push(std::path::PathBuf::from(&self.storage_path));

// Add common locations where task directories might be created
if let Ok(current_dir) = std::env::current_dir() {
search_paths.push(current_dir);
}

// Also check /tmp for temporary task directories
search_paths.push(std::path::PathBuf::from("/tmp"));

// Check home directory
if let Some(home_dir) = std::env::var_os("HOME") {
search_paths.push(std::path::PathBuf::from(home_dir));
}

for storage_path in search_paths {
if !storage_path.exists() {
debug!("Search path does not exist: {}", storage_path.display());
continue;
}

debug!(
"Searching for task directories in: {}",
storage_path.display()
);

// Read the directory and find prime task directories
let entries = match std::fs::read_dir(&storage_path) {
Ok(entries) => entries,
Err(e) => {
debug!("Failed to read directory {}: {}", storage_path.display(), e);
continue;
}
};

for entry in entries {
let entry = match entry {
Ok(entry) => entry,
Err(e) => {
debug!("Failed to read directory entry: {}", e);
continue;
}
};

let path = entry.path();
if path.is_dir() {
if let Some(dir_name) = path.file_name().and_then(|n| n.to_str()) {
// Check if this is a prime task directory
if dir_name.starts_with("prime-task-") || dir_name == "shared" {
debug!("Found task directory: {}", path.display());

// Attempt to remove the directory
match std::fs::remove_dir_all(&path) {
Ok(_) => {
info!("Successfully removed directory: {}", path.display());
cleaned_dirs.push(path.display().to_string());
}
Err(e) => {
debug!("Failed to remove directory {}: {}", path.display(), e);

// Try fallback with rm -rf
match std::process::Command::new("rm")
.arg("-rf")
.arg(&path)
.status()
{
Ok(status) if status.success() => {
info!(
"Fallback removal of {} succeeded",
path.display()
);
cleaned_dirs.push(path.display().to_string());
}
Ok(status) => {
debug!("Fallback rm -rf failed with status {}", status);
}
Err(e) => {
debug!("Failed to execute fallback rm -rf: {}", e);
}
}
}
}
}
}
}
}
}

info!("Cleaned up {} task directories", cleaned_dirs.len());
Ok(cleaned_dirs)
}
}
8 changes: 8 additions & 0 deletions crates/worker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(_) => (),
Err(_) => log::warn!("Timeout waiting for tasks to cleanup"),
}

// Prompt for cleanup after graceful shutdown
use crate::utils::cleanup::CleanupManager;
let cleanup_manager = CleanupManager::new(None);
if !cleanup_manager.prompt_and_cleanup().await {
log::warn!("Cleanup failed or was cancelled");
}

Ok(())
}
Loading