diff --git a/.gitignore b/.gitignore
index 1d8dd28..ed54738 100644
--- a/.gitignore
+++ b/.gitignore
@@ -48,3 +48,6 @@ coverage.xml
 
 # Sphinx documentation
 docs/_build/
+
+# IntelliJ IDEA
+.idea/
\ No newline at end of file
diff --git a/VERSION.txt b/VERSION.txt
index 2e392bb..2e35b8b 100644
--- a/VERSION.txt
+++ b/VERSION.txt
@@ -1,2 +1,2 @@
-12.2.0
-- When updating columns in TDR convert to str in case it is an int and subsitution does not work
+12.2.1
+- Add time remaining display for batch TDR job execution.
diff --git a/ops_utils/tdr_utils/tdr_job_utils.py b/ops_utils/tdr_utils/tdr_job_utils.py
index 149a3df..937b4de 100644
--- a/ops_utils/tdr_utils/tdr_job_utils.py
+++ b/ops_utils/tdr_utils/tdr_job_utils.py
@@ -4,6 +4,8 @@ import time
 
 from typing import Callable, Optional, Any
 
+import requests
+
 from ..vars import ARG_DEFAULTS
 
 
@@ -75,7 +77,7 @@ class SubmitAndMonitorMultipleJobs:
     def __init__(
            self,
            tdr: Any,
-           job_function: Callable,
+           job_function: Callable[..., requests.Response],
            job_args_list: list[tuple],
            batch_size: int = ARG_DEFAULTS["batch_size"],  # type: ignore[assignment]
            check_interval: int = ARG_DEFAULTS["waiting_time_to_poll"],  # type: ignore[assignment]
@@ -113,18 +115,28 @@ def run(self) -> None:
 
         Failed jobs are collected and printed out at the end of processing.
         """
-        failed_jobs = []
+        failed_jobs: list[str] = []
         total_jobs = len(self.job_args_list)
+        batch_durations: list[float] = []
         logging.info(f"Processing {total_jobs} {self.job_function.__name__} jobs in batches of {self.batch_size}")
 
         # Process jobs in batches
         for i in range(0, total_jobs, self.batch_size):
             job_ids = []
             current_batch = self.job_args_list[i:i + self.batch_size]
+            start_time = time.time()
+            batch_num = i // self.batch_size + 1
+            total_batches = (total_jobs + self.batch_size - 1) // self.batch_size
             logging.info(
-                f"Submitting jobs for batch {i // self.batch_size + 1} with {len(current_batch)} jobs."
+                f"Submitting jobs for batch {batch_num} of {total_batches} with {len(current_batch)} jobs."
             )
 
+            # Estimate time remaining based on the average duration of completed batches
+            if batch_durations:
+                avg_duration = sum(batch_durations) / len(batch_durations)
+                est_remaining = avg_duration * (total_batches - batch_num + 1)
+                logging.info(f"Estimated time remaining: {est_remaining / 60:.2f} minutes")
+
             # Submit jobs for the current batch
             for job_args in current_batch:
                 # Submit job with arguments and store the job ID
@@ -134,7 +146,7 @@ def run(self) -> None:
                 job_ids.append(job_id)
 
             # Monitor jobs for the current batch
-            logging.info(f"Monitoring {len(current_batch)} jobs in batch {i // self.batch_size + 1}")
+            logging.info(f"Monitoring {len(current_batch)} jobs in batch {batch_num}")
             for job_id in job_ids:
                 try:
                     MonitorTDRJob(
@@ -147,7 +159,9 @@ def run(self) -> None:
                     logging.error(f"Job {job_id} failed: {e}")
                     failed_jobs.append(job_id)
 
-            logging.info(f"Completed batch {i // self.batch_size + 1} with {len(current_batch)} jobs.")
+            batch_durations.append(time.time() - start_time)
+
+            logging.info(f"Completed batch {batch_num} with {len(current_batch)} jobs.")
 
         logging.info(f"Successfully processed {total_jobs - len(failed_jobs)} jobs.")
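
For reviewers who want the estimation approach in isolation: the change above records the duration of each completed batch and multiplies the running average by the number of batches still to run. Below is a minimal standalone sketch of that logic; the function and variable names are illustrative only and are not part of ops_utils.

import time
from typing import Optional


def estimate_remaining_minutes(batch_durations: list[float], batches_left: int) -> Optional[float]:
    """Return an estimated time remaining in minutes, or None until a batch has finished."""
    if not batch_durations:
        return None
    avg_duration = sum(batch_durations) / len(batch_durations)
    return avg_duration * batches_left / 60


# Simulate three batches; the estimate becomes available from the second batch onward.
durations: list[float] = []
total_batches = 3
for batch_num in range(1, total_batches + 1):
    estimate = estimate_remaining_minutes(durations, total_batches - batch_num + 1)
    if estimate is not None:
        print(f"Estimated time remaining: {estimate:.2f} minutes")
    start = time.time()
    time.sleep(0.1)  # stand-in for submitting and monitoring one batch of jobs
    durations.append(time.time() - start)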