-
Notifications
You must be signed in to change notification settings - Fork 0
Additional progress information for batch job monitoring #78
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
76f76c8
4e26c37
4d3d784
dec18d6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -48,3 +48,6 @@ coverage.xml | |
|
|
||
| # Sphinx documentation | ||
| docs/_build/ | ||
|
|
||
| # Intellij IDEA | ||
| .idea/ | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,2 +1,2 @@ | ||
| 12.2.0 | ||
| - When updating columns in TDR convert to str in case it is an int and substitution does not work | ||
| 12.2.1 | ||
| - Add time remaining display for batch TDR job execution. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,6 +4,8 @@ | |
| import time | ||
| from typing import Callable, Optional, Any | ||
|
|
||
| import requests | ||
|
|
||
| from ..vars import ARG_DEFAULTS | ||
|
|
||
|
|
||
|
|
@@ -75,7 +77,7 @@ class SubmitAndMonitorMultipleJobs: | |
|
|
||
| def __init__( | ||
| self, tdr: Any, | ||
| job_function: Callable, | ||
| job_function: Callable[..., requests.Response], | ||
| job_args_list: list[tuple], | ||
| batch_size: int = ARG_DEFAULTS["batch_size"], # type: ignore[assignment] | ||
| check_interval: int = ARG_DEFAULTS["waiting_time_to_poll"], # type: ignore[assignment] | ||
|
|
@@ -113,18 +115,28 @@ def run(self) -> None: | |
|
|
||
| Failed jobs are collected and printed out at the end of processing. | ||
| """ | ||
| failed_jobs = [] | ||
| failed_jobs: list[str] = [] | ||
| total_jobs = len(self.job_args_list) | ||
| batch_durations: list[Any] = [] | ||
| logging.info(f"Processing {total_jobs} {self.job_function.__name__} jobs in batches of {self.batch_size}") | ||
|
|
||
| # Process jobs in batches | ||
| for i in range(0, total_jobs, self.batch_size): | ||
| job_ids = [] | ||
| current_batch = self.job_args_list[i:i + self.batch_size] | ||
| start_time = time.time() | ||
| batch_num = i // self.batch_size + 1 | ||
| batches_left = (total_jobs + self.batch_size - 1) // self.batch_size - batch_num | ||
| logging.info( | ||
| f"Submitting jobs for batch {i // self.batch_size + 1} with {len(current_batch)} jobs." | ||
| f"Submitting jobs for batch {batch_num} of {batches_left} with {len(current_batch)} jobs " | ||
|
||
| ) | ||
|
|
||
| # Estimate time remaining per batch | ||
| if batch_durations: | ||
| avg_duration = sum(batch_durations) / len(batch_durations) | ||
| est_remaining = avg_duration * batches_left | ||
| logging.info(f"Estimated time remaining: {est_remaining/60:.2f} minutes") | ||
|
|
||
| # Submit jobs for the current batch | ||
| for job_args in current_batch: | ||
| # Submit job with arguments and store the job ID | ||
|
|
@@ -134,7 +146,7 @@ def run(self) -> None: | |
| job_ids.append(job_id) | ||
|
|
||
| # Monitor jobs for the current batch | ||
| logging.info(f"Monitoring {len(current_batch)} jobs in batch {i // self.batch_size + 1}") | ||
| logging.info(f"Monitoring {len(current_batch)} jobs in batch {batch_num}") | ||
| for job_id in job_ids: | ||
| try: | ||
| MonitorTDRJob( | ||
|
|
@@ -147,7 +159,9 @@ def run(self) -> None: | |
| logging.error(f"Job {job_id} failed: {e}") | ||
| failed_jobs.append(job_id) | ||
|
|
||
| logging.info(f"Completed batch {i // self.batch_size + 1} with {len(current_batch)} jobs.") | ||
| batch_durations.append(time.time() - start_time) | ||
|
|
||
| logging.info(f"Completed batch {batch_num} with {len(current_batch)} jobs. ") | ||
|
|
||
| logging.info(f"Successfully processed {total_jobs - len(failed_jobs)} jobs.") | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The type annotation `list[Any]` is too generic. Based on usage at line 162, where `time.time() - start_time` is appended, this should be `list[float]` to accurately represent batch duration values in seconds.