From 93934af529f6562a17d68ee779be9a753f704c7d Mon Sep 17 00:00:00 2001 From: Laura Fitzgerald Date: Mon, 13 Oct 2025 12:38:50 +0100 Subject: [PATCH 1/2] adding rich output for rayjobs - widget and console output --- src/codeflare_sdk/ray/__init__.py | 3 +- src/codeflare_sdk/ray/rayjobs/__init__.py | 4 +- src/codeflare_sdk/ray/rayjobs/pretty_print.py | 181 +++- src/codeflare_sdk/ray/rayjobs/rayjob.py | 789 +++++++++++++++++- src/codeflare_sdk/ray/rayjobs/status.py | 19 + 5 files changed, 990 insertions(+), 6 deletions(-) diff --git a/src/codeflare_sdk/ray/__init__.py b/src/codeflare_sdk/ray/__init__.py index 7bd0b2c8..396b6e1a 100644 --- a/src/codeflare_sdk/ray/__init__.py +++ b/src/codeflare_sdk/ray/__init__.py @@ -10,6 +10,7 @@ RayJobDeploymentStatus, CodeflareRayJobStatus, RayJobInfo, + KueueWorkloadInfo, ) from .cluster import ( @@ -21,4 +22,4 @@ RayClusterStatus, CodeFlareClusterStatus, RayCluster, -) +) \ No newline at end of file diff --git a/src/codeflare_sdk/ray/rayjobs/__init__.py b/src/codeflare_sdk/ray/rayjobs/__init__.py index cd6b4123..628995e3 100644 --- a/src/codeflare_sdk/ray/rayjobs/__init__.py +++ b/src/codeflare_sdk/ray/rayjobs/__init__.py @@ -1,3 +1,3 @@ from .rayjob import RayJob, ManagedClusterConfig -from .status import RayJobDeploymentStatus, CodeflareRayJobStatus, RayJobInfo -from .config import ManagedClusterConfig +from .status import RayJobDeploymentStatus, CodeflareRayJobStatus, RayJobInfo, KueueWorkloadInfo +from .config import ManagedClusterConfig \ No newline at end of file diff --git a/src/codeflare_sdk/ray/rayjobs/pretty_print.py b/src/codeflare_sdk/ray/rayjobs/pretty_print.py index 34e8dfa1..671d09c6 100644 --- a/src/codeflare_sdk/ray/rayjobs/pretty_print.py +++ b/src/codeflare_sdk/ray/rayjobs/pretty_print.py @@ -20,9 +20,9 @@ from rich.console import Console from rich.table import Table from rich.panel import Panel -from typing import Tuple, Optional +from typing import Tuple, Optional, List -from .status import RayJobDeploymentStatus, RayJobInfo +from .status import RayJobDeploymentStatus, RayJobInfo, KueueWorkloadInfo def print_job_status(job_info: RayJobInfo): @@ -37,6 +37,10 @@ def print_job_status(job_info: RayJobInfo): table.add_row(f"[bold]Status:[/bold] {job_info.status.value}") table.add_row(f"[bold]RayCluster:[/bold] {job_info.cluster_name}") table.add_row(f"[bold]Namespace:[/bold] {job_info.namespace}") + + # Add cluster management info + managed_text = "[bold green]Yes (Job-managed)[/bold green]" if job_info.is_managed_cluster else "[dim]No (Existing cluster)[/dim]" + table.add_row(f"[bold]Managed Cluster:[/bold] {managed_text}") # Add timing information if available if job_info.start_time: @@ -47,6 +51,23 @@ def print_job_status(job_info: RayJobInfo): if job_info.failed_attempts > 0: table.add_row(f"[bold]Failed Attempts:[/bold] {job_info.failed_attempts}") + # Add Kueue information if available + if job_info.kueue_workload: + table.add_row() + table.add_row("[bold blue]🎯 Kueue Integration[/bold blue]") + table.add_row(f"[bold]Local Queue:[/bold] {job_info.local_queue}") + table.add_row(f"[bold]Workload Status:[/bold] [bold green]{job_info.kueue_workload.status}[/bold green]") + table.add_row(f"[bold]Workload Name:[/bold] {job_info.kueue_workload.name}") + if job_info.kueue_workload.priority is not None: + table.add_row(f"[bold]Priority:[/bold] {job_info.kueue_workload.priority}") + if job_info.kueue_workload.admission_time: + table.add_row(f"[bold]Admitted:[/bold] {job_info.kueue_workload.admission_time}") + elif job_info.local_queue: + 
table.add_row() + table.add_row("[bold blue]🎯 Kueue Integration[/bold blue]") + table.add_row(f"[bold]Local Queue:[/bold] {job_info.local_queue}") + table.add_row("[dim]Workload information not available[/dim]") + _print_table_in_panel(table) @@ -66,6 +87,162 @@ def print_no_job_found(job_name: str, namespace: str): _print_table_in_panel(table) +def print_jobs_list(job_list: List[RayJobInfo], namespace: str, pagination_info: Optional[dict] = None): + """ + Pretty print a list of RayJobs using Rich formatting with pagination support. + + Args: + job_list: List of RayJobInfo objects (for current page) + namespace: Kubernetes namespace + pagination_info: Optional pagination information dict + """ + if not job_list: + # Create table for no jobs found + table = _create_info_table( + "[white on yellow][bold]Namespace", namespace, "[bold yellow]No RayJobs found" + ) + table.add_row() + table.add_row("No RayJobs found in this namespace.") + table.add_row("Jobs may have been deleted or completed with TTL cleanup.") + _print_table_in_panel(table) + return + + # Create main table for job list + console = Console() + + # Create title with pagination info + title = f"[bold blue]πŸš€ RayJobs in namespace: {namespace}[/bold blue]" + if pagination_info and pagination_info["total_pages"] > 1: + title += f" [dim](Page {pagination_info['current_page']} of {pagination_info['total_pages']})[/dim]" + + # Create jobs table with responsive width + jobs_table = Table( + title=title, + show_header=True, + header_style="bold magenta", + border_style="blue", + expand=True, # Allow table to expand to terminal width + min_width=120, # Minimum width for readability + ) + + # Add columns with flexible width allocation and wrapping + jobs_table.add_column("Status", style="bold", min_width=12, max_width=16) + jobs_table.add_column("Job Name", style="bold cyan", min_width=15, max_width=30) + jobs_table.add_column("Job ID", style="dim", min_width=12, max_width=25) + jobs_table.add_column("Cluster", min_width=12, max_width=25) + jobs_table.add_column("Managed", min_width=8, max_width=12) + jobs_table.add_column("Queue", min_width=10, max_width=18) + jobs_table.add_column("Kueue Status", min_width=8, max_width=15) + jobs_table.add_column("Start Time", style="dim", min_width=8, max_width=12) + + # Add rows for each job + for job_info in job_list: + status_display, _ = _get_status_display(job_info.status) + + # Format start time more compactly + if job_info.start_time: + try: + # Extract just time portion and make it compact + start_time = job_info.start_time.split('T')[1][:8] # HH:MM:SS + except (IndexError, AttributeError): + start_time = "N/A" + else: + start_time = "N/A" + + # Truncate long values intelligently (Rich will handle further truncation if needed) + job_name = _truncate_text(job_info.name, 28) + job_id = _truncate_text(job_info.job_id, 23) + cluster_name = _truncate_text(job_info.cluster_name, 23) + + # Cluster management info + managed_display = "βœ… Yes" if job_info.is_managed_cluster else "❌ No" + managed_style = "bold green" if job_info.is_managed_cluster else "dim" + + # Kueue information + queue_display = _truncate_text(job_info.local_queue or "N/A", 16) + kueue_status_display = "N/A" + kueue_status_style = "dim" + + if job_info.kueue_workload: + kueue_status_display = job_info.kueue_workload.status + if job_info.kueue_workload.status == "Admitted": + kueue_status_style = "bold green" + elif job_info.kueue_workload.status == "Pending": + kueue_status_style = "bold yellow" + elif 
job_info.kueue_workload.status == "Finished": + kueue_status_style = "bold blue" + + jobs_table.add_row( + status_display, + job_name, + job_id, + cluster_name, + f"[{managed_style}]{managed_display}[/{managed_style}]", + queue_display, + f"[{kueue_status_style}]{kueue_status_display}[/{kueue_status_style}]", + start_time, + ) + + # Print the table + console.print(jobs_table) + + # Add pagination information + if pagination_info: + console.print() # Add spacing + if pagination_info["total_pages"] > 1: + console.print( + f"[dim]Showing {pagination_info['showing_start']}-{pagination_info['showing_end']} " + f"of {pagination_info['total_jobs']} jobs " + f"(Page {pagination_info['current_page']} of {pagination_info['total_pages']})[/dim]" + ) + + # Navigation hints + nav_hints = [] + if pagination_info['current_page'] > 1: + nav_hints.append(f"Previous: RayJob.List(page={pagination_info['current_page'] - 1})") + if pagination_info['current_page'] < pagination_info['total_pages']: + nav_hints.append(f"Next: RayJob.List(page={pagination_info['current_page'] + 1})") + + if nav_hints: + console.print(f"[dim]Navigation: {' | '.join(nav_hints)}[/dim]") + + # Add summary information + kueue_jobs = [job for job in job_list if job.local_queue] + managed_jobs = [job for job in job_list if job.is_managed_cluster] + + if kueue_jobs or managed_jobs: + console.print() # Add spacing + if managed_jobs: + total_managed = len(managed_jobs) + total_jobs = pagination_info["total_jobs"] if pagination_info else len(job_list) + console.print(f"[bold green]πŸ—οΈ {total_managed} of {total_jobs} jobs use job-managed clusters[/bold green]") + if kueue_jobs: + total_kueue = len(kueue_jobs) + total_jobs = pagination_info["total_jobs"] if pagination_info else len(job_list) + console.print(f"[bold blue]🎯 {total_kueue} of {total_jobs} jobs are managed by Kueue[/bold blue]") + + +def _truncate_text(text: str, max_length: int) -> str: + """ + Truncate text intelligently with ellipsis if needed. + + Args: + text: Text to truncate + max_length: Maximum length including ellipsis + + Returns: + Truncated text with ellipsis if needed + """ + if len(text) <= max_length: + return text + + # Leave room for ellipsis + if max_length <= 3: + return text[:max_length] + + return text[:max_length-3] + "..." + + def _get_status_display(status: RayJobDeploymentStatus) -> Tuple[str, str]: """ Get the display string and header color for a given status. diff --git a/src/codeflare_sdk/ray/rayjobs/rayjob.py b/src/codeflare_sdk/ray/rayjobs/rayjob.py index c06c596e..da57f0f8 100644 --- a/src/codeflare_sdk/ray/rayjobs/rayjob.py +++ b/src/codeflare_sdk/ray/rayjobs/rayjob.py @@ -23,10 +23,12 @@ from typing import Dict, Any, Optional, Tuple, Union from ray.runtime_env import RuntimeEnv +from kubernetes import client + from codeflare_sdk.common.kueue.kueue import get_default_kueue_name from codeflare_sdk.common.utils.constants import MOUNT_PATH - from codeflare_sdk.common.utils.utils import get_ray_image_for_python_version +from codeflare_sdk.common.kubernetes_cluster.auth import get_api_client, config_check from codeflare_sdk.vendored.python_client.kuberay_job_api import RayjobApi from codeflare_sdk.vendored.python_client.kuberay_cluster_api import RayClusterApi from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig @@ -43,13 +45,135 @@ RayJobDeploymentStatus, CodeflareRayJobStatus, RayJobInfo, + KueueWorkloadInfo, ) from . 
import pretty_print +# Widget imports (optional, for Jupyter notebook support) +try: + import ipywidgets as widgets + from IPython.display import display + WIDGETS_AVAILABLE = True +except ImportError: + WIDGETS_AVAILABLE = False + logger = logging.getLogger(__name__) +def _get_kueue_workload_info(job_name: str, namespace: str, labels: dict) -> Optional[KueueWorkloadInfo]: + """ + Get Kueue workload information for a RayJob if it's managed by Kueue. + + Args: + job_name: Name of the RayJob + namespace: Kubernetes namespace + labels: Labels from the RayJob metadata + + Returns: + KueueWorkloadInfo if the job is managed by Kueue, None otherwise + """ + # Check if job has Kueue queue label + queue_name = labels.get("kueue.x-k8s.io/queue-name") + if not queue_name: + return None + + try: + # Check and load Kubernetes config (handles oc login, kubeconfig, etc.) + config_check() + # List all workloads in the namespace to find the one for this RayJob + api_instance = client.CustomObjectsApi(get_api_client()) + workloads = api_instance.list_namespaced_custom_object( + group="kueue.x-k8s.io", + version="v1beta1", + plural="workloads", + namespace=namespace, + ) + + # Find workload with matching RayJob owner reference + for workload in workloads.get("items", []): + owner_refs = workload.get("metadata", {}).get("ownerReferences", []) + + for owner_ref in owner_refs: + if ( + owner_ref.get("kind") == "RayJob" + and owner_ref.get("name") == job_name + ): + # Found the workload for this RayJob + workload_metadata = workload.get("metadata", {}) + workload_status = workload.get("status", {}) + + # Extract workload information + workload_info = KueueWorkloadInfo( + name=workload_metadata.get("name", "unknown"), + queue_name=queue_name, + status=_get_workload_status_summary(workload_status), + priority=workload.get("spec", {}).get("priority"), + creation_time=workload_metadata.get("creationTimestamp"), + admission_time=_get_admission_time(workload_status), + ) + + logger.debug(f"Found Kueue workload for RayJob {job_name}: {workload_info.name}") + return workload_info + + # No workload found for this RayJob + logger.debug(f"No Kueue workload found for RayJob {job_name}") + return None + + except Exception as e: + logger.warning(f"Failed to get Kueue workload info for RayJob {job_name}: {e}") + return None + + +def _get_workload_status_summary(workload_status: dict) -> str: + """ + Get a summary status from Kueue workload status. + + Args: + workload_status: The status section from a Kueue workload CR + + Returns: + String summary of the workload status + """ + conditions = workload_status.get("conditions", []) + + # Check conditions in priority order + for condition_type in ["Finished", "Admitted", "QuotaReserved", "Pending"]: + for condition in conditions: + if ( + condition.get("type") == condition_type + and condition.get("status") == "True" + ): + return condition_type + + # If no clear status, return "Unknown" + return "Unknown" + + +def _get_admission_time(workload_status: dict) -> Optional[str]: + """ + Get the admission time from Kueue workload status. 
+ + Args: + workload_status: The status section from a Kueue workload CR + + Returns: + Admission timestamp if available, None otherwise + """ + conditions = workload_status.get("conditions", []) + + for condition in conditions: + if ( + condition.get("type") == "Admitted" + and condition.get("status") == "True" + ): + return condition.get("lastTransitionTime") + + return None + + + + class RayJob: """ A client for managing Ray jobs using the KubeRay operator. @@ -57,6 +181,40 @@ class RayJob: This class provides a simplified interface for submitting and managing RayJob CRs (using the KubeRay RayJob python client). """ + + # Global configuration for widget display preference + _default_use_widgets = False + + @classmethod + def set_widgets_default(cls, use_widgets: bool): + """ + Set the global default for widget display in Status() and List() methods. + + Args: + use_widgets (bool): Whether to use Jupyter widgets by default + + Example: + >>> from codeflare_sdk import RayJob + >>> RayJob.set_widgets_default(True) # Enable widgets globally + >>> RayJob.Status("my-job") # Now uses widgets by default + >>> RayJob.List() # Now uses widgets by default + """ + cls._default_use_widgets = use_widgets + logger.info(f"Set global widget default to: {use_widgets}") + + @classmethod + def get_widgets_default(cls) -> bool: + """ + Get the current global default for widget display. + + Returns: + bool: Current global widget setting + + Example: + >>> from codeflare_sdk import RayJob + >>> print(f"Widgets enabled: {RayJob.get_widgets_default()}") + """ + return cls._default_use_widgets def __init__( self, @@ -573,6 +731,27 @@ def status( except ValueError: deployment_status = RayJobDeploymentStatus.UNKNOWN + # Get Kueue workload information if available + # We need to fetch the RayJob CR to get labels + kueue_workload = None + local_queue = None + try: + config_check() + api_instance = client.CustomObjectsApi(get_api_client()) + rayjob_cr = api_instance.get_namespaced_custom_object( + group="ray.io", + version="v1", + namespace=self.namespace, + plural="rayjobs", + name=self.name, + ) + labels = rayjob_cr.get("metadata", {}).get("labels", {}) + local_queue = labels.get("kueue.x-k8s.io/queue-name") + if local_queue: + kueue_workload = _get_kueue_workload_info(self.name, self.namespace, labels) + except Exception as e: + logger.debug(f"Could not fetch Kueue workload info for {self.name}: {e}") + # Create RayJobInfo dataclass job_info = RayJobInfo( name=self.name, @@ -584,6 +763,9 @@ def status( end_time=status_data.get("endTime"), failed_attempts=status_data.get("failed", 0), succeeded_attempts=status_data.get("succeeded", 0), + kueue_workload=kueue_workload, + local_queue=local_queue, + is_managed_cluster=self._cluster_config is not None, ) # Map to CodeFlare status and determine readiness @@ -594,6 +776,611 @@ def status( return codeflare_status, ready + @staticmethod + def Status( + job_name: str, + namespace: Optional[str] = None, + use_widgets: Optional[bool] = None + ): + """ + Get the status of a RayJob by name and display it with Rich console formatting. 
+ + Args: + job_name (str): The name of the RayJob + namespace (Optional[str]): The Kubernetes namespace (auto-detected if not specified) + use_widgets (Optional[bool]): Whether to display in Jupyter widgets (default: None, uses global setting) + + Returns: + Tuple of (CodeflareRayJobStatus, ready: bool) when use_widgets=False (default), None when use_widgets=True + + Example: + >>> from codeflare_sdk.ray import RayJob + >>> status, ready = RayJob.Status("my-job") # Rich console (default) + >>> RayJob.set_widgets_default(True) # Enable widgets globally + >>> RayJob.Status("my-job") # Now uses widgets by default + """ + if namespace is None: + namespace = get_current_namespace() + + # Use global default if not specified + if use_widgets is None: + use_widgets = RayJob._default_use_widgets + + # Initialize the API (no parameters needed) + api = RayjobApi() + + try: + # Check and load Kubernetes config (handles oc login, kubeconfig, etc.) + config_check() + status_data = api.get_job_status(name=job_name, k8s_namespace=namespace) + + if not status_data: + if use_widgets: + RayJob._display_job_status_widget(None, job_name, namespace) + return None # Don't return tuple to avoid Jupyter auto-display + else: + pretty_print.print_no_job_found(job_name, namespace) + return CodeflareRayJobStatus.UNKNOWN, False + + # Map deployment status to our enums + deployment_status_str = status_data.get("jobDeploymentStatus", "Unknown") + + try: + deployment_status = RayJobDeploymentStatus(deployment_status_str) + except ValueError: + deployment_status = RayJobDeploymentStatus.UNKNOWN + + # Create RayJobInfo dataclass - we need to determine cluster_name + cluster_name = status_data.get("rayClusterName", "unknown") + + # Get Kueue workload information and cluster management info + # We need to fetch the RayJob CR to get labels and spec + kueue_workload = None + local_queue = None + is_managed_cluster = False + try: + api_instance = client.CustomObjectsApi(get_api_client()) + rayjob_cr = api_instance.get_namespaced_custom_object( + group="ray.io", + version="v1", + namespace=namespace, + plural="rayjobs", + name=job_name, + ) + labels = rayjob_cr.get("metadata", {}).get("labels", {}) + spec = rayjob_cr.get("spec", {}) + + # Determine if this is a managed cluster + is_managed_cluster = "rayClusterSpec" in spec + + local_queue = labels.get("kueue.x-k8s.io/queue-name") + if local_queue: + kueue_workload = _get_kueue_workload_info(job_name, namespace, labels) + except Exception as e: + logger.debug(f"Could not fetch Kueue workload info for {job_name}: {e}") + + job_info = RayJobInfo( + name=job_name, + job_id=status_data.get("jobId", ""), + status=deployment_status, + namespace=namespace, + cluster_name=cluster_name, + start_time=status_data.get("startTime"), + end_time=status_data.get("endTime"), + failed_attempts=status_data.get("failed", 0), + succeeded_attempts=status_data.get("succeeded", 0), + kueue_workload=kueue_workload, + local_queue=local_queue, + is_managed_cluster=is_managed_cluster, + ) + + # Map to CodeFlare status and determine readiness using static method + codeflare_status, ready = RayJob._map_to_codeflare_status_static(deployment_status) + + if use_widgets: + RayJob._display_job_status_widget(job_info, job_name, namespace) + return None # Don't return tuple to avoid Jupyter auto-display + else: + pretty_print.print_job_status(job_info) + return codeflare_status, ready + + except Exception as e: + logger.error(f"Failed to get status for RayJob {job_name}: {e}") + if use_widgets: + 
RayJob._display_job_status_widget(None, job_name, namespace) + return None # Don't return tuple to avoid Jupyter auto-display + else: + pretty_print.print_no_job_found(job_name, namespace) + return CodeflareRayJobStatus.UNKNOWN, False + + @staticmethod + def List( + namespace: Optional[str] = None, + use_widgets: Optional[bool] = None, + page_size: int = 10, + page: int = 1 + ): + """ + List all RayJobs in a namespace and display them with Rich console formatting. + + Args: + namespace (Optional[str]): The Kubernetes namespace (auto-detected if not specified) + use_widgets (Optional[bool]): Whether to display in Jupyter widgets (default: None, uses global setting) + page_size (int): Number of jobs to display per page (default: 10) + page (int): Page number to display (default: 1) + + Returns: + list: List of RayJobInfo objects when use_widgets=False, None when use_widgets=True + + Example: + >>> from codeflare_sdk.ray import RayJob + >>> jobs = RayJob.List() # First 10 jobs (default) + >>> RayJob.List(page=2) # Next 10 jobs + >>> RayJob.List(page_size=5, page=1) # First 5 jobs + >>> RayJob.set_widgets_default(True) # Enable widgets globally + """ + if namespace is None: + namespace = get_current_namespace() + + # Use global default if not specified + if use_widgets is None: + use_widgets = RayJob._default_use_widgets + + try: + # Check and load Kubernetes config (handles oc login, kubeconfig, etc.) + config_check() + # Use Kubernetes API to list RayJob custom resources + api_instance = client.CustomObjectsApi(get_api_client()) + rayjobs = api_instance.list_namespaced_custom_object( + group="ray.io", + version="v1", + namespace=namespace, + plural="rayjobs", + ) + + job_list = [] + for rayjob_cr in rayjobs["items"]: + # Extract job information from the custom resource + metadata = rayjob_cr.get("metadata", {}) + spec = rayjob_cr.get("spec", {}) + status = rayjob_cr.get("status", {}) + + job_name = metadata.get("name", "unknown") + job_id = status.get("jobId", "") + deployment_status_str = status.get("jobDeploymentStatus", "Unknown") + + try: + deployment_status = RayJobDeploymentStatus(deployment_status_str) + except ValueError: + deployment_status = RayJobDeploymentStatus.UNKNOWN + + # Get cluster name and determine if it's managed + cluster_name = "unknown" + is_managed_cluster = False + if "rayClusterSpec" in spec: + cluster_name = f"{job_name}-cluster" # Managed cluster + is_managed_cluster = True + elif "clusterSelector" in spec: + cluster_selector = spec["clusterSelector"] + cluster_name = cluster_selector.get("ray.io/cluster", "unknown") + is_managed_cluster = False + + # Get Kueue workload information if available + labels = metadata.get("labels", {}) + local_queue = labels.get("kueue.x-k8s.io/queue-name") + kueue_workload = None + if local_queue: + kueue_workload = _get_kueue_workload_info(job_name, namespace, labels) + + job_info = RayJobInfo( + name=job_name, + job_id=job_id, + status=deployment_status, + namespace=namespace, + cluster_name=cluster_name, + start_time=status.get("startTime"), + end_time=status.get("endTime"), + failed_attempts=status.get("failed", 0), + succeeded_attempts=status.get("succeeded", 0), + kueue_workload=kueue_workload, + local_queue=local_queue, + is_managed_cluster=is_managed_cluster, + ) + job_list.append(job_info) + + # Apply pagination + total_jobs = len(job_list) + total_pages = (total_jobs + page_size - 1) // page_size # Ceiling division + + # Validate page number + if page < 1: + page = 1 + elif page > total_pages and total_pages > 0: + page = 
total_pages + + # Calculate pagination slice + start_idx = (page - 1) * page_size + end_idx = start_idx + page_size + paginated_jobs = job_list[start_idx:end_idx] + + # Create pagination info + pagination_info = { + "current_page": page, + "total_pages": total_pages, + "page_size": page_size, + "total_jobs": total_jobs, + "showing_start": start_idx + 1 if paginated_jobs else 0, + "showing_end": min(end_idx, total_jobs), + } + + if use_widgets: + RayJob._display_jobs_list_widget(paginated_jobs, namespace, pagination_info) + return None # Don't return job_list to avoid Jupyter auto-display + else: + pretty_print.print_jobs_list(paginated_jobs, namespace, pagination_info) + return job_list # Return full list for programmatic use + + except Exception as e: + logger.error(f"Failed to list RayJobs in namespace {namespace}: {e}") + empty_pagination_info = { + "current_page": 1, + "total_pages": 0, + "page_size": page_size, + "total_jobs": 0, + "showing_start": 0, + "showing_end": 0, + } + if use_widgets: + RayJob._display_jobs_list_widget([], namespace, empty_pagination_info) + return None # Don't return empty list to avoid Jupyter auto-display + else: + pretty_print.print_jobs_list([], namespace, empty_pagination_info) + return [] + + @staticmethod + def _map_to_codeflare_status_static( + deployment_status: RayJobDeploymentStatus, + ) -> Tuple[CodeflareRayJobStatus, bool]: + """ + Static version of status mapping for use by the static status method. + """ + status_mapping = { + RayJobDeploymentStatus.COMPLETE: (CodeflareRayJobStatus.COMPLETE, True), + RayJobDeploymentStatus.FAILED: (CodeflareRayJobStatus.FAILED, True), + RayJobDeploymentStatus.RUNNING: (CodeflareRayJobStatus.RUNNING, False), + RayJobDeploymentStatus.SUSPENDED: (CodeflareRayJobStatus.SUSPENDED, False), + RayJobDeploymentStatus.UNKNOWN: (CodeflareRayJobStatus.UNKNOWN, False), + } + + return status_mapping.get( + deployment_status, (CodeflareRayJobStatus.UNKNOWN, False) + ) + + @staticmethod + def _display_job_status_widget(job_info: Optional[RayJobInfo], job_name: str, namespace: str): + """ + Display RayJob status in a Jupyter widget. + + Args: + job_info: RayJobInfo object or None if job not found + job_name: Name of the job + namespace: Kubernetes namespace + """ + if not WIDGETS_AVAILABLE: + # Fall back to console output if widgets not available + if job_info: + pretty_print.print_job_status(job_info) + else: + pretty_print.print_no_job_found(job_name, namespace) + return + + # Create the widget display + if job_info is None: + # Job not found - match the list widget styling + status_widget = widgets.HTML( + value=f""" +
+            <div style="border: 2px solid #dc3545; border-radius: 8px; padding: 16px; margin: 8px 0; background-color: #f8f9fa;">
+                <h3 style="color: #dc3545; margin-top: 0;">❌ RayJob Not Found</h3>
+                <table style="border-collapse: collapse;">
+                    <tr>
+                        <td style="font-weight: bold; padding: 4px 8px;">Job Name:</td>
+                        <td style="padding: 4px 8px;">{job_name}</td>
+                    </tr>
+                    <tr>
+                        <td style="font-weight: bold; padding: 4px 8px;">Namespace:</td>
+                        <td style="padding: 4px 8px;">{namespace}</td>
+                    </tr>
+                    <tr>
+                        <td style="font-weight: bold; padding: 4px 8px;">Status:</td>
+                        <td style="padding: 4px 8px;">The RayJob may have been deleted or never existed.</td>
+                    </tr>
+                </table>
+            </div>
+            """
+            )
+        else:
+            # Job found - create status display matching list widget style
+            status_color, status_icon = RayJob._get_status_color_and_icon(job_info.status)
+
+            # Build main job information table
+            job_table_rows = f"""
+                <tr>
+                    <td style="font-weight: bold; padding: 4px 8px;">Name:</td>
+                    <td style="padding: 4px 8px;">{job_info.name}</td>
+                </tr>
+                <tr>
+                    <td style="font-weight: bold; padding: 4px 8px;">Job ID:</td>
+                    <td style="padding: 4px 8px;">{job_info.job_id}</td>
+                </tr>
+                <tr>
+                    <td style="font-weight: bold; padding: 4px 8px;">Status:</td>
+                    <td style="padding: 4px 8px; color: {status_color}; font-weight: bold;">{status_icon} {job_info.status.value}</td>
+                </tr>
+                <tr>
+                    <td style="font-weight: bold; padding: 4px 8px;">RayCluster:</td>
+                    <td style="padding: 4px 8px;">{job_info.cluster_name}</td>
+                </tr>
+                <tr>
+                    <td style="font-weight: bold; padding: 4px 8px;">Namespace:</td>
+                    <td style="padding: 4px 8px;">{job_info.namespace}</td>
+                </tr>
+                <tr>
+                    <td style="font-weight: bold; padding: 4px 8px;">Managed Cluster:</td>
+                    <td style="padding: 4px 8px;">{'βœ… Yes (Job-managed)' if job_info.is_managed_cluster else '❌ No (Existing cluster)'}</td>
+                </tr>
+            """
+
+            # Add timing information if available
+            if job_info.start_time:
+                job_table_rows += f"""
+                <tr>
+                    <td style="font-weight: bold; padding: 4px 8px;">Started:</td>
+                    <td style="padding: 4px 8px;">{job_info.start_time}</td>
+                </tr>
+                """
+
+            if job_info.end_time:
+                job_table_rows += f"""
+                <tr>
+                    <td style="font-weight: bold; padding: 4px 8px;">Ended:</td>
+                    <td style="padding: 4px 8px;">{job_info.end_time}</td>
+                </tr>
+                """
+
+            # Add failure info if available
+            if job_info.failed_attempts > 0:
+                job_table_rows += f"""
+                <tr>
+                    <td style="font-weight: bold; padding: 4px 8px;">Failed Attempts:</td>
+                    <td style="padding: 4px 8px; color: #dc3545;">{job_info.failed_attempts}</td>
+                </tr>
+                """
+
+            # Build Kueue section
+            kueue_section = ""
+            if job_info.kueue_workload:
+                kueue_color = "#007bff"  # Blue for Kueue
+                if job_info.kueue_workload.status == "Admitted":
+                    kueue_color = "#28a745"  # Green for admitted
+                elif job_info.kueue_workload.status == "Pending":
+                    kueue_color = "#ffc107"  # Yellow for pending
+
+                kueue_rows = f"""
+                <tr>
+                    <td style="font-weight: bold; padding: 4px 8px;">🎯 Local Queue:</td>
+                    <td style="padding: 4px 8px;">{job_info.local_queue}</td>
+                </tr>
+                <tr>
+                    <td style="font-weight: bold; padding: 4px 8px;">🎯 Workload Status:</td>
+                    <td style="padding: 4px 8px; color: {kueue_color}; font-weight: bold;">{job_info.kueue_workload.status}</td>
+                </tr>
+                <tr>
+                    <td style="font-weight: bold; padding: 4px 8px;">🎯 Workload Name:</td>
+                    <td style="padding: 4px 8px;">{job_info.kueue_workload.name}</td>
+                </tr>
+                """
+
+                if job_info.kueue_workload.priority is not None:
+                    kueue_rows += f"""
+                <tr>
+                    <td style="font-weight: bold; padding: 4px 8px;">🎯 Priority:</td>
+                    <td style="padding: 4px 8px;">{job_info.kueue_workload.priority}</td>
+                </tr>
+                """
+
+                if job_info.kueue_workload.admission_time:
+                    kueue_rows += f"""
+                <tr>
+                    <td style="font-weight: bold; padding: 4px 8px;">🎯 Admitted:</td>
+                    <td style="padding: 4px 8px;">{job_info.kueue_workload.admission_time}</td>
+                </tr>
+                """
+
+                kueue_section = kueue_rows
+
+            elif job_info.local_queue:
+                kueue_section = f"""
+                <tr>
+                    <td style="font-weight: bold; padding: 4px 8px;">🎯 Local Queue:</td>
+                    <td style="padding: 4px 8px;">{job_info.local_queue}</td>
+                </tr>
+                <tr>
+                    <td style="font-weight: bold; padding: 4px 8px;">🎯 Workload Info:</td>
+                    <td style="padding: 4px 8px; color: #6c757d;">Not available</td>
+                </tr>
+                """
+
+            status_widget = widgets.HTML(
+                value=f"""
+            <div style="border: 2px solid {status_color}; border-radius: 8px; padding: 16px; margin: 8px 0; background-color: #f8f9fa;">
+                <h3 style="color: {status_color}; margin-top: 0;">{status_icon} RayJob Status</h3>
+                <table style="border-collapse: collapse;">
+                    {job_table_rows}
+                    {kueue_section}
+                </table>
+            </div>
+ """ + ) + + # Display the widget + display(status_widget) + + @staticmethod + def _get_status_color_and_icon(status: RayJobDeploymentStatus) -> Tuple[str, str]: + """ + Get color and icon for job status display. + + Returns: + Tuple of (color, icon) + """ + status_mapping = { + RayJobDeploymentStatus.COMPLETE: ("#28a745", "βœ…"), + RayJobDeploymentStatus.RUNNING: ("#007bff", "πŸ”„"), + RayJobDeploymentStatus.FAILED: ("#dc3545", "❌"), + RayJobDeploymentStatus.SUSPENDED: ("#ffc107", "⏸️"), + RayJobDeploymentStatus.UNKNOWN: ("#6c757d", "❓"), + } + return status_mapping.get(status, ("#6c757d", "❓")) + + @staticmethod + def _display_jobs_list_widget(job_list, namespace: str, pagination_info: Optional[dict] = None): + """ + Display a list of RayJobs in a Jupyter widget table with pagination. + + Args: + job_list: List of RayJobInfo objects (for current page) + namespace: Kubernetes namespace + pagination_info: Optional pagination information dict + """ + if not WIDGETS_AVAILABLE: + # Fall back to console output if widgets not available + pretty_print.print_jobs_list(job_list, namespace, pagination_info) + return + + if not job_list: + # No jobs found + no_jobs_widget = widgets.HTML( + value=f""" +
+            <div style="border: 2px solid #ffc107; border-radius: 8px; padding: 16px; margin: 8px 0; background-color: #f8f9fa;">
+                <h3 style="color: #856404; margin-top: 0;">πŸ“‹ No RayJobs Found</h3>
+                <p style="margin: 4px 0;"><b>Namespace:</b> {namespace}</p>
+                <p style="margin: 4px 0; color: #6c757d;">No RayJobs found in this namespace. Jobs may have been deleted or completed with TTL cleanup.</p>
+            </div>
+            """
+            )
+            display(no_jobs_widget)
+            return
+
+        # Create table header with pagination info
+        title = f"πŸ“‹ RayJobs in namespace: {namespace}"
+        if pagination_info and pagination_info["total_pages"] > 1:
+            title += f" (Page {pagination_info['current_page']} of {pagination_info['total_pages']})"
+
+        table_html = f"""
+        <div style="border: 1px solid #dee2e6; border-radius: 8px; padding: 16px; margin: 8px 0;">
+            <h3 style="margin-top: 0;">{title}</h3>
+            <table style="border-collapse: collapse; width: 100%;">
+                <tr style="background-color: #f8f9fa;">
+                    <th style="padding: 6px 8px; text-align: left;">Status</th>
+                    <th style="padding: 6px 8px; text-align: left;">Job Name</th>
+                    <th style="padding: 6px 8px; text-align: left;">Job ID</th>
+                    <th style="padding: 6px 8px; text-align: left;">Cluster</th>
+                    <th style="padding: 6px 8px; text-align: left;">Managed</th>
+                    <th style="padding: 6px 8px; text-align: left;">Queue</th>
+                    <th style="padding: 6px 8px; text-align: left;">Kueue Status</th>
+                    <th style="padding: 6px 8px; text-align: left;">Start Time</th>
+                </tr>
+        """
+
+        # Add rows for each job
+        for job_info in job_list:
+            status_color, status_icon = RayJob._get_status_color_and_icon(job_info.status)
+            start_time = job_info.start_time or "N/A"
+
+            # Cluster management info
+            managed_display = "βœ… Yes" if job_info.is_managed_cluster else "❌ No"
+            managed_color = "#28a745" if job_info.is_managed_cluster else "#6c757d"
+
+            # Kueue information
+            queue_display = job_info.local_queue or "N/A"
+            kueue_status_display = "N/A"
+            kueue_status_color = "#6c757d"
+
+            if job_info.kueue_workload:
+                kueue_status_display = job_info.kueue_workload.status
+                if job_info.kueue_workload.status == "Admitted":
+                    kueue_status_color = "#28a745"  # Green
+                elif job_info.kueue_workload.status == "Pending":
+                    kueue_status_color = "#ffc107"  # Yellow
+                elif job_info.kueue_workload.status == "Finished":
+                    kueue_status_color = "#007bff"  # Blue
+
+            table_html += f"""
+                <tr>
+                    <td style="padding: 6px 8px; color: {status_color}; font-weight: bold;">{status_icon} {job_info.status.value}</td>
+                    <td style="padding: 6px 8px;">{job_info.name}</td>
+                    <td style="padding: 6px 8px;">{job_info.job_id}</td>
+                    <td style="padding: 6px 8px;">{job_info.cluster_name}</td>
+                    <td style="padding: 6px 8px; color: {managed_color};">{managed_display}</td>
+                    <td style="padding: 6px 8px;">{queue_display}</td>
+                    <td style="padding: 6px 8px; color: {kueue_status_color};">{kueue_status_display}</td>
+                    <td style="padding: 6px 8px;">{start_time}</td>
+                </tr>
+            """
+
+        table_html += """
+            </table>
+        """
+
+        # Add pagination navigation info
+        if pagination_info and pagination_info["total_pages"] > 1:
+            table_html += f"""
+            <div style="margin-top: 8px; color: #6c757d;">
+                Showing {pagination_info['showing_start']}-{pagination_info['showing_end']}
+                of {pagination_info['total_jobs']} jobs
+            </div>
+            <div style="margin-top: 4px; color: #6c757d;">
+                Navigation:
+            """
+
+            if pagination_info['current_page'] > 1:
+                table_html += f" RayJob.List(page={pagination_info['current_page'] - 1}) [Previous]"
+            if pagination_info['current_page'] < pagination_info['total_pages']:
+                if pagination_info['current_page'] > 1:
+                    table_html += " |"
+                table_html += f" RayJob.List(page={pagination_info['current_page'] + 1}) [Next]"
+
+            table_html += """
+            </div>
+            """
+
+        table_html += "</div>
" + + # Display the widget + jobs_widget = widgets.HTML(value=table_html) + display(jobs_widget) + + def _map_to_codeflare_status( self, deployment_status: RayJobDeploymentStatus ) -> Tuple[CodeflareRayJobStatus, bool]: diff --git a/src/codeflare_sdk/ray/rayjobs/status.py b/src/codeflare_sdk/ray/rayjobs/status.py index 027ed09c..6b44b29c 100644 --- a/src/codeflare_sdk/ray/rayjobs/status.py +++ b/src/codeflare_sdk/ray/rayjobs/status.py @@ -47,6 +47,20 @@ class CodeflareRayJobStatus(Enum): UNKNOWN = 5 +@dataclass +class KueueWorkloadInfo: + """ + For storing information about a Kueue workload associated with a RayJob. + """ + + name: str + queue_name: str + status: str # e.g., "Pending", "Admitted", "Finished" + priority: Optional[int] = None + creation_time: Optional[str] = None + admission_time: Optional[str] = None + + @dataclass class RayJobInfo: """ @@ -62,3 +76,8 @@ class RayJobInfo: end_time: Optional[str] = None failed_attempts: int = 0 succeeded_attempts: int = 0 + # Kueue integration + kueue_workload: Optional[KueueWorkloadInfo] = None + local_queue: Optional[str] = None + # Cluster management + is_managed_cluster: bool = False From be159189478aae4f28aa38d28c86e65ab588e0ee Mon Sep 17 00:00:00 2001 From: Laura Fitzgerald Date: Wed, 22 Oct 2025 09:28:18 +0100 Subject: [PATCH 2/2] pitstop --- src/codeflare_sdk/ray/cluster/cluster.py | 632 +++++++++++++++++- src/codeflare_sdk/ray/cluster/pretty_print.py | 17 + src/codeflare_sdk/ray/cluster/status.py | 28 +- src/codeflare_sdk/ray/rayjobs/status.py | 8 +- 4 files changed, 674 insertions(+), 11 deletions(-) diff --git a/src/codeflare_sdk/ray/cluster/cluster.py b/src/codeflare_sdk/ray/cluster/cluster.py index 8538dba3..04cd7b76 100644 --- a/src/codeflare_sdk/ray/cluster/cluster.py +++ b/src/codeflare_sdk/ray/cluster/cluster.py @@ -19,7 +19,7 @@ """ from time import sleep -from typing import List, Optional, Tuple, Dict +from typing import List as ListType, Optional, Tuple, Dict, Any import copy from ray.job_submission import JobSubmissionClient, JobStatus @@ -44,6 +44,7 @@ RayCluster, RayClusterStatus, ) +from ..rayjobs.status import KueueWorkloadInfo from ..appwrapper import ( AppWrapper, AppWrapperStatus, @@ -62,7 +63,13 @@ from kubernetes import client as k8s_client from kubernetes.client.rest import ApiException -from kubernetes.client.rest import ApiException +# Flag to track if ipywidgets is available +WIDGETS_AVAILABLE = True +try: + import ipywidgets as widgets + from IPython.display import display +except ImportError: + WIDGETS_AVAILABLE = False CF_SDK_FIELD_MANAGER = "codeflare-sdk" @@ -74,6 +81,500 @@ class Cluster: Note that currently, the underlying implementation is a Ray cluster. """ + + # Class variable for global widget preference + _default_use_widgets = False + + @classmethod + def set_widgets_default(cls, use_widgets: bool) -> None: + """Set the default widget display preference for all Cluster methods.""" + cls._default_use_widgets = use_widgets + + @classmethod + def get_widgets_default(cls) -> bool: + """Get the current default widget display preference.""" + return cls._default_use_widgets + + @staticmethod + def Status(cluster_name: str, namespace: str = "default", use_widgets: Optional[bool] = None, return_status: bool = False) -> Optional[Tuple[RayClusterStatus, bool]]: + """ + Get the status of a RayCluster. 
+ + Args: + cluster_name: Name of the RayCluster + namespace: Kubernetes namespace + use_widgets: Whether to use Jupyter widgets for display (overrides global setting) + return_status: Whether to return the status tuple instead of displaying + + Returns: + Optional[Tuple[RayClusterStatus, bool]]: Status and ready state if return_status=True + """ + # Check if Kubernetes config is available + if not config_check(): + return None if not return_status else (RayClusterStatus.UNKNOWN, False) + + # Determine widget usage + use_widgets = use_widgets if use_widgets is not None else Cluster._default_use_widgets + if use_widgets and not WIDGETS_AVAILABLE: + # Widgets not available, falling back to console output + pass + use_widgets = False + + try: + # Get RayCluster + api = client.CustomObjectsApi(get_api_client()) + cluster = api.get_namespaced_custom_object( + group="ray.io", + version="v1", + namespace=namespace, + plural="rayclusters", + name=cluster_name + ) + + if not cluster: + if use_widgets: + Cluster._display_cluster_status_widget(None, cluster_name, namespace) + else: + print_no_resources_found() + return None if not return_status else (RayClusterStatus.UNKNOWN, False) + + # Parse cluster info + cluster_info = _map_to_ray_cluster(cluster) + if not cluster_info: + if use_widgets: + Cluster._display_cluster_status_widget(None, cluster_name, namespace) + else: + print_no_resources_found() + return None if not return_status else (RayClusterStatus.UNKNOWN, False) + + # Check if cluster is managed by AppWrapper + if cluster["metadata"].get("ownerReferences"): + for owner in cluster["metadata"]["ownerReferences"]: + if owner["kind"] == "AppWrapper": + cluster_info.is_appwrapper = True + break + + # Get Kueue workload info if cluster is managed by AppWrapper + if cluster_info.is_appwrapper: + workload_info = Cluster._get_cluster_kueue_workload_info(cluster_name, namespace) + if workload_info: + cluster_info.local_queue = workload_info.name + cluster_info.kueue_workload = workload_info + + # Get dashboard URL + cluster_info.dashboard = Cluster._get_cluster_dashboard_url(cluster_name, namespace) + + # Display status + if use_widgets: + Cluster._display_cluster_status_widget(cluster_info) + else: + print_cluster_status(cluster_info) + + # Return status tuple if requested + if return_status: + ready = cluster_info.status == RayClusterStatus.READY + return cluster_info.status, ready + + return None + + except Exception as e: + error_msg = _kube_api_error_handling(e) + if return_status: + return RayClusterStatus.UNKNOWN, False + return None + + @staticmethod + def List(namespace: str = "default", page: int = 1, page_size: int = 10, use_widgets: Optional[bool] = None, return_list: bool = False) -> Optional[ListType[RayCluster]]: + """ + List all RayClusters in the specified namespace with pagination. 
+ + Args: + namespace: Kubernetes namespace + page: Page number (1-based) + page_size: Number of items per page + use_widgets: Whether to use Jupyter widgets for display (overrides global setting) + return_list: Whether to return the list instead of displaying + + Returns: + Optional[List[RayCluster]]: List of RayCluster objects if return_list=True + """ + # Check if Kubernetes config is available + if not config_check(): + return None if not return_list else [] + + # Determine widget usage + use_widgets = use_widgets if use_widgets is not None else Cluster._default_use_widgets + if use_widgets and not WIDGETS_AVAILABLE: + # Widgets not available, falling back to console output + pass + use_widgets = False + + try: + # Get all RayClusters + api = client.CustomObjectsApi(get_api_client()) + clusters = api.list_namespaced_custom_object( + group="ray.io", + version="v1", + namespace=namespace, + plural="rayclusters" + ) + + if not clusters or not clusters.get("items"): + if return_list: + return [] + if use_widgets: + Cluster._display_clusters_list_widget([], page, page_size) + else: + print_no_resources_found() + return None + + # Parse cluster info for each cluster + cluster_infos = [] + for cluster in clusters["items"]: + cluster_info = _map_to_ray_cluster(cluster) + if not cluster_info: + continue + + # Check if cluster is managed by AppWrapper + if cluster["metadata"].get("ownerReferences"): + for owner in cluster["metadata"]["ownerReferences"]: + if owner["kind"] == "AppWrapper": + cluster_info.is_appwrapper = True + break + + # Get Kueue workload info if cluster is managed by AppWrapper + if cluster_info.is_appwrapper: + workload_info = Cluster._get_cluster_kueue_workload_info(cluster["metadata"]["name"], namespace) + if workload_info: + cluster_info.local_queue = workload_info.name + cluster_info.kueue_workload = workload_info + + # Get dashboard URL + cluster_info.dashboard = Cluster._get_cluster_dashboard_url(cluster["metadata"]["name"], namespace) + + cluster_infos.append(cluster_info) + + # Sort clusters by name + cluster_infos.sort(key=lambda x: x.name) + + # Calculate pagination + start_idx = (page - 1) * page_size + end_idx = start_idx + page_size + total_pages = (len(cluster_infos) + page_size - 1) // page_size + + # Get clusters for current page + current_clusters = cluster_infos[start_idx:end_idx] + + # Display clusters + if use_widgets: + Cluster._display_clusters_list_widget(current_clusters, page, page_size, total_pages) + else: + print_clusters(current_clusters) + + # Return full list if requested + return cluster_infos if return_list else None + + except Exception as e: + error_msg = _kube_api_error_handling(e) + if return_list: + return [] + return None + + @staticmethod + def _get_cluster_kueue_workload_info(cluster_name: str, namespace: str) -> Optional[KueueWorkloadInfo]: + """Get Kueue workload information for a RayCluster.""" + try: + api = client.CustomObjectsApi(get_api_client()) + workloads = api.list_namespaced_custom_object( + group="kueue.x-k8s.io", + version="v1beta1", + namespace=namespace, + plural="workloads" + ) + + for workload in workloads.get("items", []): + if workload["metadata"]["ownerReferences"][0]["name"] == cluster_name: + status = workload.get("status", {}) + admission = status.get("admission", {}) + + # Get error information if cluster failed + error_msg, error_reason = Cluster._get_workload_error_info(workload) + + return KueueWorkloadInfo( + name=workload["metadata"]["name"], + status=status.get("conditions", [{}])[0].get("type", 
"Unknown"), + priority=workload["spec"].get("priority", 0), + admission_time=Cluster._get_admission_time(admission), + error_message=error_msg, + error_reason=error_reason + ) + + return None + + except Exception as e: + # Silently fail - Kueue may not be installed or workload info unavailable + pass + return None + + @staticmethod + def _get_admission_time(admission: Dict[str, Any]) -> Optional[str]: + """Extract admission time from Kueue workload admission data.""" + if not admission: + return None + return admission.get("podSetAssignments", [{}])[0].get("admissionTime") + + @staticmethod + def _get_workload_error_info(workload: Dict[str, Any]) -> Tuple[Optional[str], Optional[str]]: + """Extract error information from a failed Kueue workload.""" + status = workload.get("status", {}) + conditions = status.get("conditions", []) + + for condition in conditions: + if condition.get("type") == "Failed": + return condition.get("message"), condition.get("reason") + + return None, None + + @staticmethod + def _get_cluster_dashboard_url(cluster_name: str, namespace: str) -> str: + """Get the dashboard URL for a RayCluster.""" + # Try HTTPRoute first (RHOAI v3.0+) + dashboard_url = _get_dashboard_url_from_httproute(cluster_name, namespace) + if dashboard_url: + return dashboard_url + + # Fall back to OpenShift Routes or Ingresses + if _is_openshift_cluster(): + try: + api = client.CustomObjectsApi(get_api_client()) + routes = api.list_namespaced_custom_object( + group="route.openshift.io", + version="v1", + namespace=namespace, + plural="routes" + ) + + for route in routes["items"]: + if route["metadata"]["name"] == f"ray-dashboard-{cluster_name}" or route["metadata"]["name"].startswith(f"{cluster_name}-ingress"): + protocol = "https" if route["spec"].get("tls") else "http" + return f"{protocol}://{route['spec']['host']}" + + except Exception as e: + # Silently fail - routes may not be available + pass + + else: + try: + api = client.NetworkingV1Api(get_api_client()) + ingresses = api.list_namespaced_ingress(namespace) + + for ingress in ingresses.items: + if ingress.metadata.name == f"ray-dashboard-{cluster_name}" or ingress.metadata.name.startswith(f"{cluster_name}-ingress"): + protocol = "https" if ingress.metadata.annotations and "route.openshift.io/termination" in ingress.metadata.annotations else "http" + return f"{protocol}://{ingress.spec.rules[0].host}" + + except Exception as e: + # Silently fail - ingresses may not be available + pass + + return "Dashboard not available yet" + + @staticmethod + def _display_cluster_status_widget(cluster_info: Optional[RayCluster], cluster_name: str = None, namespace: str = None) -> None: + """Display cluster status using ipywidgets.""" + if not cluster_info: + # Cluster not found + output = widgets.HTML( + f'
<div style="padding: 12px; border: 1px solid #e74c3c; border-radius: 4px; color: #e74c3c;">'
+                f'⚠️ No RayCluster found with name "{cluster_name}" in namespace "{namespace}"'
+                '</div>'
+            )
+            display(output)
+            return
+
+        # Create status badge
+        status_colors = {
+            RayClusterStatus.READY: ("#2ecc71", "white"),  # Green
+            RayClusterStatus.STARTING: ("#3498db", "white"),  # Blue
+            RayClusterStatus.FAILED: ("#e74c3c", "white"),  # Red
+            RayClusterStatus.STOPPED: ("#f1c40f", "black"),  # Yellow
+        }
+        status_color = status_colors.get(cluster_info.status, ("#95a5a6", "white"))  # Gray for unknown
+
+        # Build HTML table
+        html = f'''
+        <div style="border: 1px solid #dee2e6; border-radius: 8px; padding: 16px; margin: 8px 0;">
+            <table style="border-collapse: collapse;">
+                <tr>
+                    <td style="font-weight: bold; padding: 4px 8px;">Name:</td>
+                    <td style="padding: 4px 8px;">{cluster_info.name}</td>
+                    <td style="font-weight: bold; padding: 4px 8px;">Status:</td>
+                    <td style="padding: 4px 8px;">
+                        <span style="background-color: {status_color[0]}; color: {status_color[1]}; padding: 2px 8px; border-radius: 4px;">
+                            {cluster_info.status.value}
+                        </span>
+                    </td>
+                </tr>
+                <tr>
+                    <td style="font-weight: bold; padding: 4px 8px;">Namespace:</td>
+                    <td style="padding: 4px 8px;">{cluster_info.namespace}</td>
+                    <td style="font-weight: bold; padding: 4px 8px;">Workers:</td>
+                    <td style="padding: 4px 8px;">{cluster_info.num_workers}</td>
+                </tr>
+                <tr>
+                    <td style="font-weight: bold; padding: 4px 8px;">Head CPU:</td>
+                    <td style="padding: 4px 8px;">{cluster_info.head_cpu_requests}/{cluster_info.head_cpu_limits}</td>
+                    <td style="font-weight: bold; padding: 4px 8px;">Head Memory:</td>
+                    <td style="padding: 4px 8px;">{cluster_info.head_mem_requests}/{cluster_info.head_mem_limits}</td>
+                </tr>
+                <tr>
+                    <td style="font-weight: bold; padding: 4px 8px;">Worker CPU:</td>
+                    <td style="padding: 4px 8px;">{cluster_info.worker_cpu_requests}/{cluster_info.worker_cpu_limits}</td>
+                    <td style="font-weight: bold; padding: 4px 8px;">Worker Memory:</td>
+                    <td style="padding: 4px 8px;">{cluster_info.worker_mem_requests}/{cluster_info.worker_mem_limits}</td>
+                </tr>
+                <tr>
+                    <td style="font-weight: bold; padding: 4px 8px;">Dashboard:</td>
+                    <td style="padding: 4px 8px;" colspan="3">
+                        <a href="{cluster_info.dashboard}" target="_blank">{cluster_info.dashboard}</a>
+                    </td>
+                </tr>
+        '''
+
+        # Add extended resources if any
+        if cluster_info.head_extended_resources:
+            html += '<tr><td style="font-weight: bold; padding: 4px 8px;">Head Resources:</td><td style="padding: 4px 8px;" colspan="3">'
+            for resource, count in cluster_info.head_extended_resources.items():
+                html += f'{resource}: {count}, '
+            html = html.rstrip(', ') + '</td></tr>'
+
+        if cluster_info.worker_extended_resources:
+            html += '<tr><td style="font-weight: bold; padding: 4px 8px;">Worker Resources:</td><td style="padding: 4px 8px;" colspan="3">'
+            for resource, count in cluster_info.worker_extended_resources.items():
+                html += f'{resource}: {count}, '
+            html = html.rstrip(', ') + '</td></tr>'
+
+        # Add Kueue information if available
+        if cluster_info.kueue_workload:
+            html += f'''
+                <tr>
+                    <td style="font-weight: bold; padding: 4px 8px;">Queue:</td>
+                    <td style="padding: 4px 8px;">{cluster_info.local_queue or 'N/A'}</td>
+                    <td style="font-weight: bold; padding: 4px 8px;">Priority:</td>
+                    <td style="padding: 4px 8px;">{cluster_info.kueue_workload.priority}</td>
+                </tr>
+                <tr>
+                    <td style="font-weight: bold; padding: 4px 8px;">Queue Status:</td>
+                    <td style="padding: 4px 8px;">{cluster_info.kueue_workload.status}</td>
+                    <td style="font-weight: bold; padding: 4px 8px;">Admission:</td>
+                    <td style="padding: 4px 8px;">{cluster_info.kueue_workload.admission_time or 'N/A'}</td>
+                </tr>
+            '''
+
+            # Add error information for failed clusters
+            if cluster_info.status == RayClusterStatus.FAILED and cluster_info.kueue_workload.error_message:
+                html += f'''
+                <tr>
+                    <td style="font-weight: bold; padding: 4px 8px; color: #e74c3c;">Error:</td>
+                    <td style="padding: 4px 8px; color: #e74c3c;" colspan="3">
+                        {cluster_info.kueue_workload.error_reason}: {cluster_info.kueue_workload.error_message}
+                    </td>
+                </tr>
+                '''
+
+        html += '''
+            </table>
+        </div>
+ ''' + + # Display the widget + output = widgets.HTML(html) + display(output) + + @staticmethod + def _display_clusters_list_widget(clusters: ListType[RayCluster], page: int, page_size: int, total_pages: int = 1) -> None: + """Display clusters list using ipywidgets.""" + if not clusters: + output = widgets.HTML( + '
<div style="padding: 12px; border: 1px solid #f39c12; border-radius: 4px; color: #856404;">'
+                '⚠️ No RayClusters found'
+                '</div>'
+            )
+            display(output)
+            return
+
+        # Create HTML table
+        html = '''
+        <div style="border: 1px solid #dee2e6; border-radius: 8px; padding: 16px; margin: 8px 0;">
+            <table style="border-collapse: collapse; width: 100%;">
+                <tr style="background-color: #f8f9fa;">
+                    <th style="padding: 6px 8px; text-align: left;">Name</th>
+                    <th style="padding: 6px 8px; text-align: left;">Status</th>
+                    <th style="padding: 6px 8px; text-align: left;">Workers</th>
+                    <th style="padding: 6px 8px; text-align: left;">Queue</th>
+                    <th style="padding: 6px 8px; text-align: left;">Queue Status</th>
+                    <th style="padding: 6px 8px; text-align: left;">Dashboard</th>
+                </tr>
+        '''
+
+        # Status colors
+        status_colors = {
+            RayClusterStatus.READY: ("#2ecc71", "white"),  # Green
+            RayClusterStatus.STARTING: ("#3498db", "white"),  # Blue
+            RayClusterStatus.FAILED: ("#e74c3c", "white"),  # Red
+            RayClusterStatus.STOPPED: ("#f1c40f", "black"),  # Yellow
+        }
+
+        # Add rows
+        for cluster in clusters:
+            status_color = status_colors.get(cluster.status, ("#95a5a6", "white"))  # Gray for unknown
+
+            html += f'''
+                <tr>
+                    <td style="padding: 6px 8px;">{cluster.name}</td>
+                    <td style="padding: 6px 8px;">
+                        <span style="background-color: {status_color[0]}; color: {status_color[1]}; padding: 2px 8px; border-radius: 4px;">
+                            {cluster.status.value}
+                        </span>
+                    </td>
+                    <td style="padding: 6px 8px;">{cluster.num_workers}</td>
+                    <td style="padding: 6px 8px;">{cluster.local_queue or 'N/A'}</td>
+                    <td style="padding: 6px 8px;">{cluster.kueue_workload.status if cluster.kueue_workload else 'N/A'}</td>
+                    <td style="padding: 6px 8px;">
+                        <a href="{cluster.dashboard}" target="_blank">{cluster.dashboard}</a>
+                    </td>
+                </tr>
+            '''
+
+        html += '''
+            </table>
+        '''
+
+        # Add pagination info
+        if total_pages > 1:
+            html += f'''
+            <div style="margin-top: 8px; color: #6c757d;">
+                Page {page} of {total_pages}
+            '''
+            if page > 1:
+                html += f' (use page={page - 1} for previous)'
+            if page < total_pages:
+                html += f' (page={page + 1} for next)'
+            html += '</div>'
+
+        html += '</div>
' + + # Display the widget + output = widgets.HTML(html) + display(output) def __init__(self, config: ClusterConfiguration): """ @@ -951,8 +1452,8 @@ def _ray_cluster_status(name, namespace="default") -> Optional[RayCluster]: def _get_ray_clusters( - namespace="default", filter: Optional[List[RayClusterStatus]] = None -) -> List[RayCluster]: + namespace="default", filter: Optional[ListType[RayClusterStatus]] = None +) -> ListType[RayCluster]: list_of_clusters = [] try: config_check() @@ -971,16 +1472,133 @@ def _get_ray_clusters( for rc in rcs["items"]: ray_cluster = _map_to_ray_cluster(rc) if filter and ray_cluster.status in filter: + # Check for AppWrapper ownership + metadata = rc.get("metadata", {}) + owner_refs = metadata.get("ownerReferences", []) + for owner in owner_refs: + if owner.get("kind") == "AppWrapper": + ray_cluster.is_appwrapper = True + break + + # Fetch Kueue workload info for all clusters + workload_info = _get_cluster_kueue_workload_info_func( + metadata.get("name"), namespace + ) + if workload_info: + ray_cluster.local_queue = workload_info.queue_name + ray_cluster.kueue_workload = workload_info + list_of_clusters.append(ray_cluster) else: for rc in rcs["items"]: - list_of_clusters.append(_map_to_ray_cluster(rc)) + ray_cluster = _map_to_ray_cluster(rc) + # Check for AppWrapper ownership + metadata = rc.get("metadata", {}) + owner_refs = metadata.get("ownerReferences", []) + for owner in owner_refs: + if owner.get("kind") == "AppWrapper": + ray_cluster.is_appwrapper = True + break + + # Fetch Kueue workload info for all clusters + workload_info = _get_cluster_kueue_workload_info_func( + metadata.get("name"), namespace + ) + if workload_info: + ray_cluster.local_queue = workload_info.queue_name + ray_cluster.kueue_workload = workload_info + + list_of_clusters.append(ray_cluster) return list_of_clusters +def _get_cluster_kueue_workload_info_func( + cluster_name: str, namespace: str +) -> Optional[KueueWorkloadInfo]: + """ + Get Kueue workload information for a RayCluster. + + This function looks for Kueue workloads that have a RayCluster owner reference + matching the given cluster name. Works for all RayClusters, whether they're + AppWrapper-managed or directly managed by Kueue. 
+ + Args: + cluster_name: Name of the RayCluster + namespace: Kubernetes namespace + + Returns: + KueueWorkloadInfo if found, None otherwise + """ + try: + api = client.CustomObjectsApi(get_api_client()) + workloads = api.list_namespaced_custom_object( + group="kueue.x-k8s.io", + version="v1beta1", + namespace=namespace, + plural="workloads", + ) + + for workload in workloads.get("items", []): + owner_refs = workload.get("metadata", {}).get("ownerReferences", []) + for owner_ref in owner_refs: + if ( + owner_ref.get("kind") == "RayCluster" + and owner_ref.get("name") == cluster_name + ): + # Found the workload for this RayCluster + metadata = workload.get("metadata", {}) + status = workload.get("status", {}) + spec = workload.get("spec", {}) + admission = status.get("admission", {}) + + # Get queue name from workload spec + queue_name = spec.get("queueName", "") + + # Get status from conditions + conditions = status.get("conditions", []) + workload_status = "Unknown" + for condition in conditions: + if condition.get("status") == "True": + workload_status = condition.get("type", "Unknown") + break + + # Get admission time + admission_time = None + if admission: + admission_time = admission.get("clusterQueue") + + # Get error information if available + error_msg = None + error_reason = None + for condition in conditions: + if condition.get("type") == "Finished" and condition.get( + "status" + ) == "True": + if condition.get("reason") == "Failed": + error_reason = condition.get("reason") + error_msg = condition.get("message") + + return KueueWorkloadInfo( + name=metadata.get("name", "unknown"), + queue_name=queue_name, + status=workload_status, + priority=spec.get("priority"), + creation_time=metadata.get("creationTimestamp"), + admission_time=admission_time, + error_message=error_msg, + error_reason=error_reason, + ) + + return None + + except Exception as e: + # Silently fail if Kueue is not installed or workload info unavailable + return None + + def _get_app_wrappers( - namespace="default", filter=List[AppWrapperStatus] -) -> List[AppWrapper]: + namespace="default", filter=ListType[AppWrapperStatus] +) -> ListType[AppWrapper]: list_of_app_wrappers = [] try: diff --git a/src/codeflare_sdk/ray/cluster/pretty_print.py b/src/codeflare_sdk/ray/cluster/pretty_print.py index faa03258..7315d98b 100644 --- a/src/codeflare_sdk/ray/cluster/pretty_print.py +++ b/src/codeflare_sdk/ray/cluster/pretty_print.py @@ -157,6 +157,23 @@ def print_clusters(clusters: List[RayCluster]): ) # format that is used to generate the name of the service table0.add_row() table0.add_row(f"[link={dashboard} blue underline]Dashboard:link:[/link]") + + # Add Kueue information if available (for any cluster, not just AppWrapper-managed) + if cluster.kueue_workload: + table0.add_row() + table0.add_row("[bold blue]🎯 Kueue Integration[/bold blue]") + table0.add_row(f"[bold]Local Queue:[/bold] {cluster.local_queue or 'N/A'}") + table0.add_row(f"[bold]Workload Status:[/bold] [bold green]{cluster.kueue_workload.status}[/bold green]") + table0.add_row(f"[bold]Workload Name:[/bold] {cluster.kueue_workload.name}") + if cluster.kueue_workload.priority is not None: + table0.add_row(f"[bold]Priority:[/bold] {cluster.kueue_workload.priority}") + if cluster.kueue_workload.admission_time: + table0.add_row(f"[bold]Admitted:[/bold] {cluster.kueue_workload.admission_time}") + if cluster.kueue_workload.error_message: + table0.add_row(f"[bold red]Error:[/bold red] {cluster.kueue_workload.error_reason}: {cluster.kueue_workload.error_message}") + 
if cluster.is_appwrapper: + table0.add_row(f"[bold]AppWrapper Managed:[/bold] [green]Yes[/green]") + table0.add_row("") # empty row for spacing #'table1' to display the worker counts diff --git a/src/codeflare_sdk/ray/cluster/status.py b/src/codeflare_sdk/ray/cluster/status.py index 136ae302..57fa9685 100644 --- a/src/codeflare_sdk/ray/cluster/status.py +++ b/src/codeflare_sdk/ray/cluster/status.py @@ -21,7 +21,10 @@ from dataclasses import dataclass, field from enum import Enum import typing -from typing import Union +from typing import Union, Optional, TYPE_CHECKING + +if TYPE_CHECKING: + from ..rayjobs.status import KueueWorkloadInfo class RayClusterStatus(Enum): @@ -55,6 +58,26 @@ class CodeFlareClusterStatus(Enum): class RayCluster: """ For storing information about a Ray cluster. + + Attributes: + name: Name of the RayCluster + status: Current status of the cluster + head_cpu_requests: CPU requests for head node + head_cpu_limits: CPU limits for head node + head_mem_requests: Memory requests for head node + head_mem_limits: Memory limits for head node + num_workers: Number of worker nodes + worker_mem_requests: Memory requests per worker + worker_mem_limits: Memory limits per worker + worker_cpu_requests: CPU requests per worker + worker_cpu_limits: CPU limits per worker + namespace: Kubernetes namespace + dashboard: Dashboard URL + worker_extended_resources: Extended resources for workers (e.g., GPUs) + head_extended_resources: Extended resources for head node + local_queue: Kueue LocalQueue name (if managed by Kueue) + kueue_workload: Kueue workload information (if managed by Kueue) + is_appwrapper: Whether cluster is managed by AppWrapper """ name: str @@ -72,3 +95,6 @@ class RayCluster: dashboard: str worker_extended_resources: typing.Dict[str, int] = field(default_factory=dict) head_extended_resources: typing.Dict[str, int] = field(default_factory=dict) + local_queue: Optional[str] = None + kueue_workload: Optional["KueueWorkloadInfo"] = None + is_appwrapper: bool = False diff --git a/src/codeflare_sdk/ray/rayjobs/status.py b/src/codeflare_sdk/ray/rayjobs/status.py index 6b44b29c..11f99922 100644 --- a/src/codeflare_sdk/ray/rayjobs/status.py +++ b/src/codeflare_sdk/ray/rayjobs/status.py @@ -50,15 +50,17 @@ class CodeflareRayJobStatus(Enum): @dataclass class KueueWorkloadInfo: """ - For storing information about a Kueue workload associated with a RayJob. + For storing information about a Kueue workload associated with a RayJob or RayCluster. """ name: str - queue_name: str - status: str # e.g., "Pending", "Admitted", "Finished" + queue_name: str = "" # LocalQueue name (optional for RayClusters) + status: str = "Unknown" # e.g., "Pending", "Admitted", "Finished" priority: Optional[int] = None creation_time: Optional[str] = None admission_time: Optional[str] = None + error_message: Optional[str] = None + error_reason: Optional[str] = None @dataclass
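
A quick usage sketch of the inspection APIs these two commits add. The job, cluster, and namespace names below are illustrative, and the Cluster import uses the module path from the diff; adjust both to your environment:

    from codeflare_sdk import RayJob
    from codeflare_sdk.ray.cluster.cluster import Cluster

    # Rich console output (the default); Status() returns a (status, ready) tuple
    # and List() returns the full list of RayJobInfo objects.
    status, ready = RayJob.Status("my-job", namespace="demo")
    jobs = RayJob.List(namespace="demo", page_size=5, page=1)

    # Opt in to Jupyter widget output globally; widget-mode calls render an
    # ipywidgets.HTML table and return None to avoid Jupyter auto-display.
    RayJob.set_widgets_default(True)
    RayJob.Status("my-job", namespace="demo")

    # RayCluster equivalents; pass return_status/return_list to get values back
    # instead of only the printed view.
    cluster_status, cluster_ready = Cluster.Status(
        "my-cluster", namespace="demo", return_status=True
    )
    clusters = Cluster.List(namespace="demo", return_list=True)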