
Commit 5ced985

Merge pull request #67 from broadinstitute/sn_POD-2822_interact_with_dev
Adding ability to interact with dev
2 parents (fb98b93 + 42eb517) · commit 5ced985


7 files changed: +235, -65 lines changed


VERSION.txt

Lines changed: 4 additions & 2 deletions
@@ -1,2 +1,4 @@
-10.6.3
-- gcp cloud function checks for errors
+11.0.0
+- Adding ability to create snapshots
+- Be able to pass in environment variables to terra and TDR
+- Be able to pass in service account json for Token and GCP util

ops_utils/bq_utils.py

Lines changed: 3 additions & 3 deletions
@@ -2,7 +2,7 @@
 import logging
 from google.cloud import bigquery
 from google.api_core.exceptions import Forbidden
-from typing import Optional
+from typing import Optional, Any


 class BigQueryUtil:
@@ -69,7 +69,7 @@ def upload_data_to_table(self, table_id: str, rows: list[dict], delete_existing_
         new_rows = destination_table.num_rows
         logging.info(f"Table now contains {new_rows} rows after upload")

-    def query_table(self, query: str, to_dataframe: bool = False) -> list[dict]:
+    def query_table(self, query: str, to_dataframe: bool = False) -> Any:
         """
         Execute a SQL query on a BigQuery table and returns the results.

@@ -83,7 +83,7 @@ def query_table(self, query: str, to_dataframe: bool = False) -> list[dict]:
         query_job = self.client.query(query)
         if to_dataframe:
             return query_job.result().to_dataframe()
-        return [row for row in query_job.result()]
+        return [dict(row) for row in query_job.result()]

     def check_permissions_to_project(self, raise_on_other_failure: bool = True) -> bool:
         """

ops_utils/gcp_utils.py

Lines changed: 34 additions & 9 deletions
@@ -2,6 +2,7 @@
 import os
 import logging
 import io
+import json
 import hashlib
 import base64
 import subprocess
@@ -10,6 +11,9 @@
 from mimetypes import guess_type
 from typing import Optional, Any
 from google.cloud.storage.blob import Blob
+from google.oauth2 import service_account
+from google.cloud import storage
+from google.auth import default

 from .vars import ARG_DEFAULTS
 from .thread_pool_executor_util import MultiThreadedJobs
@@ -27,21 +31,42 @@
 class GCPCloudFunctions:
     """Class to handle GCP Cloud Functions."""

-    def __init__(self, project: Optional[str] = None) -> None:
+    def __init__(
+        self,
+        project: Optional[str] = None,
+        service_account_json: Optional[str] = None
+    ) -> None:
         """
         Initialize the GCPCloudFunctions class.

-        Authenticates using the default credentials and sets up the Storage Client.
-        Uses the `project_id` if provided, otherwise utilizes the default project set.
+        Authenticates using service account JSON if provided or default credentials,
+        and sets up the Storage Client.

-        **Args:**
-        - project (str, optional): The GCP project ID
-        """
-        from google.cloud import storage  # type: ignore[attr-defined]
-        from google.auth import default
-        credentials, default_project = default()
+        Args:
+            project: Optional[str] = None
+                The GCP project ID. If not provided, will use project from service account or default.
+            service_account_json: Optional[str] = None
+                Path to service account JSON key file. If provided, will use these credentials.
+        """
+        # Initialize credentials and project
+        credentials = None
+        default_project = None
+
+        if service_account_json:
+            credentials = service_account.Credentials.from_service_account_file(service_account_json)
+            # Extract project from service account if not specified
+            if not project:
+                with open(service_account_json, 'r') as f:
+                    sa_info = json.load(f)
+                    project = sa_info.get('project_id')
+        else:
+            # Use default credentials
+            credentials, default_project = default()
+
+        # Set project if not already set
         if not project:
             project = default_project
+
         self.client = storage.Client(credentials=credentials, project=project)
         """@private"""

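A minimal usage sketch of the new constructor paths (not part of the diff; the file paths and project IDs below are placeholders):

    from ops_utils.gcp_utils import GCPCloudFunctions

    # Default credentials (e.g. application-default login); the project falls back to the ADC default.
    gcp_default = GCPCloudFunctions()

    # Service account key file; the project is read from the key's "project_id" field when not passed.
    gcp_sa = GCPCloudFunctions(service_account_json="/path/to/service-account-key.json")

    # Both supplied; the explicit project wins over the one in the key file.
    gcp_both = GCPCloudFunctions(project="my-gcp-project", service_account_json="/path/to/service-account-key.json")
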
ops_utils/tdr_utils/tdr_api_utils.py

Lines changed: 87 additions & 23 deletions
@@ -18,17 +18,24 @@
 class TDR:
     """Class to interact with the Terra Data Repository (TDR) API."""

-    TDR_LINK = "https://data.terra.bio/api/repository/v1"
+    PROD_LINK = "https://data.terra.bio/api/repository/v1"
+    DEV_LINK = "https://jade.datarepo-dev.broadinstitute.org/api/repository/v1"
     """(str): The base URL for the TDR API."""

-    def __init__(self, request_util: RunRequest):
+    def __init__(self, request_util: RunRequest, env: str = 'prod'):
         """
         Initialize the TDR class (A class to interact with the Terra Data Repository (TDR) API).

         **Args:**
         - request_util (`ops_utils.request_util.RunRequest`): Utility for making HTTP requests.
         """
         self.request_util = request_util
+        if env.lower() == 'prod':
+            self.tdr_link = self.PROD_LINK
+        elif env.lower() == 'dev':
+            self.tdr_link = self.DEV_LINK
+        else:
+            raise RuntimeError(f"Unsupported environment: {env}. Must be 'prod' or 'dev'.")
         """@private"""

     @staticmethod
@@ -89,7 +96,7 @@ def get_dataset_files(
         **Returns:**
         - list[dict]: A list of dictionaries containing the metadata of the files in the dataset.
         """
-        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/files"
+        uri = f"{self.tdr_link}/datasets/{dataset_id}/files"
         logging.info(f"Getting all files in dataset {dataset_id}")
         return self._get_response_from_batched_endpoint(uri=uri, limit=limit)

@@ -153,12 +160,12 @@ def get_sas_token(self, snapshot_id: str = "", dataset_id: str = "") -> dict:
         - ValueError: If neither `snapshot_id` nor `dataset_id` is provided.
         """
         if snapshot_id:
-            uri = f"{self.TDR_LINK}/snapshots/{snapshot_id}?include=ACCESS_INFORMATION"
+            uri = f"{self.tdr_link}/snapshots/{snapshot_id}?include=ACCESS_INFORMATION"
             response = self.request_util.run_request(uri=uri, method=GET)
             snapshot_info = json.loads(response.text)
             sas_token = snapshot_info["accessInformation"]["parquet"]["sasToken"]
         elif dataset_id:
-            uri = f"{self.TDR_LINK}/datasets/{dataset_id}?include=ACCESS_INFORMATION"
+            uri = f"{self.tdr_link}/datasets/{dataset_id}?include=ACCESS_INFORMATION"
             response = self.request_util.run_request(uri=uri, method=GET)
             snapshot_info = json.loads(response.text)
             sas_token = snapshot_info["accessInformation"]["parquet"]["sasToken"]
@@ -182,7 +189,7 @@ def delete_file(self, file_id: str, dataset_id: str) -> requests.Response:
         **Returns:**
         - requests.Response: The response from the request.
         """
-        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/files/{file_id}"
+        uri = f"{self.tdr_link}/datasets/{dataset_id}/files/{file_id}"
         logging.info(f"Submitting delete job for file {file_id}")
         return self.request_util.run_request(uri=uri, method=DELETE)

@@ -226,7 +233,7 @@ def add_user_to_dataset(self, dataset_id: str, user: str, policy: str) -> reques
         - ValueError: If the policy is not valid.
         """
         self._check_policy(policy)
-        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/policies/{policy}/members"
+        uri = f"{self.tdr_link}/datasets/{dataset_id}/policies/{policy}/members"
         member_dict = {"email": user}
         logging.info(f"Adding user {user} to dataset {dataset_id} with policy {policy}")
         return self.request_util.run_request(
@@ -253,7 +260,7 @@ def remove_user_from_dataset(self, dataset_id: str, user: str, policy: str) -> r
         - ValueError: If the policy is not valid.
         """
         self._check_policy(policy)
-        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/policies/{policy}/members/{user}"
+        uri = f"{self.tdr_link}/datasets/{dataset_id}/policies/{policy}/members/{user}"
         logging.info(f"Removing user {user} from dataset {dataset_id} with policy {policy}")
         return self.request_util.run_request(uri=uri, method=DELETE)

@@ -264,7 +271,7 @@ def delete_dataset(self, dataset_id: str) -> None:
         **Args:**
             dataset_id (str): The ID of the dataset to be deleted.
         """
-        uri = f"{self.TDR_LINK}/datasets/{dataset_id}"
+        uri = f"{self.tdr_link}/datasets/{dataset_id}"
         logging.info(f"Deleting dataset {dataset_id}")
         response = self.request_util.run_request(uri=uri, method=DELETE)
         job_id = response.json()['id']
@@ -308,7 +315,7 @@ def get_snapshot_info(
             include_string = '&include='.join(info_to_include)
         else:
             include_string = ""
-        uri = f"{self.TDR_LINK}/snapshots/{snapshot_id}?include={include_string}"
+        uri = f"{self.tdr_link}/snapshots/{snapshot_id}?include={include_string}"
         response = self.request_util.run_request(
             uri=uri,
             method=GET,
@@ -356,7 +363,7 @@ def delete_snapshot(self, snapshot_id: str) -> requests.Response:
         **Returns:**
         - requests.Response: The response from the request.
         """
-        uri = f"{self.TDR_LINK}/snapshots/{snapshot_id}"
+        uri = f"{self.tdr_link}/snapshots/{snapshot_id}"
         logging.info(f"Deleting snapshot {snapshot_id}")
         return self.request_util.run_request(uri=uri, method=DELETE)

@@ -383,7 +390,7 @@ def _yield_existing_datasets(
         log_message = f"Searching for all datasets in batches of {batch_size}"
         logging.info(log_message)
         while True:
-            uri = f"{self.TDR_LINK}/datasets?offset={offset}&limit={batch_size}&sort=created_date&direction={direction}{filter_str}"  # noqa: E501
+            uri = f"{self.tdr_link}/datasets?offset={offset}&limit={batch_size}&sort=created_date&direction={direction}{filter_str}"  # noqa: E501
             response = self.request_util.run_request(uri=uri, method=GET)
             datasets = response.json()["items"]
             if not datasets:
@@ -459,7 +466,7 @@ def get_dataset_info(self, dataset_id: str, info_to_include: Optional[list[str]]
             include_string = '&include='.join(info_to_include)
         else:
             include_string = ""
-        uri = f"{self.TDR_LINK}/datasets/{dataset_id}?include={include_string}"
+        uri = f"{self.tdr_link}/datasets/{dataset_id}?include={include_string}"
         return self.request_util.run_request(uri=uri, method=GET)

     def get_table_schema_info(
@@ -497,7 +504,7 @@ def get_job_result(self, job_id: str, expect_failure: bool = False) -> requests.
         **Returns:**
         - requests.Response: The response from the request.
         """
-        uri = f"{self.TDR_LINK}/jobs/{job_id}/result"
+        uri = f"{self.tdr_link}/jobs/{job_id}/result"
         # If job is expected to fail, accept any return code
         acceptable_return_code = list(range(100, 600)) if expect_failure else []
         return self.request_util.run_request(uri=uri, method=GET, accept_return_codes=acceptable_return_code)
@@ -513,7 +520,7 @@ def ingest_to_dataset(self, dataset_id: str, data: dict) -> requests.Response:
         **Returns:**
         - requests.Response: The response from the request.
         """
-        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/ingest"
+        uri = f"{self.tdr_link}/datasets/{dataset_id}/ingest"
         logging.info(
             "If recently added TDR SA to source bucket/dataset/workspace and you receive a 400/403 error, " +
             "it can sometimes take up to 12/24 hours for permissions to propagate. Try rerunning the script later.")
@@ -543,7 +550,7 @@ def file_ingest_to_dataset(
         **Returns:**
         - dict: A dictionary containing the response from the ingest operation job monitoring.
         """
-        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/files/bulk/array"
+        uri = f"{self.tdr_link}/datasets/{dataset_id}/files/bulk/array"
         data = {
             "profileId": profile_id,
             "loadTag": f"{load_tag}",
@@ -601,7 +608,7 @@ def _yield_dataset_metrics(self, dataset_id: str, target_table_name: str, query_
             "limit": query_limit,
             "sort": "datarepo_row_id"
         }
-        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/data/{target_table_name}"
+        uri = f"{self.tdr_link}/datasets/{dataset_id}/data/{target_table_name}"
         while True:
             batch_number = int((search_request["offset"] / query_limit)) + 1  # type: ignore[operator]
             response = self.request_util.run_request(
@@ -645,7 +652,7 @@ def get_job_status(self, job_id: str) -> requests.Response:
         **Returns:**
        - requests.Response: The response from the request.
         """
-        uri = f"{self.TDR_LINK}/jobs/{job_id}"
+        uri = f"{self.tdr_link}/jobs/{job_id}"
         return self.request_util.run_request(uri=uri, method=GET)

     def get_dataset_file_uuids_from_metadata(self, dataset_id: str) -> list[str]:
@@ -707,7 +714,7 @@ def soft_delete_entries(
             logging.info(f"No records found to soft delete in table {table_name}")
             return None
         logging.info(f"Soft deleting {len(datarepo_row_ids)} records from table {table_name}")
-        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/deletes"
+        uri = f"{self.tdr_link}/datasets/{dataset_id}/deletes"
         payload = {
             "deleteType": "soft",
             "specType": "jsonArray",
@@ -766,6 +773,7 @@ def get_or_create_dataset(
         billing_profile: str,
         schema: dict,
         description: str,
+        relationships: Optional[list[dict]] = None,
         delete_existing: bool = False,
         continue_if_exists: bool = False,
         additional_properties_dict: Optional[dict] = None
@@ -778,6 +786,8 @@ def get_or_create_dataset(
         - billing_profile (str): The billing profile ID.
         - schema (dict): The schema of the dataset.
         - description (str): The description of the dataset.
+        - relationships (Optional[list[dict]], optional): A list of relationships to add to the dataset schema.
+            Defaults to None.
         - additional_properties_dict (Optional[dict], optional): Additional properties
             for the dataset. Defaults to None.
         - delete_existing (bool, optional): Whether to delete the existing dataset if found.
@@ -857,7 +867,7 @@ def create_dataset(  # type: ignore[return]
             CreateDatasetSchema(**dataset_properties)  # type: ignore[arg-type]
         except ValidationError as e:
             raise ValueError(f"Schema validation error: {e}")
-        uri = f"{self.TDR_LINK}/datasets"
+        uri = f"{self.tdr_link}/datasets"
         logging.info(f"Creating dataset {dataset_name} under billing profile {profile_id}")
         response = self.request_util.run_request(
             method=POST,
@@ -895,7 +905,7 @@ def update_dataset_schema(  # type: ignore[return]
         **Raises:**
         - ValueError: If the schema validation fails.
         """
-        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/updateSchema"
+        uri = f"{self.tdr_link}/datasets/{dataset_id}/updateSchema"
         request_body: dict = {"description": f"{update_note}", "changes": {}}
         if tables_to_add:
             request_body["changes"]["addTables"] = tables_to_add
@@ -968,7 +978,7 @@ def get_files_from_snapshot(self, snapshot_id: str, limit: int = 1000) -> list[d
         **Returns:**
         - list[dict]: A list of dictionaries containing the metadata of the files in the snapshot.
         """
-        uri = f"{self.TDR_LINK}/snapshots/{snapshot_id}/files"
+        uri = f"{self.tdr_link}/snapshots/{snapshot_id}/files"
        return self._get_response_from_batched_endpoint(uri=uri, limit=limit)

     def get_dataset_snapshots(self, dataset_id: str) -> requests.Response:
@@ -981,12 +991,66 @@ def get_dataset_snapshots(self, dataset_id: str) -> requests.Response:
         **Returns:**
         - requests.Response: The response from the request.
         """
-        uri = f"{self.TDR_LINK}/snapshots?datasetIds={dataset_id}"
+        uri = f"{self.tdr_link}/snapshots?datasetIds={dataset_id}"
         return self.request_util.run_request(
             uri=uri,
             method=GET
         )

+    def create_snapshot(
+        self,
+        snapshot_name: str,
+        description: str,
+        dataset_name: str,
+        snapshot_mode: str,  # byFullView is entire dataset
+        profile_id: str,
+        stewards: Optional[list[str]] = [],
+        readers: Optional[list[str]] = [],
+        consent_code: Optional[str] = None,
+        duos_id: Optional[str] = None,
+        data_access_control_groups: Optional[list[str]] = None,
+    ) -> None:
+        """
+        Create a snapshot in TDR.
+
+        **Returns:**
+        - requests.Response: The response from the request.
+        """
+        uri = f"{self.tdr_link}/snapshots"
+        payload = {
+            "name": snapshot_name,
+            "description": description,
+            "contents": [
+                {
+                    "datasetName": dataset_name,
+                    "mode": snapshot_mode,
+                }
+            ],
+            "policies": {
+                "stewards": stewards,
+                "readers": readers,
+            },
+            "profileId": profile_id,
+            "globalFileIds": True,
+        }
+        if consent_code:
+            payload["consentCode"] = consent_code
+        if duos_id:
+            payload["duosId"] = duos_id
+        if data_access_control_groups:
+            payload["dataAccessControlGroups"] = data_access_control_groups
+        logging.info(f"Creating snapshot {snapshot_name} in dataset {dataset_name}")
+        response = self.request_util.run_request(
+            uri=uri,
+            method=POST,
+            content_type="application/json",
+            data=json.dumps(payload)
+        )
+        job_id = response.json()["id"]
+        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
+        snapshot_id = job_results["id"]  # type: ignore[index]
+        logging.info(f"Successfully created snapshot {snapshot_name} - {snapshot_id}")
+

 class FilterOutSampleIdsAlreadyInDataset:
     """Class to filter ingest metrics to remove sample IDs that already exist in the dataset."""
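
A minimal usage sketch of the new environment switch and create_snapshot method (not part of the diff; the Token/RunRequest construction below is an assumption, only RunRequest is referenced in this file):

    from ops_utils.token_util import Token  # module path assumed
    from ops_utils.request_util import RunRequest
    from ops_utils.tdr_utils.tdr_api_utils import TDR

    # Constructor arguments for Token/RunRequest are assumptions; they are unchanged by this commit.
    request_util = RunRequest(token=Token())

    # env="prod" (the default) keeps the old data.terra.bio behavior; env="dev" targets the
    # jade.datarepo-dev instance added above. Any other value raises RuntimeError.
    tdr = TDR(request_util=request_util, env="dev")

    # Create a full-view snapshot of an existing dataset. The method submits the job, waits on it
    # via MonitorTDRJob, and logs the new snapshot ID; it returns None.
    tdr.create_snapshot(
        snapshot_name="my_dataset_snapshot_v1",
        description="Full snapshot of my_dataset",
        dataset_name="my_dataset",
        snapshot_mode="byFullView",  # per the inline comment, byFullView snapshots the entire dataset
        profile_id="00000000-0000-0000-0000-000000000000",  # placeholder billing profile ID
        stewards=["steward-group@firecloud.org"],  # placeholder policy groups
        readers=["reader-group@firecloud.org"],
    )
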
0 commit comments
