From ff328498f987cc3e08f63fb760bc6ab0b9165f2b Mon Sep 17 00:00:00 2001
From: Denys Fedoryshchenko
Date: Tue, 14 Oct 2025 13:27:47 +0300
Subject: [PATCH] Add new staging step, retrieving checkout

Signed-off-by: Denys Fedoryshchenko
---
Review notes (text between "---" and the first "diff --git" is ignored by
git am):
- process_checkout_wait_step: the HTTP request now has an explicit
  aiohttp.ClientTimeout; aiohttp.ClientError / asyncio.TimeoutError are
  retried on the next cycle instead of failing the step; elapsed_time and
  the query start time are always defined.
- Line layout and context indentation were restored from a copy with
  collapsed whitespace; apply with --ignore-whitespace if a context line
  does not match.

 database.py     |   6 +++
 models.py       |   4 ++
 orchestrator.py | 133 ++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 143 insertions(+)

diff --git a/database.py b/database.py
index f461bce..8069eb7 100644
--- a/database.py
+++ b/database.py
@@ -40,6 +40,12 @@ def run_migrations():
             "type": "BOOLEAN DEFAULT 0",
             "description": "Option to skip building compiler images in the workflow",
         },
+        {
+            "table": "staging_runs",
+            "column": "treeid",
+            "type": "TEXT",
+            "description": "Tree ID from KernelCI checkout node",
+        },
         # Add future migrations here
     ]
 
diff --git a/models.py b/models.py
index 55ef290..c11765e 100644
--- a/models.py
+++ b/models.py
@@ -43,6 +43,7 @@ class StagingStepType(str, enum.Enum):
     API_PIPELINE_UPDATE = "api_pipeline_update"
     MONITORING_SETUP = "monitoring_setup"
     TRIGGER_RESTART = "trigger_restart"
+    CHECKOUT_WAIT = "checkout_wait"
 
 
 class StagingRunStatus(str, enum.Enum):
@@ -89,6 +90,9 @@ class StagingRun(Base):
     # Kernel tree selection (next, mainline, stable)
     kernel_tree = Column(String, nullable=True)
 
+    # Tree ID from checkout node (KernelCI API)
+    treeid = Column(String, nullable=True)
+
     # Skip compiler images option
     skip_compiler_images = Column(Boolean, nullable=False, default=False)
 
diff --git a/orchestrator.py b/orchestrator.py
index bc1353b..34207c1 100644
--- a/orchestrator.py
+++ b/orchestrator.py
@@ -7,7 +7,9 @@ import json
 from datetime import datetime, timedelta
 from typing import Optional, Dict, Any, List
 
+from urllib.parse import quote
 from sqlalchemy.orm import Session
+import aiohttp
 
 from database import SessionLocal
 from models import (
@@ -148,6 +150,13 @@ async def initialize_staging_steps(self, staging_run: StagingRun, db: Session):
                     {"type": StagingStepType.TRIGGER_RESTART, "order": order_counter + 2},
                 ]
             )
