
Commit 8a0320f

New API and a few enhancements (#77)
- Added `delete_files_and_snapshots()`
- Modify SubmitAndMonitorMultipleJobs to collect job failures and report instead of exiting early
- Added the `dry_run` property to `TDR`
1 parent 335483e commit 8a0320f

File tree: 3 files changed, +80 -18 lines changed

VERSION.txt

Lines changed: 4 additions & 2 deletions
@@ -1,2 +1,4 @@
-11.6.0
-- Adding methods to get more specific workflow details in workspace
+11.7.0
+- Added delete_files_and_snapshots()
+- Modify SubmitAndMonitorMultipleJobs to collect job failures and report instead of exiting early
+- Added dry_run property to TDR

ops_utils/tdr_utils/tdr_api_utils.py

Lines changed: 58 additions & 9 deletions
@@ -20,14 +20,16 @@ class TDR:
     DEV_LINK = "https://jade.datarepo-dev.broadinstitute.org/api/repository/v1"
     """(str): The base URL for the TDR API."""

-    def __init__(self, request_util: RunRequest, env: str = 'prod'):
+    def __init__(self, request_util: RunRequest, env: str = 'prod', dry_run: bool = False):
         """
         Initialize the TDR class (A class to interact with the Terra Data Repository (TDR) API).

         **Args:**
         - request_util (`ops_utils.request_util.RunRequest`): Utility for making HTTP requests.
         """
         self.request_util = request_util
+        # NOTE: dry_run is not fully implemented in this class, only in delete_files_and_snapshots
+        self.dry_run = dry_run
         if env.lower() == 'prod':
             self.tdr_link = self.PROD_LINK
         elif env.lower() == 'dev':
@@ -180,6 +182,47 @@ def delete_files(
             check_interval=check_interval
         ).run()

+    def _delete_snapshots_for_files(self, dataset_id: str, file_ids: set[str]) -> None:
+        """Delete snapshots that reference any of the provided file IDs."""
+        snapshots_resp = self.get_dataset_snapshots(dataset_id=dataset_id)
+        snapshot_items = snapshots_resp.json().get('items', [])
+        snapshots_to_delete = []
+        logging.info(
+            "Checking %d snapshots for references",
+            len(snapshot_items),
+        )
+        for snap in snapshot_items:
+            snap_id = snap.get('id')
+            if not snap_id:
+                continue
+            snap_files = self.get_files_from_snapshot(snapshot_id=snap_id)
+            snap_file_ids = {
+                fd.get('fileId') for fd in snap_files if fd.get('fileId')
+            }
+            # Use set intersection to check for any matching file IDs
+            if snap_file_ids & file_ids:
+                snapshots_to_delete.append(snap_id)
+        if snapshots_to_delete:
+            self.delete_snapshots(snapshot_ids=snapshots_to_delete)
+        else:
+            logging.info("No snapshots reference the provided file ids")
+
+    def _dry_run_msg(self) -> str:
+        return '[Dry run] ' if self.dry_run else ''
+
+    def delete_files_and_snapshots(self, dataset_id: str, file_ids: set[str]) -> None:
+        """Delete files from a dataset by their IDs, handling snapshots."""
+        self._delete_snapshots_for_files(dataset_id=dataset_id, file_ids=file_ids)
+
+        logging.info(
+            f"{self._dry_run_msg()}Submitting delete request for {len(file_ids)} files in "
+            f"dataset {dataset_id}")
+        if not self.dry_run:
+            self.delete_files(
+                file_ids=list(file_ids),
+                dataset_id=dataset_id
+            )
+
     def add_user_to_dataset(self, dataset_id: str, user: str, policy: str) -> requests.Response:
         """
         Add a user to a dataset with a specified policy.
@@ -322,14 +365,16 @@ def delete_snapshots(
         - check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `10`.
         - verbose (bool, optional): Whether to log detailed information about each job. Defaults to `False`.
         """
-        SubmitAndMonitorMultipleJobs(
-            tdr=self,
-            job_function=self.delete_snapshot,
-            job_args_list=[(snapshot_id,) for snapshot_id in snapshot_ids],
-            batch_size=batch_size,
-            check_interval=check_interval,
-            verbose=verbose
-        ).run()
+        logging.info(f"{self._dry_run_msg()}Deleting {len(snapshot_ids)} snapshots")
+        if not self.dry_run:
+            SubmitAndMonitorMultipleJobs(
+                tdr=self,
+                job_function=self.delete_snapshot,
+                job_args_list=[(snapshot_id,) for snapshot_id in snapshot_ids],
+                batch_size=batch_size,
+                check_interval=check_interval,
+                verbose=verbose
+            ).run()

     def delete_snapshot(self, snapshot_id: str) -> requests.Response:
         """
@@ -937,6 +982,10 @@ def _get_response_from_batched_endpoint(self, uri: str, limit: int = 1000) -> li
                 break

            metadata.extend(response_json)
+            if len(response_json) < limit:
+                logging.info(f"Retrieved final batch of results, found {len(metadata)} total records")
+                break
+
            # Increment the offset by limit for the next page
            offset += limit
            batch += 1
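
For context, here is a minimal usage sketch of the new delete_files_and_snapshots() API and the dry_run flag added in this commit. It assumes an already-configured ops_utils.request_util.RunRequest; the helper function names are illustrative only.

from ops_utils.request_util import RunRequest
from ops_utils.tdr_utils.tdr_api_utils import TDR


def preview_file_deletion(request_util: RunRequest, dataset_id: str, file_ids: set[str]) -> None:
    """Dry-run the new API: snapshots referencing the files are looked up and the
    intended deletions are logged with a '[Dry run]' prefix, but no delete jobs
    are actually submitted."""
    tdr = TDR(request_util=request_util, env="prod", dry_run=True)
    tdr.delete_files_and_snapshots(dataset_id=dataset_id, file_ids=file_ids)


def delete_files_for_real(request_util: RunRequest, dataset_id: str, file_ids: set[str]) -> None:
    """With dry_run left at its default (False), snapshots referencing the files
    are deleted first and then the file delete job is submitted."""
    tdr = TDR(request_util=request_util, env="prod")
    tdr.delete_files_and_snapshots(dataset_id=dataset_id, file_ids=file_ids)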

ops_utils/tdr_utils/tdr_job_utils.py

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,10 @@ def run(self) -> None:
110110
Run the process to submit and monitor multiple jobs in batches.
111111
112112
Logs the progress and status of each batch and job.
113+
114+
Failed jobs are collected and printed out at the end of processing.
113115
"""
116+
failed_jobs = []
114117
total_jobs = len(self.job_args_list)
115118
logging.info(f"Processing {total_jobs} {self.job_function.__name__} jobs in batches of {self.batch_size}")
116119

@@ -133,13 +136,21 @@ def run(self) -> None:
133136
# Monitor jobs for the current batch
134137
logging.info(f"Monitoring {len(current_batch)} jobs in batch {i // self.batch_size + 1}")
135138
for job_id in job_ids:
136-
MonitorTDRJob(
137-
tdr=self.tdr,
138-
job_id=job_id,
139-
check_interval=self.check_interval,
140-
return_json=False
141-
).run()
139+
try:
140+
MonitorTDRJob(
141+
tdr=self.tdr,
142+
job_id=job_id,
143+
check_interval=self.check_interval,
144+
return_json=False
145+
).run()
146+
except Exception as e:
147+
logging.error(f"Job {job_id} failed: {e}")
148+
failed_jobs.append(job_id)
142149

143150
logging.info(f"Completed batch {i // self.batch_size + 1} with {len(current_batch)} jobs.")
144151

145-
logging.info(f"Successfully processed {total_jobs} jobs.")
152+
logging.info(f"Successfully processed {total_jobs - len(failed_jobs)} jobs.")
153+
154+
if len(failed_jobs) > 0:
155+
raise Exception(
156+
f"The following job IDs failed: {', '.join(failed_jobs)}")
