Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions database.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ def run_migrations():
"type": "BOOLEAN DEFAULT 0",
"description": "Option to skip building compiler images in the workflow",
},
{
"table": "staging_runs",
"column": "treeid",
"type": "TEXT",
"description": "Tree ID from KernelCI checkout node",
},
# Add future migrations here
]

Expand Down
4 changes: 4 additions & 0 deletions models.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class StagingStepType(str, enum.Enum):
API_PIPELINE_UPDATE = "api_pipeline_update"
MONITORING_SETUP = "monitoring_setup"
TRIGGER_RESTART = "trigger_restart"
CHECKOUT_WAIT = "checkout_wait"


class StagingRunStatus(str, enum.Enum):
Expand Down Expand Up @@ -89,6 +90,9 @@ class StagingRun(Base):
# Kernel tree selection (next, mainline, stable)
kernel_tree = Column(String, nullable=True)

# Tree ID from checkout node (KernelCI API)
treeid = Column(String, nullable=True)

# Skip compiler images option
skip_compiler_images = Column(Boolean, nullable=False, default=False)

Expand Down
105 changes: 105 additions & 0 deletions orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
import json
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, List
from urllib.parse import quote
from sqlalchemy.orm import Session
import aiohttp

from database import SessionLocal
from models import (
Expand Down Expand Up @@ -148,6 +150,13 @@ async def initialize_staging_steps(self, staging_run: StagingRun, db: Session):
{"type": StagingStepType.TRIGGER_RESTART, "order": order_counter + 2},
]
)
order_counter += 3

# Add CHECKOUT_WAIT step as the last step if kernel_tree is set
if staging_run.kernel_tree and staging_run.kernel_tree != "none":
steps.append(
{"type": StagingStepType.CHECKOUT_WAIT, "order": order_counter}
)

for step_config in steps:
step = StagingRunStep(
Expand Down Expand Up @@ -227,6 +236,8 @@ async def process_step(
await self.process_api_pipeline_step(staging_run, step, db)
elif step.step_type == StagingStepType.TRIGGER_RESTART:
await self.process_trigger_restart_step(staging_run, step, db)
elif step.step_type == StagingStepType.CHECKOUT_WAIT:
await self.process_checkout_wait_step(staging_run, step, db)

async def process_github_workflow_step(
self, staging_run: StagingRun, step: StagingRunStep, db: Session
Expand Down Expand Up @@ -480,6 +491,98 @@ async def process_trigger_restart_step(
step.end_time = datetime.utcnow()
db.commit()

async def process_checkout_wait_step(
    self, staging_run: StagingRun, step: StagingRunStep, db: Session
):
    """Process checkout wait step - wait for checkout node to appear in KernelCI API.

    On first entry the step transitions PENDING -> RUNNING and records its
    start time.  On every subsequent orchestrator cycle while RUNNING, the
    KernelCI staging API is polled for a checkout node submitted by
    "service:pipeline" whose tree matches ``staging_run.kernel_tree`` (for
    "auto", any of next/mainline/stable matches).  On a match the node id is
    stored as ``staging_run.treeid`` and the step completes.  If no matching
    node is found within 5 minutes the step fails.

    Args:
        staging_run: The staging run owning this step; supplies
            ``kernel_tree``, ``start_time`` and receives ``treeid``.
        step: The CHECKOUT_WAIT step row being processed.
        db: Active SQLAlchemy session used for all state commits.
    """
    if step.status == StagingStepStatus.PENDING:
        step.status = StagingStepStatus.RUNNING
        step.start_time = datetime.utcnow()
        staging_run.current_step = "checkout_wait"
        db.commit()

    if step.status == StagingStepStatus.RUNNING:
        try:
            current_time = datetime.utcnow()
            # Compute elapsed time defensively: if start_time is missing
            # (e.g. a row written by an older code path), fall back to 0.0
            # instead of leaving the name unbound -- the original code only
            # assigned elapsed_time inside `if step.start_time:` but still
            # referenced it in the "will retry" log below, which would raise
            # NameError and wrongly FAIL the step via the broad except.
            elapsed_time = (
                (current_time - step.start_time).total_seconds()
                if step.start_time
                else 0.0
            )
            if elapsed_time > 300:  # 5 minutes = 300 seconds
                step.status = StagingStepStatus.FAILED
                step.end_time = current_time
                step.error_message = (
                    "Timeout: No checkout node found after 5 minutes"
                )
                db.commit()
                return

            # Only checkout nodes created after this staging run started
            # are relevant; format the timestamp for the API query.
            created_time = staging_run.start_time.strftime("%Y-%m-%dT%H:%M:%S")

            # Build the API URL
            api_url = f"https://staging.kernelci.org:9000/latest/nodes?kind=checkout&created__gt={quote(created_time)}&limit=250"

            # Bound the HTTP request so a hung connection cannot stall the
            # orchestrator cycle past the step's own 5-minute budget.
            http_timeout = aiohttp.ClientTimeout(total=60)
            async with aiohttp.ClientSession(timeout=http_timeout) as session:
                async with session.get(api_url) as response:
                    if response.status != 200:
                        # API call failed, will retry on next orchestrator cycle
                        print(
                            f"API call failed with status {response.status}, will retry"
                        )
                        return

                    data = await response.json()

            nodes = data.get("data", [])

            # Look for a node with submitter: "service:pipeline" and matching tree
            for node in nodes:
                if node.get("submitter") != "service:pipeline":
                    continue

                kernel_revision = node.get("data", {}).get("kernel_revision", {})
                tree = kernel_revision.get("tree")

                # Check if tree matches the kernel_tree from staging_run.
                # For "auto" we need to match the actual selected tree.
                tree_matches = tree == staging_run.kernel_tree or (
                    staging_run.kernel_tree == "auto"
                    and tree in ["next", "mainline", "stable"]
                )
                if not tree_matches:
                    continue

                # Found matching checkout node!
                treeid = node.get("id")
                if treeid:
                    # Store the treeid in StagingRun and mark step completed
                    staging_run.treeid = treeid
                    step.status = StagingStepStatus.COMPLETED
                    step.end_time = current_time
                    step.details = json.dumps(
                        {
                            "treeid": treeid,
                            "tree": tree,
                            "node": node,
                        }
                    )
                    db.commit()
                    print(
                        f"Found checkout node with treeid: {treeid} for tree: {tree}"
                    )
                    return

            # No matching node found yet, will retry on next orchestrator cycle
            print(
                f"No matching checkout node found yet, will retry. Elapsed time: {elapsed_time:.1f}s"
            )

        except Exception as e:
            # Any unexpected error permanently fails the step; the error
            # text is persisted for the UI alongside the step row.
            step.status = StagingStepStatus.FAILED
            step.error_message = f"Error while waiting for checkout node: {str(e)}"
            step.end_time = datetime.utcnow()
            db.commit()
            print(f"Error in checkout_wait step: {e}")

async def complete_staging_run(
self, staging_run: StagingRun, db: Session, success: bool
):
Expand Down Expand Up @@ -549,6 +652,8 @@ async def recover_stuck_steps(self, staging_run: StagingRun, db: Session):
timeout_minutes = 10 # Quick git pull
elif step.step_type == StagingStepType.TRIGGER_RESTART:
timeout_minutes = 5 # Quick docker restart
elif step.step_type == StagingStepType.CHECKOUT_WAIT:
timeout_minutes = 5 # Waiting for checkout node

if running_duration.total_seconds() > (timeout_minutes * 60):
print(
Expand Down