Skip to content

imp(orchestrator): atomic operation on node group removal, ability to delete groups manually #454

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion crates/orchestrator/src/api/routes/groups.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::api::server::AppState;
use actix_web::{
web::{self, get, Data},
web::{self, delete, get, Data},
HttpResponse, Scope,
};
use alloy::primitives::Address;
Expand Down Expand Up @@ -82,6 +82,26 @@ async fn get_configurations(app_state: Data<AppState>) -> HttpResponse {
}
}

async fn delete_group(group_id: web::Path<String>, app_state: Data<AppState>) -> HttpResponse {
if let Some(node_groups_plugin) = &app_state.node_groups_plugin {
match node_groups_plugin.dissolve_group(&group_id).await {
Ok(()) => HttpResponse::Ok().json(json!({
"success": true,
"message": format!("Group {} successfully deleted", group_id.as_str())
})),
Err(e) => HttpResponse::InternalServerError().json(json!({
"success": false,
"error": format!("Failed to delete group: {}", e)
})),
}
} else {
HttpResponse::ServiceUnavailable().json(json!({
"success": false,
"error": "Node groups plugin is not enabled"
}))
}
}

async fn get_group_logs(group_id: web::Path<String>, app_state: Data<AppState>) -> HttpResponse {
if let Some(node_groups_plugin) = &app_state.node_groups_plugin {
match node_groups_plugin.get_group_by_id(&group_id).await {
Expand Down Expand Up @@ -227,5 +247,6 @@ pub fn groups_routes() -> Scope {
web::scope("/groups")
.route("", get().to(get_groups))
.route("/configs", get().to(get_configurations))
.route("/{group_id}", delete().to(delete_group))
.route("/{group_id}/logs", get().to(get_group_logs))
}
27 changes: 20 additions & 7 deletions crates/orchestrator/src/plugins/node_groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,11 +308,15 @@ impl NodeGroupsPlugin {
};
debug!("Created new group structure: {:?}", group);

// Use a Redis transaction to atomically create the group
let mut pipe = redis::pipe();
pipe.atomic();

// Store group data
let group_key = Self::get_group_key(&group_id);
let group_data = serde_json::to_string(&group)?;
debug!("Storing group data at key: {}", group_key);
conn.set::<_, _, ()>(&group_key, group_data).await?;
pipe.set(&group_key, group_data);

// Map nodes to group
debug!(
Expand All @@ -321,10 +325,12 @@ impl NodeGroupsPlugin {
group_id
);
for node in &available_nodes {
conn.hset::<_, _, _, ()>(NODE_GROUP_MAP_KEY, node, &group_id)
.await?;
pipe.hset(NODE_GROUP_MAP_KEY, node, &group_id);
}

// Execute all operations atomically
pipe.query_async::<()>(&mut conn).await?;

// Remove used nodes from healthy_nodes
let prev_healthy_count = healthy_nodes.len();
healthy_nodes.retain(|node| !nodes_to_remove.contains(&node.address.to_string()));
Expand Down Expand Up @@ -402,7 +408,7 @@ impl NodeGroupsPlugin {
}
}

async fn dissolve_group(&self, group_id: &str) -> Result<(), Error> {
pub async fn dissolve_group(&self, group_id: &str) -> Result<(), Error> {
debug!("Attempting to dissolve group: {}", group_id);
let mut conn = self.store.client.get_multiplexed_async_connection().await?;

Expand All @@ -413,20 +419,27 @@ impl NodeGroupsPlugin {
let group: NodeGroup = serde_json::from_str(&group_data)?;
debug!("Found group to dissolve: {:?}", group);

// Use a Redis transaction to atomically dissolve the group
let mut pipe = redis::pipe();
pipe.atomic();

// Remove all nodes from the group mapping
debug!("Removing {} nodes from group mapping", group.nodes.len());
for node in &group.nodes {
conn.hdel::<_, _, ()>(NODE_GROUP_MAP_KEY, node).await?;
pipe.hdel(NODE_GROUP_MAP_KEY, node);
}

// Delete group task assignment
let task_key = format!("{}{}", GROUP_TASK_KEY_PREFIX, group_id);
debug!("Deleting group task assignment from key: {}", task_key);
conn.del::<_, ()>(&task_key).await?;
pipe.del(&task_key);

// Delete group
debug!("Deleting group data from key: {}", group_key);
conn.del::<_, ()>(&group_key).await?;
pipe.del(&group_key);

// Execute all operations atomically
pipe.query_async::<()>(&mut conn).await?;

info!(
"Dissolved group {} with {} nodes",
Expand Down
11 changes: 9 additions & 2 deletions crates/worker/src/p2p/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,11 +524,18 @@ impl P2PService {
);
}
};

let provider = &context.provider_wallet.provider;
match retry_call(call, 3, provider.clone(), None).await {
Ok(result) => {
Console::success(&format!("Successfully joined compute pool: {}", result));
Console::section("WORKER JOINED COMPUTE POOL");
Console::success(&format!(
"Successfully registered on chain with tx: {}",
result
));
Console::info(
"Status",
"Worker is now part of the compute pool and ready to receive tasks",
);
}
Err(err) => {
error!("Failed to join compute pool: {:?}", err);
Expand Down
10 changes: 6 additions & 4 deletions crates/worker/src/utils/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,12 @@ pub fn setup_logging(cli: Option<&Cli>) -> Result<(), Box<dyn std::error::Error
.add_directive("hyper_util=warn".parse()?)
.add_directive("bollard=warn".parse()?)
.add_directive("alloy=warn".parse()?)
.add_directive("iroh=warn".parse()?)
.add_directive("iroh_net=warn".parse()?)
.add_directive("iroh_quinn=warn".parse()?)
.add_directive("iroh_base=warn".parse()?)
.add_directive("iroh=error".parse()?)
.add_directive("iroh_net=error".parse()?)
.add_directive("iroh_quinn=error".parse()?)
.add_directive("iroh_base=error".parse()?)
.add_directive("quinn=error".parse()?)
.add_directive("quinn_proto=error".parse()?)
.add_directive("tracing::span=warn".parse()?);

let fmt_layer = fmt::layer()
Expand Down