+            order_counter += 3
+
+        # Add CHECKOUT_WAIT step as the last step if kernel_tree is set
+        if staging_run.kernel_tree and staging_run.kernel_tree != "none":
+            steps.append(
+                {"type": StagingStepType.CHECKOUT_WAIT, "order": order_counter}
+            )
 
         for step_config in steps:
             step = StagingRunStep(
@@ -227,6 +236,8 @@ async def process_step(
             await self.process_api_pipeline_step(staging_run, step, db)
         elif step.step_type == StagingStepType.TRIGGER_RESTART:
             await self.process_trigger_restart_step(staging_run, step, db)
+        elif step.step_type == StagingStepType.CHECKOUT_WAIT:
+            await self.process_checkout_wait_step(staging_run, step, db)
 
     async def process_github_workflow_step(
         self, staging_run: StagingRun, step: StagingRunStep, db: Session
@@ -480,6 +491,126 @@ async def process_trigger_restart_step(
             step.end_time = datetime.utcnow()
             db.commit()
 
+    async def process_checkout_wait_step(
+        self, staging_run: StagingRun, step: StagingRunStep, db: Session
+    ):
+        """Wait for the run's checkout node to appear in the KernelCI API.
+
+        Meant to be called repeatedly by the orchestrator. A PENDING step is
+        switched to RUNNING; a RUNNING step queries the staging KernelCI API
+        for checkout nodes created after the run started. When a node submitted
+        by "service:pipeline" for the selected kernel tree is found, its id is
+        stored in staging_run.treeid and the step is marked COMPLETED.
+
+        The step is marked FAILED after 5 minutes without a match or on an
+        unexpected error. Non-200 responses and network errors are treated as
+        transient: the method returns and a later call retries.
+        """
+        import asyncio  # function-scope: only needed for asyncio.TimeoutError
+
+        if step.status == StagingStepStatus.PENDING:
+            step.status = StagingStepStatus.RUNNING
+            step.start_time = datetime.utcnow()
+            staging_run.current_step = "checkout_wait"
+            db.commit()
+
+        if step.status == StagingStepStatus.RUNNING:
+            try:
+                current_time = datetime.utcnow()
+                if step.start_time is None:
+                    # RUNNING without a start time: start the clock now so the
+                    # timeout below can still fire on a later call
+                    step.start_time = current_time
+                    db.commit()
+
+                # Check if we've exceeded the 5-minute timeout
+                elapsed_time = (current_time - step.start_time).total_seconds()
+                if elapsed_time > 300:  # 5 minutes = 300 seconds
+                    step.status = StagingStepStatus.FAILED
+                    step.end_time = current_time
+                    step.error_message = (
+                        "Timeout: No checkout node found after 5 minutes"
+                    )
+                    db.commit()
+                    return
+
+                # Format the created time for the API query; fall back to the
+                # step start when the run has no start_time recorded
+                since = staging_run.start_time or step.start_time
+                created_time = since.strftime("%Y-%m-%dT%H:%M:%S")
+
+                # Build the API URL
+                api_url = f"https://staging.kernelci.org:9000/latest/nodes?kind=checkout&created__gt={quote(created_time)}&limit=250"
+
+                # Make the API request. The explicit timeout keeps one slow
+                # call from stalling the orchestrator
+                request_timeout = aiohttp.ClientTimeout(total=30)
+                async with aiohttp.ClientSession(timeout=request_timeout) as session:
+                    async with session.get(api_url) as response:
+                        if response.status != 200:
+                            # API call failed, will retry on next orchestrator cycle
+                            print(
+                                f"API call failed with status {response.status}, will retry"
+                            )
+                            return
+
+                        data = await response.json()
+                        # NOTE(review): assumes nodes are returned under "data";
+                        # confirm against the KernelCI API response schema
+                        nodes = data.get("data", [])
+
+                        # Look for a node with submitter: "service:pipeline" and matching tree
+                        for node in nodes:
+                            if node.get("submitter") == "service:pipeline":
+                                data_obj = node.get("data", {})
+                                kernel_revision = data_obj.get("kernel_revision", {})
+                                tree = kernel_revision.get("tree")
+
+                                # Check if tree matches the kernel_tree from staging_run
+                                # For "auto" we need to match the actual selected tree
+                                if tree == staging_run.kernel_tree or (
+                                    staging_run.kernel_tree == "auto"
+                                    and tree in ["next", "mainline", "stable"]
+                                ):
+                                    # Found matching checkout node!
+                                    treeid = node.get("id")
+                                    if treeid:
+                                        # Store the treeid in StagingRun
+                                        staging_run.treeid = treeid
+
+                                        # Mark step as completed
+                                        step.status = StagingStepStatus.COMPLETED
+                                        step.end_time = current_time
+                                        step.details = json.dumps(
+                                            {
+                                                "treeid": treeid,
+                                                "tree": tree,
+                                                "node": node,
+                                            }
+                                        )
+
+                                        db.commit()
+                                        print(
+                                            f"Found checkout node with treeid: {treeid} for tree: {tree}"
+                                        )
+                                        return
+
+                # No matching node found yet, will retry on next orchestrator cycle
+                print(
+                    f"No matching checkout node found yet, will retry. Elapsed time: {elapsed_time:.1f}s"
+                )
+
+            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
+                # Network failures are transient: retry on a later call. The
+                # 5-minute check above still bounds the total wait
+                print(f"API request failed ({e!r}), will retry")
+            except Exception as e:
+                step.status = StagingStepStatus.FAILED
+                step.error_message = f"Error while waiting for checkout node: {str(e)}"
+                step.end_time = datetime.utcnow()
+                db.commit()
+                print(f"Error in checkout_wait step: {e}")
+
     async def complete_staging_run(
         self, staging_run: StagingRun, db: Session, success: bool
     ):
@@ -549,6 +680,8 @@ async def recover_stuck_steps(self, staging_run: StagingRun, db: Session):
                     timeout_minutes = 10  # Quick git pull
                 elif step.step_type == StagingStepType.TRIGGER_RESTART:
                     timeout_minutes = 5  # Quick docker restart
+                elif step.step_type == StagingStepType.CHECKOUT_WAIT:
+                    timeout_minutes = 5  # Waiting for checkout node
 
                 if running_duration.total_seconds() > (timeout_minutes * 60):
                     print(