diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index 3c39f37e..a5cdb39f 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -141,5 +141,5 @@ jobs:
       - name: publish to PyPI
         env:
           TWINE_USERNAME: __token__
-          TWINE_PASSWORD: ${{ secrets.PYPI_API_TOKEN }}
+          TWINE_PASSWORD: ${{ secrets.PYPI_API_FUSIONDEVS }}
         run: uv run twine upload dist/*
diff --git a/py_src/fusion/__init__.py b/py_src/fusion/__init__.py
index 8920fa66..fe0c5b67 100644
--- a/py_src/fusion/__init__.py
+++ b/py_src/fusion/__init__.py
@@ -2,7 +2,7 @@
 
 __author__ = """Fusion Devs"""
 __email__ = "fusion_developers@jpmorgan.com"
-__version__ = "3.0.1"
+__version__ = "3.0.2-dev0"
 
 from fusion.credentials import FusionCredentials
 from fusion.fs_sync import fsync
diff --git a/py_src/fusion/attributes.py b/py_src/fusion/attributes.py
index e75eb2b6..d8074e1d 100644
--- a/py_src/fusion/attributes.py
+++ b/py_src/fusion/attributes.py
@@ -13,6 +13,8 @@
     CamelCaseMeta,
     camel_to_snake,
     convert_date_format,
+    ensure_resources,
+    handle_paginated_request,
     make_bool,
     requests_raise_for_status,
     snake_to_camel,
@@ -796,11 +798,11 @@ def from_catalog(self, dataset: str, catalog: str | None = None, client: Fusion
         client = self._use_client(client)
         catalog = client._use_catalog(catalog)
         url = f"{client.root_url}catalogs/{catalog}/datasets/{dataset}/attributes"
-        response = client.session.get(url)
-        requests_raise_for_status(response)
-        list_attributes = response.json()["resources"]
+        response = handle_paginated_request(client.session, url)
+        ensure_resources(response)
+        list_attributes = response["resources"]
         list_attributes = sorted(list_attributes, key=lambda x: x["index"])
         self.attributes = [Attribute._from_dict(attr_data) for attr_data in list_attributes]
         return self
diff --git a/py_src/fusion/credentials.py b/py_src/fusion/credentials.py
index 03bcb48c..1d1ecaf3 100644
--- a/py_src/fusion/credentials.py
+++ b/py_src/fusion/credentials.py
@@ -13,8 +13,11 @@
 
 from fusion.exceptions import CredentialError
 
+from . import __version__
+
 _DEFAULT_GRANT_TYPE = "client_credentials"
 _DEFAULT_AUTH_URL = "https://authe.jpmorgan.com/as/token.oauth2"
+VERSION = __version__
 
 
 def _now_ts() -> int:
@@ -400,7 +403,7 @@ def refresh_bearer_token(self) -> None:
         else:
             raise ValueError("Unrecognized grant type")
 
-        resp = sess.post(auth_url, data=payload, headers={"User-Agent": "fusion-python-sdk"})
+        resp = sess.post(auth_url, data=payload, headers={"User-Agent": f"fusion-python-sdk {VERSION}"})
         try:
             resp.raise_for_status()
         except requests.HTTPError as e:  # noqa: TRY003
@@ -437,7 +440,7 @@ def get_fusion_token_headers(self, url: str) -> dict[str, str]:  # noqa: PLR0912
         if not self.bearer_token:
             raise ValueError("No bearer token set (Error Code: 400)")
 
-        headers: dict[str, str] = {"User-Agent": "fusion-python-sdk"}
+        headers: dict[str, str] = {"User-Agent": f"fusion-python-sdk {VERSION}"}
 
         if self.fusion_e2e:
             headers["fusion-e2e"] = self.fusion_e2e
@@ -468,7 +471,7 @@ def get_fusion_token_headers(self, url: str) -> dict[str, str]:  # noqa: PLR0912
             fusion_auth_url,
             headers={
                 "Authorization": f"Bearer {self.bearer_token.token}",
-                "User-Agent": "fusion-python-sdk",
+                "User-Agent": f"fusion-python-sdk {VERSION}",
             },
         )
         try:
diff --git a/py_src/fusion/dataflow.py b/py_src/fusion/dataflow.py
index 8eed58e6..59be1bf7 100644
--- a/py_src/fusion/dataflow.py
+++ b/py_src/fusion/dataflow.py
@@ -28,95 +28,102 @@ class Dataflow(metaclass=CamelCaseMeta):
     """Fusion Dataflow class for managing dataflow metadata in the Fusion system.
 
     Attributes:
-        providerNode (dict[str, str] | None):
-            Defines the provider/source node of the data flow.
-            Required when creating a data flow.
-            Expected keys:
-            - ``name`` (str): Provider node name.
-            - ``dataNodeType`` (str): Provider node type.
-
-        consumerNode (dict[str, str] | None):
-            Defines the consumer/target node of the data flow.
-            Required when creating a data flow and must be distinct from the provider node.
-            Expected keys:
-            - ``name`` (str): Consumer node name.
-            - ``dataNodeType`` (str): Consumer node type.
+        provider_node (dict[str, str] | None):
+            Provider node of the dataflow. It must be distinct from the consumer node.
+            Required for ``create()``. Keys: ``name``, ``type``.
+
+        consumer_node (dict[str, str] | None):
+            Consumer node of the dataflow. It must be distinct from the provider node.
+            Required for ``create()``. Keys: ``name``, ``type``.
 
         description (str | None, optional):
-            Purpose/summary of the data flow. If this field is present, it must not be blank.
-            (API range reference: length 1..65535). Defaults to ``None``.
+            Specifies the purpose of the data movement.
 
         id (str | None, optional):
-            Server-assigned unique identifier of the data flow. Must be set on the object for
-            ``update()``, ``update_fields()``, and ``delete()``. Defaults to ``None``.
-
-        alternativeId (dict[str, Any] | None, optional):
-            Alternative/secondary identifier object for the data flow. Based on the API schema,
-            this may include:
-            - ``value`` (str): Up to 255 characters.
-            - ``domain`` (str): A domain string.
-            - ``isSor`` (bool): Whether this is a System-of-Record id.
-            Defaults to ``None``.
-
-        transportType (str | None, optional):
-            Transport type of the data flow. API allows values like
-            ``"SYNCHRONOUS MESSAGING"``, ``"FILE TRANSFER"``, ``"API"``, ``"ASYNCHRONOUS MESSAGING"``.
-            Defaults to ``None``.
+            Server-assigned identifier. Must be set for ``update()``, ``update_fields()``, and ``delete()``.
+
+        transport_type (str | None, optional):
+            Transport type of the data flow (e.g., ``"API"``, ``"FILE TRANSFER"``).
 
         frequency (str | None, optional):
-            Frequency of the data flow. API allows values such as
-            ``"BI-WEEKLY"``, ``"WEEKLY"``, ``"SEMI-ANNUALLY"``, ``"QUARTERLY"``, ``"ANNUALLY"``,
-            ``"DAILY"``, ``"ADHOC"``, ``"INTRA-DAY"``, ``"MONTHLY"``, ``"TWICE-WEEKLY"``, ``"BI-MONTHLY"``.
-            Defaults to ``None``.
+            Frequency of the data flow (e.g., ``"DAILY"``, ``"MONTHLY"``).
 
-        startTime (str | None, optional):
-            Scheduled start time (ISO 8601 / time-of-day formats like ``HH:mm:ss`` or ``HH:mm:ssZ``).
-            Defaults to ``None``.
+        start_time (str | None, optional):
+            Scheduled start time of the dataflow.
 
-        endTime (str | None, optional):
-            Scheduled end time (ISO 8601 / time-of-day formats like ``HH:mm:ss`` or ``HH:mm:ssZ``).
-            Defaults to ``None``.
+        end_time (str | None, optional):
+            Scheduled end time of the dataflow.
 
-        dataAssets (list[dict[str, Any]], optional):
-            List of data asset objects involved in the data flow (up to API-defined limits). Defaults to empty list.
+        source_system (dict[str, Any] | None, optional):
+            Source system of the data flow.
 
-        boundarySets (list[dict[str, Any]], optional):
-            Boundary set objects for the data flow; items are stored as provided. Defaults to empty list.
-    """
+        datasets (list[dict[str, Any]], optional):
+            List of datasets involved in the data flow; each requires a visibility license.
+            The maximum is 100 datasets per dataflow, and an error is raised if the list
+            contains duplicate entries. Defaults to an empty list.
+
+        connection_type (str | None, optional):
+            Connection type of the dataflow. Required for ``create()``.
+
+        _client (Fusion | None):
+            Fusion client (injected automatically).
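+
+    Examples:
+        A minimal construction sketch (node names and types are illustrative):
+
+        >>> from fusion import Fusion
+        >>> fusion = Fusion()
+        >>> flow = fusion.dataflow(
+        ...     provider_node={"name": "CRM_DB", "type": "Database"},
+        ...     consumer_node={"name": "DWH", "type": "Database"},
+        ...     connection_type="Consumes From",
+        ... )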
+    """
+
-    # Required at create-time (optional at init-time for handles)
-    providerNode: dict[str, str] | None = None
-    consumerNode: dict[str, str] | None = None
-
-    # Optional fields
+    provider_node: dict[str, str] | None = None
+    consumer_node: dict[str, str] | None = None
     description: str | None = None
     id: str | None = None
-    alternativeId: dict[str, Any] | None = None
-    transportType: str | None = None
+    transport_type: str | None = None
     frequency: str | None = None
-    startTime: str | None = None
-    endTime: str | None = None
-    boundarySets: list[dict[str, Any]] = field(default_factory=list)
-    dataAssets: list[dict[str, Any]] = field(default_factory=list)
+    start_time: str | None = None
+    end_time: str | None = None
+    source_system: dict[str, Any] | None = None
+    datasets: list[dict[str, Any]] = field(default_factory=list)
+    connection_type: str | None = None
 
     _client: Fusion | None = field(init=False, repr=False, compare=False, default=None)
 
     def __post_init__(self) -> None:
-        """Normalize description immediately after initialization."""
-        self.description = tidy_string(self.description or "")
+        """Normalize key fields after initialization."""
+        self.description = tidy_string(self.description) if self.description is not None else None
+        self.id = tidy_string(self.id) if self.id is not None else None
+        self.transport_type = tidy_string(self.transport_type) if self.transport_type is not None else None
+        self.frequency = tidy_string(self.frequency) if self.frequency is not None else None
+        self.start_time = tidy_string(self.start_time) if self.start_time is not None else None
+        self.end_time = tidy_string(self.end_time) if self.end_time is not None else None
+        self.connection_type = tidy_string(self.connection_type) if self.connection_type is not None else None
+
+        if isinstance(self.provider_node, dict):
+            if isinstance(self.provider_node.get("name"), str):
+                self.provider_node["name"] = tidy_string(self.provider_node["name"])
+            if isinstance(self.provider_node.get("type"), str):
+                self.provider_node["type"] = tidy_string(self.provider_node["type"])
+
+        if isinstance(self.consumer_node, dict):
+            if isinstance(self.consumer_node.get("name"), str):
+                self.consumer_node["name"] = tidy_string(self.consumer_node["name"])
+            if isinstance(self.consumer_node.get("type"), str):
+                self.consumer_node["type"] = tidy_string(self.consumer_node["type"])
+
+        if self.datasets is None:
+            self.datasets = []
+        elif not isinstance(self.datasets, list):
+            self.datasets = [self.datasets]
 
     def __getattr__(self, name: str) -> Any:
-        """Allow camelCase access for snake_case attributes."""
-        snake_name = camel_to_snake(name)
-        return self.__dict__.get(snake_name, None)
+        """Allow camelCase access to snake_case attributes."""
+        snake = camel_to_snake(name)
+        if snake in self.__dict__:
+            return self.__dict__[snake]
+        raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'")
 
     def __setattr__(self, name: str, value: Any) -> None:
         """Allow camelCase assignment to snake_case attributes."""
         if name == "client":
             object.__setattr__(self, name, value)
         else:
-            snake_name = camel_to_snake(name)
-            self.__dict__[snake_name] = value
+            snake = camel_to_snake(name)
+            self.__dict__[snake] = value
 
     @property
     def client(self) -> Fusion | None:
@@ -127,80 +134,45 @@ def client(self) -> Fusion | None:
         """Set the client for the Dataflow. Set automatically if instantiated from a Fusion object.
 
-        Args:
-            client (Any): Fusion client object.
-
         Examples:
             >>> from fusion import Fusion
             >>> fusion = Fusion()
             >>> flow = fusion.dataflow(
-            ...     provider_node={"name": "CRM_DB", "dataNodeType": "Database"},
-            ...     consumer_node={"name": "DWH", "dataNodeType": "Database"},
+            ...     provider_node={"name": "CRM_DB", "type": "Database"},
+            ...     consumer_node={"name": "DWH", "type": "Database"},
             ... )
             >>> flow.client = fusion
         """
         self._client = client
 
     def _use_client(self, client: Fusion | None) -> Fusion:
-        """Determine client.
-
-        Returns:
-            Fusion: The resolved Fusion client to use.
-
-        Raises:
-            ValueError: If neither a provided client nor a bound client is available.
-        """
+        """Resolve the client or raise if missing."""
        res = self._client if client is None else client
        if res is None:
            raise ValueError("A Fusion client object is required.")
        return res
 
-    # -----------------------
-    # Converters / loaders
-    # -----------------------
     @classmethod
     def from_dict(cls: type[Dataflow], data: dict[str, Any]) -> Dataflow:
-        """Create a Dataflow object from a dictionary.
-
-        Accepts camelCase or snake_case keys and normalizes empty strings to None.
-
-        Args:
-            data (dict[str, Any]): Dataflow fields, potentially nested, in camelCase or snake_case.
+        """Instantiate a Dataflow object from a dictionary.
 
         Returns:
-            Dataflow: A populated Dataflow instance.
-        """
+            Dataflow: The constructed object.
 
-        def normalize_value(val: Any) -> Any:
-            if isinstance(val, str) and val.strip() == "":
-                return None
-            return val
-
-        def convert_keys(d: dict[str, Any]) -> dict[str, Any]:
-            converted: dict[str, Any] = {}
-            for k, v in d.items():
-                key = camel_to_snake(k)
-                if isinstance(v, dict):
-                    converted[key] = convert_keys(v)
-                elif isinstance(v, list):
-                    converted[key] = [
-                        convert_keys(i) if isinstance(i, dict) else i
-                        for i in v
-                    ]
-                else:
-                    converted[key] = normalize_value(v)
-            return converted
-
-        converted_data = convert_keys(data)
-        valid_fields = {f.name for f in fields(cls)}
-        filtered_data = {k: v for k, v in converted_data.items() if k in valid_fields}
-
-        obj = cls.__new__(cls)
-        for field_obj in fields(cls):
-            setattr(obj, field_obj.name, filtered_data.get(field_obj.name, None))
-
-        obj.__post_init__()
+        Examples:
+            >>> from fusion.dataflow import Dataflow
+            >>> flow = Dataflow.from_dict({
+            ...     "providerNode": {"name": "CRM_DB", "type": "Database"},
+            ...     "consumerNode": {"name": "DWH", "type": "Database"},
+            ...     "connectionType": "Consumes From",
+            ... })
+        """
+        keys = {f.name for f in fields(cls)}
+        mapped = {camel_to_snake(k): v for k, v in data.items()}
+        filtered = {k: v for k, v in mapped.items() if k in keys}
+        obj = cls(**filtered)
         return obj
 
     @classmethod
@@ -232,9 +204,13 @@ def from_dataframe(cls, frame: pd.DataFrame, client: Fusion | None = None) -> li
...
         return results
 
     def from_object(self, dataflow_source: Dataflow | dict[str, Any] | str | pd.Series) -> Dataflow:  # type: ignore[type-arg]
-        """Instantiate a single Dataflow from a Dataflow, dict, JSON-object string, or pandas Series.
+        """Instantiate a Dataflow from a Dataflow, dict, JSON-object string, or pandas Series.
 
-        Note: CSV input is not supported here.
+        Examples:
+            >>> from fusion import Fusion
+            >>> fusion = Fusion()
+            >>> flow = fusion.dataflow().from_object(
+            ...     '{"providerNode": {"name": "A", "type": "DB"},'
+            ...     ' "consumerNode": {"name": "B", "type": "DB"},'
+            ...     ' "connectionType": "Consumes From"}'
+            ... )
         """
         import json
 
@@ -256,29 +232,25 @@ def from_object(self, dataflow_source: Dataflow | dict[str, Any] | str | pd.Seri
         obj.client = self._client
         return obj
 
-    # -----------------------
-    # Validation / serialization
-    # -----------------------
     def validate(self) -> None:
-        """Validate that required fields exist.
-
-        Raises:
-            ValueError: If required fields are missing.
-        """
+        """Validate that required fields exist."""
         required_fields = ["provider_node", "consumer_node"]
         missing = [f for f in required_fields if getattr(self, f, None) in [None, ""]]
         if missing:
             raise ValueError(f"Missing required fields in Dataflow: {', '.join(missing)}")
 
     def _validate_nodes_for_create(self) -> None:
-        """Ensure provider/consumer nodes are present with non-blank name and dataNodeType for create()."""
+        """Ensure provider/consumer nodes are present with non-blank name and type for create()."""
         for attr in ("provider_node", "consumer_node"):
             node = getattr(self, attr, None)
             if not isinstance(node, dict):
-                raise ValueError(f"{attr} must be a dict with 'name' and 'dataNodeType' for create().")
-            if not node.get("name") or not node.get("nodeType"):
-                raise ValueError(f"{attr} requires non-empty 'name' and 'dataNodeType' for create().")
+                raise ValueError(f"{attr} must be a dict with 'name' and 'type' for create().")
+            if not node.get("name") or not node.get("type"):
+                raise ValueError(f"{attr} requires non-empty 'name' and 'type' for create().")
+        if not self.connection_type:
+            raise ValueError("connection_type is required for create().")
 
     def to_dict(
         self,
@@ -286,7 +258,21 @@
         drop_none: bool = True,
         exclude: set[str] | None = None,
     ) -> dict[str, Any]:
-        """Convert Dataflow object into a JSON-serializable dictionary."""
+        """Convert the Dataflow instance to a dictionary.
+
+        Returns:
+            dict[str, Any]: Dataflow metadata as a dictionary ready for API calls.
+
+        Examples:
+            >>> from fusion import Fusion
+            >>> fusion = Fusion()
+            >>> flow = fusion.dataflow(
+            ...     provider_node={"name": "CRM_DB", "type": "Database"},
+            ...     consumer_node={"name": "DWH", "type": "Database"},
+            ...     connection_type="Consumes From",
+            ... )
+            >>> flow_dict = flow.to_dict()
+        """
         out: dict[str, Any] = {}
         for k, v in self.__dict__.items():
             if k.startswith("_"):
@@ -304,17 +290,25 @@
     def create(
         self,
         client: Fusion | None = None,
         return_resp_obj: bool = False,
     ) -> requests.Response | None:
-        """Create the dataflow via API."""
-        client = self._use_client(client)
+        """Create the dataflow via API.
 
+        Examples:
+            >>> from fusion import Fusion
+            >>> fusion = Fusion()
+            >>> flow = fusion.dataflow(
+            ...     provider_node={"name": "CRM_DB", "type": "Database"},
+            ...     consumer_node={"name": "DWH", "type": "Database"},
+            ...     connection_type="Consumes From",
+            ... )
+            >>> flow.create()
+        """
+        client = self._use_client(client)
         self._validate_nodes_for_create()
         payload = self.to_dict(drop_none=True, exclude={"id"})
         url = f"{client._get_new_root_url()}/api/corelineage-service/v1/lineage/dataflows"
         resp: requests.Response = client.session.post(url, json=payload)
         requests_raise_for_status(resp)
-
         return resp if return_resp_obj else None
 
     def update(
         self,
@@ -322,7 +316,13 @@
         client: Fusion | None = None,
         return_resp_obj: bool = False,
     ) -> requests.Response | None:
-        """Full replace (PUT) using self.id, excluding provider/consumer nodes from the payload."""
+        """Full update (PUT) using ``self.id``.
+
+        Examples:
+            >>> flow = fusion.dataflow(id="abc-123")
+            >>> flow.description = "Updated"
+            >>> flow.update()
+        """
         client = self._use_client(client)
         if not self.id:
             raise ValueError("Dataflow ID is required on the object (set self.id before update()).")
@@ -331,7 +331,6 @@
         payload = self.to_dict(
             drop_none=True,
             exclude={"id", "provider_node", "consumer_node"},
         )
-
         url = f"{client._get_new_root_url()}/api/corelineage-service/v1/lineage/dataflows/{self.id}"
         resp: requests.Response = client.session.put(url, json=payload)
         requests_raise_for_status(resp)
@@ -339,27 +338,34 @@
     def update_fields(
         self,
-        changes: dict[str, Any],
         client: Fusion | None = None,
         return_resp_obj: bool = False,
     ) -> requests.Response | None:
-        """Partial update (PATCH) using self.id. Provider/consumer nodes are not allowed."""
+        """Partial update (PATCH) using the object's current state.
+
+        Notes:
+            - Fields set to ``None`` on this object are included in the PATCH body
+              (sent as JSON ``null``), allowing you to clear values on the server.
+            - Provider/consumer nodes are excluded from partial updates.
+
+        Examples:
+            >>> from fusion import Fusion
+            >>> fusion = Fusion()
+            >>> flow = fusion.dataflow(id="abc-123", frequency=None)  # clear frequency
+            >>> flow.update_fields()
+        """
         client = self._use_client(client)
         if not self.id:
             raise ValueError("Dataflow ID is required on the object (set self.id before update_fields()).")
 
-        forbidden = {"provider_node", "consumer_node"}
-        normalized = {camel_to_snake(k): v for k, v in changes.items()}
-        used = forbidden.intersection(normalized.keys())
-        if used:
-            raise ValueError(
-                f"Cannot update {sorted(used)} via PATCH; provider/consumer nodes are immutable for updates."
-            )
-
-        patch_body = {snake_to_camel(k): v for k, v in normalized.items()}
+        # Build payload from current object; include None to allow clearing fields.
+        payload = self.to_dict(
+            drop_none=False,  # include None (as JSON null)
+            exclude={"id", "provider_node", "consumer_node"},
+        )
 
         url = f"{client._get_new_root_url()}/api/corelineage-service/v1/lineage/dataflows/{self.id}"
-        resp: requests.Response = client.session.patch(url, json=patch_body)
+        resp: requests.Response = client.session.patch(url, json=payload)
         requests_raise_for_status(resp)
         return resp if return_resp_obj else None
 
     def delete(
         self,
@@ -368,7 +374,12 @@
         client: Fusion | None = None,
         return_resp_obj: bool = False,
     ) -> requests.Response | None:
-        """Delete this dataflow using self.id."""
+        """Delete this dataflow using ``self.id``.
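+
+        Raises:
+            ValueError: If ``self.id`` is not set on the object.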
+ + Examples: + >>> flow = fusion.dataflow(id="abc-123") + >>> flow.delete() + """ client = self._use_client(client) if not self.id: raise ValueError("Dataflow ID is required on the object (set self.id before delete()).") diff --git a/py_src/fusion/dataset.py b/py_src/fusion/dataset.py index a6aeae87..9085dc74 100644 --- a/py_src/fusion/dataset.py +++ b/py_src/fusion/dataset.py @@ -8,11 +8,14 @@ import pandas as pd +from .exceptions import APIResponseError from .utils import ( CamelCaseMeta, _is_json, camel_to_snake, convert_date_format, + ensure_resources, + handle_paginated_request, make_bool, make_list, requests_raise_for_status, @@ -303,6 +306,10 @@ def _from_dict(cls: type[Dataset], data: dict[str, Any]) -> Dataset: """ keys = [f.name for f in fields(cls)] keys = ["type" if key == "type_" else key for key in keys] + + if "tag" in data: + data["tags"] = data.pop("tag") + data = {camel_to_snake(k): v for k, v in data.items()} data = {k: v for k, v in data.items() if k in keys} if "type" in data: @@ -444,18 +451,26 @@ def from_catalog(self, catalog: str | None = None, client: Fusion | None = None) client = self._use_client(client) catalog = client._use_catalog(catalog) dataset = self.identifier - resp = client.session.get(f"{client.root_url}catalogs/{catalog}/datasets") - requests_raise_for_status(resp) - list_datasets = resp.json()["resources"] - dict_ = [dict_ for dict_ in list_datasets if dict_["identifier"] == dataset][0] + + url = f"{client.root_url}catalogs/{catalog}/datasets" + resp = handle_paginated_request(client.session, url) + ensure_resources(resp) + list_datasets = resp["resources"] + matching_datasets = [dict_ for dict_ in list_datasets if dict_["identifier"] == dataset] + if not matching_datasets: + raise ValueError(f"Dataset with identifier '{dataset}' not found in catalog '{catalog}'.") + dict_ = matching_datasets[0] dataset_obj = self._from_dict(dict_) dataset_obj.client = client - prod_df = client.list_product_dataset_mapping(catalog=catalog) + try: + prod_df = client.list_product_dataset_mapping(catalog=catalog) - if dataset.lower() in list(prod_df.dataset.str.lower()): - product = [prod_df[prod_df["dataset"].str.lower() == dataset.lower()]["product"].iloc[0]] - dataset_obj.product = product + if dataset.lower() in list(prod_df.dataset.str.lower()): + product = [prod_df[prod_df["dataset"].str.lower() == dataset.lower()]["product"].iloc[0]] + dataset_obj.product = product + except APIResponseError: + pass return dataset_obj @@ -473,7 +488,8 @@ def to_dict(self) -> dict[str, Any]: """ dataset_dict = {snake_to_camel(k): v for k, v in self.__dict__.items() if not k.startswith("_")} - + if "tags" in dataset_dict: + dataset_dict["tag"] = dataset_dict.pop("tags") return dataset_dict def create( diff --git a/py_src/fusion/fusion.py b/py_src/fusion/fusion.py index 8a1a22c2..d75432b7 100644 --- a/py_src/fusion/fusion.py +++ b/py_src/fusion/fusion.py @@ -17,8 +17,8 @@ import pandas as pd import pyarrow as pa -from rich.progress import Progress from tabulate import tabulate +from tqdm import tqdm from fusion.attributes import Attribute, Attributes from fusion.credentials import FusionCredentials @@ -38,6 +38,7 @@ csv_to_table, distribution_to_filename, distribution_to_url, + ensure_resources, file_name_to_url, get_default_fs, get_session, @@ -85,10 +86,7 @@ def _call_for_dataframe(url: str, session: requests.Session) -> pd.DataFrame: pandas.DataFrame: a dataframe containing the requested data. 
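+
+    Raises:
+        APIResponseError: If the paginated response contains no ``resources``
+            (the check previously inlined here now lives in ``ensure_resources``).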
""" response_data = handle_paginated_request(session, url) - if "resources" not in response_data or not response_data["resources"]: - raise APIResponseError( - ValueError("No data found"), - ) + ensure_resources(response_data) ret_df = pd.DataFrame(response_data["resources"]).reset_index(drop=True) return ret_df @@ -198,7 +196,7 @@ def __repr__(self) -> str: for method_name in dir(Fusion) if callable(getattr(Fusion, method_name)) and not method_name.startswith("_") ] - + [ + + [ (getattr(Fusion, p).__doc__ or "").split("\n")[0] for p in dir(Fusion) if isinstance(getattr(Fusion, p), property) @@ -981,7 +979,7 @@ def download( # noqa: PLR0912, PLR0913, PLR0915 n_par = cpu_count(n_par) - download_spec: list[dict[str, Any]] = [ + download_spec: list[dict[str, Any]] = [ { "lfs": self.fs, "rpath": distribution_to_url( @@ -1022,22 +1020,23 @@ def download( # noqa: PLR0912, PLR0913, PLR0915 VERBOSE_LVL, f"Beginning {len(download_spec)} downloads in batches of {n_par}", ) + res = [None] * len(download_spec) + if show_progress: - with Progress() as p: - task = p.add_task("Downloading", total=len(download_spec)) - res = [] - for spec in download_spec: + with tqdm(total=len(download_spec), desc="Downloading") as p: + for i, spec in enumerate(download_spec): r = self.get_fusion_filesystem().download(**spec) - res.append(r) - p.update(task, advance=1) + res[i] = r + if r[0] is True: + p.update(1) else: res = [self.get_fusion_filesystem().download(**spec) for spec in download_spec] - if (len(res) > 0) and (not all(r[0] for r in res)): + if (len(res) > 0) and (not all(r[0] for r in res)): # type: ignore for r in res: if not r[0]: warnings.warn(f"The download of {r[1]} was not successful", stacklevel=2) - return res if return_paths else None + return res if return_paths else None # type: ignore def _validate_format( self, @@ -2290,22 +2289,24 @@ def attributes( attributes_obj.client = self return attributes_obj - def report_attribute( self, - title: str, - sourceIdentifier: str | None = None, + title: str | None = None, + id: int | None = None, # noqa: A002 + source_identifier: str | None = None, description: str | None = None, - technicalDataType: str | None = None, + technical_data_type: str | None = None, path: str | None = None, ) -> ReportAttribute: """Instantiate a ReportAttribute object with this client for metadata creation. Args: - title (str): The display title of the attribute (required). - sourceIdentifier (str | None, optional): A unique identifier or reference ID from the source system. + title (str | None, optional): The display title of the attribute. + id (int | None, optional): The unique identifier of the attribute. + id argument is not required for 'create' operation. + source_identifier (str | None, optional): A unique identifier or reference ID from the source system. description (str | None, optional): A longer description of the attribute. - technicalDataType (str | None, optional): The technical data type (e.g., string, int, boolean). + technical_data_type (str | None, optional): The technical data type (e.g., string, int, boolean). path (str | None, optional): The hierarchical path or logical grouping for the attribute. Returns: @@ -2315,17 +2316,18 @@ def report_attribute( >>> fusion = Fusion() >>> attr = fusion.report_attribute( ... title="Customer ID", - ... sourceIdentifier="cust_id_123", + ... source_identifier="cust_id_123", ... description="Unique customer identifier", - ... technicalDataType="String", + ... technical_data_type="String", ... path="Customer.Details" ... 
)
        """
        attribute_obj = ReportAttribute(
-            sourceIdentifier=sourceIdentifier,
+            source_identifier=source_identifier,
             title=title,
+            id=id,
             description=description,
-            technicalDataType=technicalDataType,
+            technical_data_type=technical_data_type,
             path=path,
         )
         attribute_obj.client = self
         return attribute_obj
 
@@ -2371,13 +2373,20 @@ def reports(self) -> ReportsWrapper:
             ...     frequency="Monthly",
             ...     category="Risk",
             ...     sub_category="Credit Risk",
-            ...     data_node_id={"id": "node123"},
+            ...     business_domain="CDAO Office",
             ...     regulatory_related=True,
-            ...     domain={"id": "domain123"}
+            ...     owner_node={"name": "APP-123", "type": "Application (SEAL)"},
+            ...     publisher_node={
+            ...         "name": "DASH-01",
+            ...         "type": "Intelligent Solutions",
+            ...         "publisher_node_identifier": "seal:app:APP-123"
+            ...     },
             ... )
+            >>> new_report.create()
         """
         return ReportsWrapper(client=self)
 
+
     def delete_datasetmembers(
         self,
         dataset: str,
@@ -2601,135 +2610,152 @@ def list_datasetmembers_distributions(
         members_df = pd.DataFrame(rows, columns=["identifier", "format"])
         return members_df
 
+
     def report(  # noqa: PLR0913
         self,
-        description: str,
-        title: str,
-        frequency: str,
-        category: str,
-        sub_category: str,
-        data_node_id: dict[str, str],
-        regulatory_related: bool,
-        domain: dict[str, str],
-        tier_type: str | None = None,
+        description: str | None = None,
+        title: str | None = None,
+        frequency: str | None = None,
+        category: str | None = None,
+        sub_category: str | None = None,
+        owner_node: dict[str, str] | None = None,
+        publisher_node: dict[str, Any] | None = None,
+        regulatory_related: bool | None = None,
+        business_domain: str | None = None,
         lob: str | None = None,
-        alternative_id: dict[str, str] | None = None,
         sub_lob: str | None = None,
         is_bcbs239_program: bool | None = None,
         risk_area: str | None = None,
         risk_stripe: str | None = None,
         sap_code: str | None = None,
+        source_system: dict[str, Any] | None = None,
+        id: str | None = None,  # noqa: A002
         **kwargs: Any,
     ) -> Report:
         """Instantiate a Report object with the current Fusion client attached.
 
         Args:
-            description (str): Description of the report.
-            title (str): Title of the report or process.
-            frequency (str): Reporting frequency (e.g., Monthly, Quarterly).
-            category (str): Main classification of the report.
-            sub_category (str): Sub-classification under the main category.
-            data_node_id (dict[str, str]): Associated data node details. Should include "name" and "dataNodeType".
-            regulatory_related (bool): Whether the report is regulatory-designated. This is a required field.
-            tier_type (str, optional): Tier classification (e.g., "Tier 1", "Non Tier 1").
-            lob (str, optional): Line of business.
-            alternative_id (dict[str, str], optional): Alternate identifiers for the report.
-            sub_lob (str, optional): Subdivision of the line of business.
-            is_bcbs239_program (bool, optional): Whether the report is part of the BCBS 239 program.
-            risk_area (str, optional): Risk area covered by the report.
-            risk_stripe (str, optional): Stripe or classification under the risk area.
-            sap_code (str, optional): SAP financial tracking code.
-            domain (dict[str, str | bool], optional): Domain details. Typically contains a "name" key.
-            **kwargs (Any): Additional optional fields such as:
-                - tier_designation (str)
-                - region (str)
-                - mnpi_indicator (bool)
-                - country_of_reporting_obligation (str)
-                - primary_regulator (str)
+            description (str | None): Detailed description of the report.
+                Mandatory for report creation.
+            title (str | None): Title (display name) of the report.
+                Mandatory for report creation.
+            frequency (str | None): Frequency of the report.
+                Mandatory for report creation.
+            category (str | None): Category of the report.
+                Mandatory for report creation.
+            sub_category (str | None): Sub-classification under the main category.
+                Mandatory for report creation.
+            business_domain (str | None): Business domain string (e.g., "CDAO Office").
+                Cannot be blank if provided. Mandatory for report creation.
+            owner_node (dict[str, str] | None): Owner node associated with the report;
+                keys ``name`` and ``type``. Mandatory for report creation.
+            publisher_node (dict[str, Any] | None): Publisher node associated with the report;
+                keys ``name`` and ``type``, plus an optional ``publisher_node_identifier``.
+            regulatory_related (bool | None): Indicates whether the report is related to regulatory
+                requirements. Mandatory for report creation.
+            lob (str | None): Line of business.
+            sub_lob (str | None): Subdivision of the line of business.
+            is_bcbs239_program (bool | None): Indicates whether the report is associated with the
+                BCBS 239 program.
+            risk_area (str | None): Risk area.
+            risk_stripe (str | None): Risk stripe.
+            sap_code (str | None): SAP code associated with the report.
+            source_system (dict[str, Any] | None): Source system details for the report.
+            id (str | None): Server-assigned report identifier (needed for update/patch/delete if
+                already known).
+            **kwargs (Any): Additional fields passed through to the Report dataclass.
 
         Returns:
             Report: A Report object ready for API upload or further manipulation.
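+
+        Examples:
+            A construction sketch mirroring the ``reports()`` example above (all values
+            are illustrative):
+
+            >>> report = fusion.report(
+            ...     title="Quarterly Risk Report",
+            ...     description="Credit risk summary",
+            ...     frequency="Monthly",
+            ...     category="Risk",
+            ...     sub_category="Credit Risk",
+            ...     business_domain="CDAO Office",
+            ...     regulatory_related=True,
+            ...     owner_node={"name": "APP-123", "type": "Application (SEAL)"},
+            ...     publisher_node={"name": "DASH-01", "type": "Intelligent Solutions"},
+            ... )
+            >>> report.create()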
         """
         report_obj = Report(
+            id=id,
             title=title,
             description=description,
             frequency=frequency,
             category=category,
             sub_category=sub_category,
-            data_node_id=data_node_id,
+            business_domain=business_domain,
             regulatory_related=regulatory_related,
-            tier_type=tier_type,
+            owner_node=owner_node,
+            publisher_node=publisher_node,
             lob=lob,
-            alternative_id=alternative_id,
             sub_lob=sub_lob,
             is_bcbs239_program=is_bcbs239_program,
             risk_area=risk_area,
             risk_stripe=risk_stripe,
             sap_code=sap_code,
-            domain=domain,
+            source_system=source_system,
             **kwargs,
         )
         report_obj.client = self
         return report_obj
-
+
+
     def dataflow(  # noqa: PLR0913
-        self,
-        provider_node: dict[str, str] | None = None,
-        consumer_node: dict[str, str] | None = None,
-        description: str | None = None,
-        alternative_id: dict[str, Any] | None = None,
-        transport_type: str | None = None,
-        frequency: str | None = None,
-        start_time: str | None = None,
-        end_time: str | None = None,
-        boundary_sets: list[dict[str, Any]] | None = None,
-        data_assets: list[dict[str, Any]] | None = None,
-        id: str | None = None,  # noqa: A002
-        **kwargs: Any,
-    ) -> Dataflow:
-        """Instantiate a Dataflow object bound to this Fusion client.
+        self,
+        provider_node: dict[str, str] | None = None,
+        consumer_node: dict[str, str] | None = None,
+        description: str | None = None,
+        transport_type: str | None = None,
+        frequency: str | None = None,
+        start_time: str | None = None,
+        end_time: str | None = None,
+        datasets: list[dict[str, Any]] | None = None,
+        connection_type: str | None = None,
+        source_system: dict[str, Any] | None = None,
+        id: str | None = None,  # noqa: A002
+        **kwargs: Any,
+    ) -> Dataflow:
+        """Instantiate a Dataflow object with this client.
 
         You may instantiate with just an ``id`` (useful for ``update()``, ``update_fields()``, or ``delete()``);
-        however, **creating** a new data flow via ``create()`` requires valid provider/consumer nodes.
+        however, **creating** a new data flow via ``create()`` requires valid provider/consumer nodes and
+        a ``connection_type``.
 
         Args:
             provider_node (dict[str, str] | None, optional):
-                Provider/source node details. Expected keys: ``name`` and ``dataNodeType``.
+                Provider node of the dataflow. It must be distinct from the consumer node.
+                Required for ``create()``. Keys: ``name``, ``type``.
             consumer_node (dict[str, str] | None, optional):
-                Consumer/target node details. Expected keys: ``name`` and ``dataNodeType``.
+                Consumer node of the dataflow. It must be distinct from the provider node.
+                Required for ``create()``. Keys: ``name``, ``type``.
             description (str | None, optional):
-                Purpose/summary of the data flow (if provided, must not be blank).
-            alternative_id (dict[str, Any] | None, optional):
-                Alternative identifier object (e.g., ``{"value": "...", "domain": "...", "isSor": true}``).
+                Specifies the purpose of the data movement.
             transport_type (str | None, optional):
-                Transport mechanism (e.g., ``"API"``, ``"FILE TRANSFER"``, ``"SYNCHRONOUS MESSAGING"``).
+                Transport type of the data flow (e.g., ``"API"``, ``"FILE TRANSFER"``).
             frequency (str | None, optional):
-                Flow cadence (e.g., ``"DAILY"``, ``"WEEKLY"``, ``"MONTHLY"``, etc.).
+                Frequency of the data flow (e.g., ``"DAILY"``, ``"MONTHLY"``).
             start_time (str | None, optional):
-                Scheduled start time (e.g., ``"HH:mm:ss"`` or ISO-8601 with zone).
+                Scheduled start time of the dataflow.
             end_time (str | None, optional):
-                Scheduled end time (e.g., ``"HH:mm:ss"`` or ISO-8601 with zone).
-            boundary_sets (list[dict[str, Any]] | None, optional):
-                Boundary set objects associated with the flow.
-            data_assets (list[dict[str, Any]] | None, optional):
-                Related data asset objects.
+                Scheduled end time of the dataflow.
+            datasets (list[dict[str, Any]] | None, optional):
+                List of datasets involved in the data flow; each requires a visibility license.
+                The maximum is 100 datasets per dataflow, and an error is raised if the list
+                contains duplicate entries. Defaults to an empty list.
+            connection_type (str | None, optional):
+                Connection type of the dataflow. Required for ``create()``.
+            source_system (dict[str, Any] | None, optional):
+                Source system of the data flow.
             id (str | None, optional):
                 Server-assigned identifier; required for ``update()``, ``update_fields()``, and ``delete()``.
             **kwargs (Any):
                 Additional fields supported by the API; passed through to the Dataflow dataclass.
 
         Returns:
             Dataflow: A Dataflow instance with this Fusion client attached.
 
         Examples:
-            Create a handle with full details (ready for ``create()``):
+            Create a handle ready for ``create()``:
 
             >>> flow = fusion.dataflow(
-            ...     provider_node={"name": "CRM_DB", "dataNodeType": "Database"},
-            ...     consumer_node={"name": "DWH", "dataNodeType": "Database"},
+            ...     provider_node={"name": "CRM_DB", "type": "Database"},
+            ...     consumer_node={"name": "DWH", "type": "Database"},
            ...     description="CRM → DWH nightly load",
            ...     frequency="DAILY",
            ...     transport_type="API",
+            ...     connection_type="Consumes From",
+            ...     source_system={"system": "Airflow"},
+            ...
) Create a handle for an existing flow by ID (for update/delete): @@ -2738,16 +2764,16 @@ def dataflow( # noqa: PLR0913 >>> flow.delete() """ df_obj = Dataflow( - providerNode=provider_node, - consumerNode=consumer_node, + provider_node=provider_node, + consumer_node=consumer_node, description=description, - alternativeId=alternative_id, - transportType=transport_type, + transport_type=transport_type, frequency=frequency, - startTime=start_time, - endTime=end_time, - boundarySets=boundary_sets or [], - dataAssets=data_assets or [], + start_time=start_time, + end_time=end_time, + datasets=datasets or [], + connection_type=connection_type, + source_system=source_system, id=id, **kwargs, ) diff --git a/py_src/fusion/fusion_filesystem.py b/py_src/fusion/fusion_filesystem.py index 682acb29..720e248f 100644 --- a/py_src/fusion/fusion_filesystem.py +++ b/py_src/fusion/fusion_filesystem.py @@ -407,11 +407,11 @@ def _merge_all_data(all_data: dict[str, Any] | None, response_dict: dict[str, An return all_data def cat( - self, - url: str, - start: int | None = None, - end: int | None = None, - **kwargs: Any, + self, + url: str, + start: int | None = None, + end: int | None = None, + **kwargs: Any, ) -> Any: """Fetch paths' contents with pagination support. @@ -425,41 +425,47 @@ def cat( """ url = self._decorate_url(url) - all_data = None kw = kwargs.copy() headers = kw.get("headers", {}).copy() kw["headers"] = headers session = self.sync_session - info_result = self.info(url) - if isinstance(info_result, dict): - file_size = info_result.get("size", None) - elif isinstance(info_result, list) and info_result: - file_size = info_result[0].get("size", None) if isinstance(info_result[0], dict) else None - else: - file_size = None - range_start = start if start is not None else 0 - range_end = end if end is not None else file_size if file_size is not None else 0 - fusion_file = FusionFile(self, url, session=session, size=file_size, **kw) + range_end = end if end is not None else 2**63 - 1 + + fusion_file = FusionFile(self, url, session=session, **kw) + all_bytes = bytearray() + all_data = None + while True: out, resp_headers = fusion_file._fetch_range_with_headers(range_start, range_end) - response_dict = json.loads(out.decode("utf-8")) - all_data = self._merge_all_data(all_data, response_dict) + try: + # Try to decode as JSON (API response) + response_dict = json.loads(out.decode("utf-8")) + all_data = self._merge_all_data(all_data, response_dict) + is_json = True + except json.JSONDecodeError: + # Not JSON, treat as file content + all_bytes += out + is_json = False + next_token = resp_headers.get("x-jpmc-next-token") if not next_token: break headers["x-jpmc-next-token"] = next_token kw["headers"] = headers - - return json.dumps(all_data).encode("utf-8") + + if is_json: + return json.dumps(all_data, separators=(",", ":")).encode("utf-8") + else: + return bytes(all_bytes) async def _cat( - self, - url: str, - start: int | None = None, - end: int | None = None, - **kwargs: Any, + self, + url: str, + start: int | None = None, + end: int | None = None, + **kwargs: Any, ) -> Any: """Fetch paths' contents with pagination support (async). 
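+
+        Pages that decode as JSON are merged into a single JSON document;
+        non-JSON content is concatenated and returned as raw bytes.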
@@ -474,34 +480,39 @@ async def _cat( """ await self._async_startup() url = self._decorate_url(url) - all_data = None kw = kwargs.copy() headers = kw.get("headers", {}).copy() kw["headers"] = headers session = await self.set_session() - info_result = await self._info(url) - if isinstance(info_result, dict): - file_size = info_result.get("size", None) - elif isinstance(info_result, list) and info_result: - file_size = info_result[0].get("size", None) if isinstance(info_result[0], dict) else None - else: - file_size = None - range_start = start if start is not None else 0 - range_end = end if end is not None else file_size if file_size is not None else 0 - fusion_file = FusionFile(self, url, session=session, size=file_size, **kw) + range_end = end if end is not None else 2**63 - 1 + fusion_file = FusionFile(self, url, session=session, **kw) + all_bytes = bytearray() + all_data = None + while True: out, resp_headers = await fusion_file._async_fetch_range_with_headers(range_start, range_end) - response_dict = json.loads(out.decode("utf-8")) - all_data = self._merge_all_data(all_data, response_dict) + try: + # Try to decode as JSON (API response) + response_dict = json.loads(out.decode("utf-8")) + all_data = self._merge_all_data(all_data, response_dict) + is_json = True + except json.JSONDecodeError: + # Not JSON, treat as file content + all_bytes += out + is_json = False + next_token = resp_headers.get("x-jpmc-next-token") if not next_token: break headers["x-jpmc-next-token"] = next_token kw["headers"] = headers - return json.dumps(all_data).encode("utf-8") + if is_json: + return json.dumps(all_data, separators=(",", ":")).encode("utf-8") + else: + return bytes(all_bytes) async def _stream_file(self, url: str, chunk_size: int = 100) -> AsyncGenerator[bytes, None]: """Return an async stream to file at the given url. diff --git a/py_src/fusion/product.py b/py_src/fusion/product.py index c11fb9ad..83465ac5 100644 --- a/py_src/fusion/product.py +++ b/py_src/fusion/product.py @@ -13,6 +13,8 @@ _is_json, camel_to_snake, convert_date_format, + ensure_resources, + handle_paginated_request, make_bool, make_list, requests_raise_for_status, @@ -381,9 +383,10 @@ def from_catalog(self, catalog: str | None = None, client: Fusion | None = None) client = self._use_client(client) catalog = client._use_catalog(catalog) - resp = client.session.get(f"{client.root_url}catalogs/{catalog}/products") - requests_raise_for_status(resp) - list_products = resp.json()["resources"] + url = f"{client.root_url}catalogs/{catalog}/products" + resp = handle_paginated_request(client.session, url) + ensure_resources(resp) + list_products = resp["resources"] dict_ = [dict_ for dict_ in list_products if dict_["identifier"] == self.identifier][0] product_obj = Product._from_dict(dict_) product_obj.client = client diff --git a/py_src/fusion/report.py b/py_src/fusion/report.py index 6b1c8800..87a68315 100644 --- a/py_src/fusion/report.py +++ b/py_src/fusion/report.py @@ -3,6 +3,7 @@ from __future__ import annotations import logging +from contextlib import suppress from dataclasses import dataclass, field, fields from pathlib import Path from typing import TYPE_CHECKING, Any, TypedDict @@ -26,7 +27,6 @@ from fusion import Fusion - logger = logging.getLogger(__name__) @@ -36,51 +36,54 @@ class Report(metaclass=CamelCaseMeta): Fusion Report class for managing report metadata. Attributes: - title (str): Title of the report or process. - data_node_id (dict[str, str]): Identifier of the associated data node (e.g., name, dataNodeType). 
-        description (str): Description of the report.
-        frequency (str): Reporting frequency (e.g., Monthly, Quarterly).
-        category (str): Primary category classification.
-        sub_category (str): Sub-category under the main category.
-        domain (dict[str, str]): Domain metadata (typically with a "name" key).
-        regulatory_related (bool): Whether the report is regulatory-related.
-
-        lob (str, optional): Line of Business.
-        sub_lob (str, optional): Subdivision of the Line of Business.
-        tier_type (str, optional): Report's tier classification.
-        is_bcbs239_program (bool, optional): Flag indicating BCBS 239 program inclusion.
-        risk_stripe (str, optional): Stripe under risk category.
-        risk_area (str, optional): The area of risk addressed.
-        sap_code (str, optional): Associated SAP cost code.
-        tier_designation (str, optional): Tier designation (e.g., Tier 1, Non Tier 1).
-        alternative_id (dict[str, str], optional): Alternate report identifiers.
-        region (str, optional): Associated region.
-        mnpi_indicator (bool, optional): Whether report contains MNPI.
-        country_of_reporting_obligation (str, optional): Country of regulatory obligation.
-        primary_regulator (str, optional): Main regulatory authority.
-
-        _client (Fusion, optional): Fusion client for making API calls (injected automatically).
+        id (str | None): Server-assigned report identifier. Required for update/patch/delete.
+        title (str | None): Title / display name of the report.
+        description (str | None): Description of the report.
+        frequency (str | None): Frequency of the report.
+        category (str | None): Primary category classification.
+        sub_category (str | None): Sub-category under the main category.
+        business_domain (str | None): Business domain string (e.g., "CDAO Office").
+        regulatory_related (bool | None): Whether the report is regulatory-related.
+
+        owner_node (dict[str, str] | None): Owner node with keys {"name", "type"}.
+        publisher_node (dict[str, Any] | None): Publisher node with keys {"name", "type"} and optional
+            {"publisher_node_identifier"}.
+
+        source_system (dict[str, Any] | None): Source system object if provided.
+
+        lob (str | None): Line of business associated with the report.
+        sub_lob (str | None): Subdivision of the line of business.
+        is_bcbs239_program (bool | None): Flag indicating BCBS 239 program inclusion.
+        risk_stripe (str | None): Stripe under risk category.
+        risk_area (str | None): The area of risk addressed.
+        sap_code (str | None): Associated SAP cost code.
+        tier_designation (str | None): Tier designation (e.g., Tier 1, Non Tier 1).
+        region (str | None): Associated region.
+        mnpi_indicator (bool | None): Whether the report contains MNPI.
+        country_of_reporting_obligation (str | None): Country of regulatory obligation.
+        primary_regulator (str | None): Main regulatory authority.
+
+        _client (Fusion | None): Fusion client for making API calls (injected automatically).
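+
+    Examples:
+        A minimal construction sketch (all values are illustrative):
+
+        >>> report = Report(
+        ...     title="Quarterly Risk Report",
+        ...     description="Credit risk summary",
+        ...     frequency="Quarterly",
+        ...     category="Risk",
+        ...     sub_category="Credit Risk",
+        ...     business_domain="CDAO Office",
+        ...     regulatory_related=True,
+        ...     owner_node={"name": "APP-123", "type": "Application (SEAL)"},
+        ...     publisher_node={"name": "DASH-01", "type": "Intelligent Solutions"},
+        ... )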
""" - title: str - data_node_id: dict[str, str] - description: str - frequency: str - category: str - sub_category: str - domain: dict[str, str] - regulatory_related: bool - - # Optional fields + id: str | None = None + title: str | None = None + description: str | None = None + frequency: str | None = None + category: str | None = None + sub_category: str | None = None + business_domain: str | None = None + regulatory_related: bool | None = None + owner_node: dict[str, str] | None = None + publisher_node: dict[str, Any] | None = None + source_system: dict[str, Any] | None = None lob: str | None = None sub_lob: str | None = None - tier_type: str | None = None is_bcbs239_program: bool | None = None - risk_stripe: str | None = None risk_area: str | None = None + risk_stripe: str | None = None sap_code: str | None = None tier_designation: str | None = None - alternative_id: dict[str, str] | None = None region: str | None = None mnpi_indicator: bool | None = None country_of_reporting_obligation: str | None = None @@ -89,8 +92,24 @@ class Report(metaclass=CamelCaseMeta): _client: Fusion | None = field(init=False, repr=False, compare=False, default=None) def __post_init__(self) -> None: - self.title = tidy_string(self.title or "") - self.description = tidy_string(self.description or "") + """Normalize certain text fields after initialization.""" + self.title = tidy_string(self.title or "") if self.title is not None else None + self.description = tidy_string(self.description or "") if self.description is not None else None + for n in ( + "business_domain", + "lob", + "sub_lob", + "risk_area", + "risk_stripe", + "sap_code", + "tier_designation", + "region", + "country_of_reporting_obligation", + "primary_regulator", + ): + v = getattr(self, n, None) + if isinstance(v, str): + setattr(self, n, tidy_string(v)) def __getattr__(self, name: str) -> Any: snake_name = camel_to_snake(name) @@ -105,13 +124,16 @@ def __setattr__(self, name: str, value: Any) -> None: @property def client(self) -> Fusion | None: + """Returns the bound Fusion client""" return self._client @client.setter def client(self, client: Fusion | None) -> None: + """Bind a Fusion client to the Report instance.""" self._client = client def _use_client(self, client: Fusion | None) -> Fusion: + """Resolve the client to use.""" res = self._client if client is None else client if res is None: raise ValueError("A Fusion client object is required.") @@ -119,175 +141,132 @@ def _use_client(self, client: Fusion | None) -> Fusion: @classmethod def from_dict(cls: type[Report], data: dict[str, Any]) -> Report: - """Instantiate a Report object from a dictionary. + """Instantiate Report from a dict with light-touch key handling. - All fields defined in the class will be set. - If a field is missing from input, it will be set to None. 
+ - Convert only top-level keys from camelCase → snake_case + - Collapse top-level empty strings "" → None + - Minimal nested fix: publisherNode.publisherNodeIdentifier → publisher_node["publisher_node_identifier"] + - Minimal bool normalization for isBCBS239Program / regulatoryRelated """ - - def normalize_value(val: Any) -> Any: - if isinstance(val, str) and val.strip() == "": - return None - return val - - def convert_keys(d: dict[str, Any]) -> dict[str, Any]: - converted = {} - for k, v in d.items(): - key = k if k == "isBCBS239Program" else camel_to_snake(k) - if isinstance(v, dict) and not isinstance(v, str): - converted[key] = convert_keys(v) - else: - converted[key] = normalize_value(v) - return converted - - converted_data = convert_keys(data) - - if "isBCBS239Program" in converted_data: - converted_data["isBCBS239Program"] = make_bool(converted_data["isBCBS239Program"]) - - valid_fields = {f.name for f in fields(cls)} - filtered_data = {k: v for k, v in converted_data.items() if k in valid_fields} - + # Top-level camelCase → snake_case + mapped: dict[str, Any] = {camel_to_snake(k): v for k, v in data.items()} + + # Collapse top-level "" → None + for k, v in list(mapped.items()): + if isinstance(v, str) and v.strip() == "": + mapped[k] = None + + # Targeted nested handling for publisherNodeIdentifier + pub = mapped.get("publisher_node") + if isinstance(pub, dict) and "publisherNodeIdentifier" in pub: + pub_copy = dict(pub) + pub_copy["publisher_node_identifier"] = pub_copy.pop("publisherNodeIdentifier") + mapped["publisher_node"] = pub_copy + + # Minimal bool normalization + if "is_bcbs239_program" in mapped: + mapped["is_bcbs239_program"] = make_bool(mapped["is_bcbs239_program"]) + if "regulatory_related" in mapped: + mapped["regulatory_related"] = make_bool(mapped["regulatory_related"]) + + # Keep only valid dataclass fields + allowed = {f.name for f in fields(cls)} + filtered = {k: v for k, v in mapped.items() if k in allowed} + + # Construct without calling __init__ so we can set fields directly, then run post-init report = cls.__new__(cls) - - for fieldsingle in fields(cls): - if fieldsingle.name in filtered_data: - setattr(report, fieldsingle.name, filtered_data[fieldsingle.name]) - else: - setattr(report, fieldsingle.name, None) - + for fdef in fields(cls): + setattr(report, fdef.name, filtered.get(fdef.name, None)) report.__post_init__() return report - def validate(self) -> None: - required_fields = ["title", "data_node_id", "category", "frequency", "description", "sub_category", "domain"] - missing = [f for f in required_fields if getattr(self, f, None) in [None, ""]] - if missing: - raise ValueError(f"Missing required fields in Report: {', '.join(missing)}") - def to_dict(self) -> dict[str, Any]: - """Convert the Report instance to a dictionary. - - Returns: - dict[str, Any]: Report metadata as a dictionary. 
- - Examples: - >>> from fusion import Fusion - >>> fusion = Fusion() - >>> report = fusion.report("report") - >>> report_dict = report.to_dict() - - """ - - report_dict = {} + """Convert the Report instance to a dictionary (camelCase top-level keys, minimal nesting changes).""" + payload: dict[str, Any] = {} for k, v in self.__dict__.items(): if k.startswith("_"): continue - if k == "is_bcbs239_program": - report_dict["isBCBS239Program"] = v - elif k == "regulatory_related": - report_dict["regulatoryRelated"] = v - else: - report_dict[snake_to_camel(k)] = v - return report_dict + # Special-case BCBS field, otherwise camelCase the top-level key + out_key = "isBCBS239Program" if k == "is_bcbs239_program" else snake_to_camel(k) - @classmethod - def map_application_type(cls, app_type: str) -> str | None: - """Map application types to enum values.""" - mapping = { - "Application (SEAL)": "Application (SEAL)", - "Intelligent Solutions": "Intelligent Solutions", - "User Tool": "User Tool", - } - return mapping.get(app_type) + # Minimal nested handling: only fix publisher_node_identifier inside publisher_node + if k == "publisher_node" and isinstance(v, dict): + node = dict(v) # shallow copy + if "publisher_node_identifier" in node: + node["publisherNodeIdentifier"] = node.pop("publisher_node_identifier") + payload[out_key] = node + else: + payload[out_key] = v - @classmethod - def map_tier_type(cls, tier_type: str) -> str | None: - """Map tier types to enum values.""" - tier_mapping = {"Tier 1": "Tier 1", "Non Tier 1": "Non Tier 1"} - return tier_mapping.get(tier_type) + return payload + + @staticmethod + def _str_or_none(raw: Any) -> str | None: + """Return string form or None, collapsing floats like 5.0 -> '5'.""" + if raw is None: + return None + if isinstance(raw, float) and raw.is_integer(): + return str(int(raw)) + return str(raw) @classmethod def from_dataframe(cls, data: pd.DataFrame, client: Fusion | None = None) -> list[Report]: - """ - Create a list of Report objects from a DataFrame, applying permanent column mapping. - - Args: - data (pd.DataFrame): DataFrame containing report data. - - Returns: - list[Report]: List of Report objects. 
- """ - # Apply permanent column mapping - data = data.rename(columns=Report.COLUMN_MAPPING) # type: ignore[attr-defined] - data = data.replace([np.nan, np.inf, -np.inf], None) # Replace NaN, inf, -inf with None - - # Replace NaN with None - data = data.where(data.notna(), None) - - # Process each row and create Report objects - reports = [] - for _, row in data.iterrows(): - report_data = row.to_dict() - - # Handle nested fields like domain and data_node_id - report_data["domain"] = {"name": report_data.pop("domain_name", None)} # Populate "name" inside "domain" - raw_value = report_data.pop("data_node_name", None) - - if raw_value is None: - name_str = None - elif isinstance(raw_value, float) and raw_value.is_integer(): - name_str = str(int(raw_value)) # convert 2679.0 → "2679" - else: - name_str = str(raw_value) + """Create a list of Report objects from a DataFrame, applying permanent column mapping.""" + df_df = data.rename(columns=Report.COLUMN_MAPPING) # type: ignore[attr-defined] + df_df = df_df.replace([np.nan, np.inf, -np.inf], None) + df_df = df_df.where(df_df.notna(), None) + + reports: list[Report] = [] + for _, row in df_df.iterrows(): + report_data: dict[str, Any] = row.to_dict() + + def build_node(d: dict[str, Any], name_key: str, type_key: str) -> dict[str, Any] | None: + name_val = Report._str_or_none(d.pop(name_key, None)) + type_val = d.pop(type_key, None) + if name_val or type_val: + return {"name": name_val or "", "type": type_val or ""} + return None - report_data["data_node_id"] = { - "name": name_str, - "dataNodeType": cls.map_application_type(report_data.pop("data_node_type", None)), - } + publisher_node = build_node(report_data, "publisher_node_name", "publisher_node_type") + owner_node = build_node(report_data, "owner_node_name", "owner_node_type") - # Convert boolean fields - is_bcbs = report_data.get("is_bcbs239_program") - report_data["is_bcbs239_program"] = is_bcbs == "Yes" if is_bcbs else None + pub_ident = Report._str_or_none(report_data.pop("publisher_node_identifier", None)) + if pub_ident: + if publisher_node is None: + publisher_node = {"name": "", "type": "", "publisher_node_identifier": pub_ident} + else: + publisher_node["publisher_node_identifier"] = pub_ident - mnpi = report_data.get("mnpi_indicator") - report_data["mnpi_indicator"] = mnpi == "Yes" if mnpi else None + report_data["owner_node"] = owner_node + report_data["publisher_node"] = publisher_node - reg_related = report_data.get("regulatory_related") - report_data["regulatory_related"] = reg_related == "Yes" if reg_related else None + for key in ("is_bcbs239_program", "mnpi_indicator", "regulatory_related"): + val = report_data.get(key) + if isinstance(val, str): + low = val.strip().lower() + if low == "yes": + report_data[key] = True + elif low == "no": + report_data[key] = False - # Map tier designation - tier_val = report_data.get("tier_designation") - report_data["tier_designation"] = cls.map_tier_type(tier_val) if tier_val else None - # Filter out any fields not defined in the class - # This ensures that only valid fields are passed to the Report constructor valid_fields = {f.name for f in fields(cls)} report_data = {k: v for k, v in report_data.items() if k in valid_fields} report_obj = cls(**report_data) - report_obj.client = client # Attach the client if provided + report_obj.client = client try: report_obj.validate() reports.append(report_obj) except ValueError as e: - logger.warning(f"Skipping invalid row: {e}") + logger.warning("Skipping invalid row: %s", e) return reports 
     @classmethod
     def from_csv(cls, file_path: str, client: Fusion | None = None) -> list[Report]:
-        """
-        Create a list of Report objects from a CSV file, applying permanent column mapping.
-
-        Args:
-            file_path (str): Path to the CSV file.
-            client (Any, optional): Client instance to attach to each Report.
-
-        Returns:
-            list[Report]: List of Report objects.
-        """
+        """Create a list of Report objects from a CSV file, applying permanent column mapping."""
         data = pd.read_csv(file_path)
         return cls.from_dataframe(data, client=client)
@@ -302,12 +281,12 @@ def from_object(cls, source: pd.DataFrame | list[dict[str, Any]] | str, client:
             return Reports(cls.from_dataframe(df, client=client))
         elif isinstance(source, str):
-            if source.strip().endswith(".csv"):
+            s = source.strip()
+            if s.endswith(".csv"):
                 return Reports(cls.from_csv(source, client=client))
-            elif source.strip().startswith("[{"):
+            elif s.startswith("[{"):
                 import json
-
-                data = json.loads(source)
+                data = json.loads(s)
                 df = pd.DataFrame(data)  # noqa
                 return Reports(cls.from_dataframe(df, client=client))
             else:
@@ -315,42 +294,105 @@
         raise TypeError("source must be a DataFrame, list of dicts, or string (.csv path or JSON)")

+    def validate(self) -> None:
+        """Validate presence of required fields and node sub-keys."""
+        required_fields = [
+            "title",
+            "description",
+            "frequency",
+            "category",
+            "sub_category",
+            "business_domain",
+            "regulatory_related",
+        ]
+        missing = [f for f in required_fields if getattr(self, f, None) in (None, "")]
+        if not (self.owner_node and self.owner_node.get("name") and self.owner_node.get("type")):
+            missing.append("owner_node.name/type")
+        if not (self.publisher_node and self.publisher_node.get("name") and self.publisher_node.get("type")):
+            missing.append("publisher_node.name/type")
+
+        if missing:
+            raise ValueError(f"Missing required fields in Report: {', '.join(missing)}")
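Note that validate() reports flat fields and node sub-keys in one pass; a small sketch (field values invented):

    from fusion.report import Report  # assumed import path

    r = Report(
        title="T", description="D", frequency="Monthly",
        category="Risk", sub_category="Ops",
        business_domain="CDO", regulatory_related=True,
        owner_node={"name": "A", "type": "User Tool"},
        publisher_node={"name": "B"},  # missing "type"
    )
    try:
        r.validate()
    except ValueError as err:
        print(err)  # Missing required fields in Report: publisher_node.name/type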
     def create(
         self,
         client: Fusion | None = None,
         return_resp_obj: bool = False,
     ) -> requests.Response | None:
-        """Upload a new report to a Fusion catalog.
+        """Upload a new report to a Fusion catalog."""
+        client = self._use_client(client)
+        payload = self.to_dict()

-        Args:
-            client (Fusion, optional): A Fusion client object. Defaults to the instance's _client.
-                If instantiated from a Fusion object, then the client is set automatically.
-            return_resp_obj (bool, optional): If True then return the response object. Defaults to False.
+        def _strip_ids(x: Any) -> Any:
+            if isinstance(x, dict):
+                return {k: _strip_ids(v) for k, v in x.items() if k != "id"}
+            if isinstance(x, list):
+                return [_strip_ids(i) for i in x]
+            return x

-        Returns:
-            requests.Response | None: The response object from the API call if return_resp_obj is True, otherwise None.
+        payload = _strip_ids(payload)
+        payload.pop("id", None)

-        Examples:
+        url = f"{client._get_new_root_url()}/api/corelineage-service/v1/reports"
+        resp: requests.Response = client.session.post(url, json=payload)
+        requests_raise_for_status(resp)
+        with suppress(Exception):
+            self.id = resp.json().get("id", self.id)
+        return resp if return_resp_obj else None

-            >>> from fusion import Fusion
-            >>> fusion = Fusion()
-            >>> report = fusion.report(
-            ...     name="report_1",
-            ...     title="Quarterly Risk Report",
-            ...     category="Risk",
-            ...     frequency="Quarterly",
-            ...     )
-            >>> report.create()
+    def update(
+        self,
+        client: Fusion | None = None,
+        return_resp_obj: bool = False,
+    ) -> requests.Response | None:
+        """Update this report with the current object state."""
+        client = self._use_client(client)
+        if not self.id:
+            raise ValueError("Report ID is required on the object (set self.id before update()).")
+
+        payload = self.to_dict()
+        payload.pop("id", None)
+        payload.pop("title", None)  # title immutable for PUT
+
+        url = f"{client._get_new_root_url()}/api/corelineage-service/v1/reports/{self.id}"
+        resp: requests.Response = client.session.put(url, json=payload)
+        requests_raise_for_status(resp)
+        return resp if return_resp_obj else None
+
+    def update_fields(
+        self,
+        client: Fusion | None = None,
+        return_resp_obj: bool = False,
+    ) -> requests.Response | None:
+        """Partially update this report (PATCH) using the current object state.
+
+        This sends the current field values of the instance (except `id`) as a PATCH.
+        Set attributes you want to change on the object, then call `update_fields()`.
         """
         client = self._use_client(client)
+        if not self.id:
+            raise ValueError("Report ID is required on the object (set self.id before update_fields()).")

-        data = self.to_dict()
+        payload = self.to_dict()
+        payload.pop("id", None)

-        url = f"{client._get_new_root_url()}/api/corelineage-service/v1/reports"
-        resp: requests.Response = client.session.post(url, json=data)
+        url = f"{client._get_new_root_url()}/api/corelineage-service/v1/reports/{self.id}"
+        resp: requests.Response = client.session.patch(url, json=payload)
         requests_raise_for_status(resp)
+        return resp if return_resp_obj else None

+    def delete(
+        self,
+        client: Fusion | None = None,
+        return_resp_obj: bool = False,
+    ) -> requests.Response | None:
+        """Delete this report using self.id."""
+        client = self._use_client(client)
+        if not self.id:
+            raise ValueError("Report ID is required on the object (set self.id before delete()).")
+        url = f"{client._get_new_root_url()}/api/corelineage-service/v1/reports/{self.id}"
+        resp: requests.Response = client.session.delete(url)
+        requests_raise_for_status(resp)
         return resp if return_resp_obj else None

 class AttributeTermMapping(TypedDict):
@@ -366,54 +408,46 @@ def link_attributes_to_terms(
         client: Fusion,
         return_resp_obj: bool = False,
     ) -> requests.Response | None:
-        """
-        Class method to link attributes to business terms for a report.
-
-        Can be called without an actual Report object.
-
-        Args:
-            report_id (str): ID of the report to link terms to.
-            mappings (list): List of attribute-to-term mappings.
-            client (Fusion): Fusion client instance.
-            return_resp_obj (bool): Whether to return the raw response object.
- - Returns: - requests.Response | None: API response - """ + """Link attributes to business terms for a report.""" url = ( - f"{client._get_new_root_url()}/api/corelineage-service/v1/reports/{report_id}/reportElements/businessTerms" # noqa: E501 + f"{client._get_new_root_url()}/api/corelineage-service/v1/reports/{report_id}/reportElements/businessTerms" ) resp = client.session.post(url, json=mappings) requests_raise_for_status(resp) - return resp if return_resp_obj else None Report.COLUMN_MAPPING = { # type: ignore[attr-defined] "Report/Process Name": "title", "Report/Process Description": "description", - "Activity Type": "tier_type", "Frequency": "frequency", "Category": "category", - "Report/Process Owner SID": "report_owner", "Sub Category": "sub_category", + "Regulatory Designated": "regulatory_related", + "businessDomain": "business_domain", + "CDO Office": "business_domain", + "ownerNode_name": "owner_node_name", + "ownerNode_type": "owner_node_type", + "publisherNode_name": "publisher_node_name", + "publisherNode_type": "publisher_node_type", + "publisherNode_publisherNodeIdentifier": "publisher_node_identifier", + "sourceSystem": "source_system", "LOB": "lob", "Sub-LOB": "sub_lob", "JPMSE BCBS Related": "is_bcbs239_program", "Report Type": "risk_stripe", + "Risk Area": "risk_area", "Tier Type": "tier_designation", "Region": "region", "MNPI Indicator": "mnpi_indicator", "Country of Reporting Obligation": "country_of_reporting_obligation", - "Regulatory Designated": "regulatory_related", "Primary Regulator": "primary_regulator", - "CDO Office": "domain_name", - "Application ID": "data_node_name", - "Application Type": "data_node_type", } class Reports: + """Container for a list of Report objects with convenience loaders.""" + def __init__(self, reports: list[Report] | None = None) -> None: self.reports = reports or [] self._client: Fusion | None = None @@ -429,21 +463,25 @@ def __len__(self) -> int: @property def client(self) -> Fusion | None: + """Return the bound client for all contained reports (if any).""" return self._client @client.setter def client(self, client: Fusion | None) -> None: + """Bind a client to this collection and all contained reports.""" self._client = client for report in self.reports: report.client = client @classmethod def from_csv(cls, file_path: str, client: Fusion | None = None) -> Reports: + """Load Reports from a CSV file path.""" data = pd.read_csv(file_path) return cls.from_dataframe(data, client=client) @classmethod def from_dataframe(cls, df: pd.DataFrame, client: Fusion | None = None) -> Reports: + """Load Reports from a pandas DataFrame.""" report_objs = Report.from_dataframe(df, client=client) obj = cls(report_objs) obj.client = client @@ -467,7 +505,6 @@ def from_object(cls, source: pd.DataFrame | list[dict[str, Any]] | str, client: elif isinstance(source, str): if source.lower().endswith(".csv") and Path(source).exists(): return cls.from_csv(source, client=client) - elif source.strip().startswith("[{"): dict_list = json.loads(source) return cls.from_dataframe(pd.DataFrame(dict_list), client=client) diff --git a/py_src/fusion/report_attributes.py b/py_src/fusion/report_attributes.py index 2e54f950..784f402d 100644 --- a/py_src/fusion/report_attributes.py +++ b/py_src/fusion/report_attributes.py @@ -19,10 +19,11 @@ @dataclass class ReportAttribute(metaclass=CamelCaseMeta): - title: str - sourceIdentifier: str | None = None + title: str | None = None + id: int | None = None + source_identifier: str | None = None description: str | None = None - 
technicalDataType: str | None = None
+    technical_data_type: str | None = None
     path: str | None = None

     _client: Fusion | None = field(init=False, repr=False, compare=False, default=None)
@@ -57,10 +58,11 @@ def client(self, client: Fusion | None) -> None:

     def to_dict(self) -> dict[str, Any]:
         return {
-            "sourceIdentifier": self.sourceIdentifier,
+            "id": self.id,
+            "sourceIdentifier": self.source_identifier,
             "title": self.title,
             "description": self.description,
-            "technicalDataType": self.technicalDataType,
+            "technicalDataType": self.technical_data_type,
             "path": self.path,
         }
@@ -110,7 +112,7 @@ def remove_attribute(self, name: str) -> bool:

     def get_attribute(self, name: str) -> ReportAttribute | None:
         for attr in self.attributes:
-            if attr.name == name:
+            if attr.title == name:
                 return attr
         return None
@@ -136,7 +138,7 @@ def from_csv(self, file_path: str) -> ReportAttributes:

         # Only keep relevant columns
         column_map = {
-            "Local Data Element Reference ID": "sourceIdentifier",
+            "Local Data Element Reference ID": "source_identifier",
             "Data Element Name": "title",
             "Data Element Description": "description",
         }
@@ -148,7 +150,7 @@ def from_csv(self, file_path: str) -> ReportAttributes:
         df = df.rename(columns=column_map)  # noqa

         # Add any missing required fields with default None
-        for col in ["technicalDataType", "path"]:
+        for col in ["technical_data_type", "path"]:
             if col not in df:
                 df[col] = None
@@ -195,14 +197,77 @@ def to_dataframe(self) -> pd.DataFrame:
         data = [attr.to_dict() for attr in self.attributes]
         return pd.DataFrame(data)

+    def _build_api_url(self, client: Fusion, report_id: str) -> str:
+        """Build the report-elements API URL for report attribute operations (internal helper)."""
+        return f"{client._get_new_root_url()}/api/corelineage-service/v1/reports/{report_id}/report-elements"
+
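Every verb below (POST, PUT, PATCH, DELETE) targets the same collection URL; a tiny sketch of the shape _build_api_url returns, with a hypothetical root and placeholder report id:

    root = "https://fusion.example.com"  # hypothetical _get_new_root_url() result
    report_id = "report_123"             # placeholder
    url = f"{root}/api/corelineage-service/v1/reports/{report_id}/report-elements"
    assert url.endswith("/reports/report_123/report-elements")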
+    def _build_payload(self, operation: str) -> list[dict[str, Any]]:
+        """Build the payload for a CRUD operation, applying per-operation field exclusions (internal helper).
+
+        Args:
+            operation (str): The operation type ('create', 'update', 'update_fields', 'delete')
+
+        Returns:
+            list[dict[str, Any]]: Payload with appropriate fields for the operation
+        """
+        payload = []
+
+        for attr in self.attributes:
+            if operation == "delete":
+                payload.append({"id": attr.id})
+            elif operation == "create":
+                attr_dict = {
+                    field_name: field_value
+                    for field_name, field_value in attr.to_dict().items()
+                    if field_value is not None and field_name != "id"
+                }
+                payload.append(attr_dict)
+            elif operation == "update_fields":
+                attr_dict = {
+                    field_name: field_value
+                    for field_name, field_value in attr.to_dict().items()
+                    if field_value is not None and field_name != "title"
+                }
+                payload.append(attr_dict)
+            elif operation == "update":
+                attr_dict = attr.to_dict()
+                attr_dict.pop("title", None)
+                payload.append(attr_dict)
+
+        return payload
+
+    def _send_request(
+        self,
+        client: Fusion,
+        url: str,
+        payload: list[dict[str, Any]] | None = None,
+        method: str = "GET",
+        return_resp_obj: bool = False,
+    ) -> requests.Response | None:
+        """Send the HTTP request for the given method and handle the response (internal helper)."""
+        if method.upper() == "GET":
+            resp = client.session.get(url)
+        elif method.upper() == "POST":
+            resp = client.session.post(url, json=payload)
+        elif method.upper() == "PUT":
+            resp = client.session.put(url, json=payload)
+        elif method.upper() == "PATCH":
+            resp = client.session.patch(url, json=payload)
+        elif method.upper() == "DELETE":
+            resp = client.session.delete(url, json=payload)
+        else:
+            raise ValueError(f"Unsupported HTTP method: {method}")
+
+        requests_raise_for_status(resp)
+        return resp if return_resp_obj else None

     def create(
         self,
         report_id: str,
         client: Fusion | None = None,
         return_resp_obj: bool = False,
     ) -> requests.Response | None:
-        """
-        Create the ReportAttributes to the core-lineage API.
+        """Create the ReportAttributes.

         Args:
             report_id (str): The identifier of the report.
@@ -211,14 +276,154 @@ def create(

         Returns:
             requests.Response | None: API response object if return_resp_obj is True.
+
+        Example:
+            >>> from fusion import Fusion
+            >>> fusion = Fusion()
+            >>> attr = fusion.report_attribute(
+            ...     title="Revenue",
+            ...     source_identifier="rev_001",
+            ...     description="Revenue field for reporting",
+            ...     technical_data_type="decimal",
+            ...     path="/data/revenue"
+            ... )
+            >>> report_attrs = fusion.report_attributes([attr])
+            >>> report_attrs.create(report_id="report_1")
         """
         client = self._use_client(client)
+        url = self._build_api_url(client, report_id)
+        payload = self._build_payload("create")

-        url = f"{client._get_new_root_url()}/api/corelineage-service/v1/reports/{report_id}/reportElements"
+        return self._send_request(client, url, payload, "POST", return_resp_obj)

-        payload = [attr.to_dict() for attr in self.attributes]
-        resp = client.session.post(url, json=payload)
-        requests_raise_for_status(resp)
-        return resp if return_resp_obj else None
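A hedged sketch (standalone; import path assumed) of what _build_payload emits for one attribute under each operation:

    from fusion.report_attributes import ReportAttribute, ReportAttributes  # assumed path

    attrs = ReportAttributes([ReportAttribute(id=456, title="Revenue", description="desc")])
    attrs._build_payload("create")         # drops id and None-valued fields
    # -> [{"title": "Revenue", "description": "desc"}]
    attrs._build_payload("update_fields")  # drops immutable title and None-valued fields
    # -> [{"id": 456, "description": "desc"}]
    attrs._build_payload("delete")         # id only
    # -> [{"id": 456}]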
+    def update(
+        self,
+        report_id: str,
+        client: Fusion | None = None,
+        return_resp_obj: bool = False,
+    ) -> requests.Response | None:
+        """Replace report attributes.
+
+        This method performs a complete replacement of each report attribute.
+        Any properties not specified will be assigned null or default values.
+        Note: title is immutable and cannot be modified.
+
+        Args:
+            report_id (str): The identifier of the report.
+            client (Fusion, optional): Fusion client for auth and config. Uses self._client if not passed.
+            return_resp_obj (bool, optional): If True, returns the response object. Otherwise, returns None.
+
+        Returns:
+            requests.Response | None: API response object if return_resp_obj is True.
+
+        Raises:
+            ValueError: If required fields are missing or invalid.
+
+        Example:
+            >>> from fusion import Fusion
+            >>> fusion = Fusion()
+            >>> attr = fusion.report_attribute(
+            ...     id=456,
+            ...     source_identifier="rev_001",
+            ...     description="Updated revenue field",
+            ...     technical_data_type="decimal"
+            ... )
+            >>> report_attrs = fusion.report_attributes([attr])
+            >>> report_attrs.update(report_id="report_1")
+
+        """
+        client = self._use_client(client)
+        for attr in self.attributes:
+            if attr.id is None:
+                raise ValueError("ReportAttribute must have an 'id' field for update")
+
+        url = self._build_api_url(client, report_id)
+        payload = self._build_payload("update")
+        return self._send_request(client, url, payload, "PUT", return_resp_obj)
+
+    def update_fields(
+        self,
+        report_id: str,
+        client: Fusion | None = None,
+        return_resp_obj: bool = False,
+    ) -> requests.Response | None:
+        """Update specific fields of report attributes.
+
+        This method performs a partial update of each report attribute.
+        Only the specified properties will be updated while all other properties remain unchanged.
+        Note: title is immutable and cannot be modified. At least one property must be provided.
+
+        Args:
+            report_id (str): The identifier of the report.
+            client (Fusion, optional): Fusion client for auth and config. Uses self._client if not passed.
+            return_resp_obj (bool, optional): If True, returns the response object. Otherwise, returns None.
+
+        Returns:
+            requests.Response | None: API response object if return_resp_obj is True.
+
+        Raises:
+            ValueError: If required fields are missing or invalid, or if no updatable fields are provided.
+
+        Example:
+            >>> from fusion import Fusion
+            >>> fusion = Fusion()
+            >>> attr = fusion.report_attribute(
+            ...     id=456,
+            ...     description="Updated revenue field"  # only this will be updated
+            ... )
+            >>> report_attrs = fusion.report_attributes([attr])
+            >>> report_attrs.update_fields(report_id="report_1")
+
+        """
+        client = self._use_client(client)
+        for attr in self.attributes:
+            if attr.id is None:
+                raise ValueError("ReportAttribute must have an 'id' field for update_fields")
+
+        url = self._build_api_url(client, report_id)
+        payload = self._build_payload("update_fields")
+        return self._send_request(client, url, payload, "PATCH", return_resp_obj)
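Each bulk mutation validates ids up front, before any request is issued; a sketch (assumes a Fusion client is already bound, here a hypothetical `fusion` instance):

    attrs = ReportAttributes([ReportAttribute(title="Revenue")])  # no id set
    attrs.client = fusion  # hypothetical pre-configured Fusion instance
    try:
        attrs.update_fields(report_id="report_123")  # placeholder report id
    except ValueError as err:
        print(err)  # ReportAttribute must have an 'id' field for update_fields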
+    def delete(
+        self,
+        report_id: str,
+        client: Fusion | None = None,
+        return_resp_obj: bool = False,
+    ) -> requests.Response | None:
+        """Soft delete report attributes.
+
+        This method performs a soft delete of each report attribute.
+        Once soft deleted, the report attributes can still be viewed but cannot be modified.
+        Note: Throws an error if the report attribute is already deleted.
+
+        Args:
+            report_id (str): The identifier of the report.
+            client (Fusion, optional): Fusion client for auth and config. Uses self._client if not passed.
+            return_resp_obj (bool, optional): If True, returns the response object. Otherwise, returns None.
+
+        Returns:
+            requests.Response | None: API response object if return_resp_obj is True.
+
+        Raises:
+            ValueError: If required fields are missing or if attributes are already deleted.
+
+        Example:
+            >>> from fusion import Fusion
+            >>> fusion = Fusion()
+            >>> attr = fusion.report_attribute(
+            ...     id=456  # only id is needed for deletion
+            ... )
+            >>> report_attrs = fusion.report_attributes([attr])
+            >>> report_attrs.delete(report_id="report_1")
+
+        """
+        client = self._use_client(client)
+
+        for attr in self.attributes:
+            if attr.id is None:
+                raise ValueError("ReportAttribute must have an 'id' field for deletion")
+
+        url = self._build_api_url(client, report_id)
+        payload = self._build_payload("delete")
+
+        return self._send_request(client, url, payload, "DELETE", return_resp_obj)
diff --git a/py_src/fusion/utils.py b/py_src/fusion/utils.py
index 528ea00d..6bf5a80d 100644
--- a/py_src/fusion/utils.py
+++ b/py_src/fusion/utils.py
@@ -40,6 +40,8 @@
 )
 from urllib3.util.retry import Retry

+from fusion.exceptions import APIResponseError
+
 from .authentication import FusionAiohttpSession, FusionOAuthAdapter

 if TYPE_CHECKING:
@@ -541,9 +543,9 @@ def distribution_to_filename(
     if file_name and file_name != file_format:
         # Use explicit filename directly
         final_name = f"{file_name}.{file_format}"
-    elif partitioning == "hive":
+    elif partitioning == "hive":
         final_name = f"{dataset}.{file_format}"
-    else:
+    else:
         final_name = f"{dataset}__{catalog}__{datasetseries}.{file_format}"

     sep = "/"
@@ -596,7 +598,7 @@ def distribution_to_url(
         return f"{root_url}catalogs/{catalog}/datasets/{dataset}/sample/distributions/{file_format}"
     if is_download:
         return (
-            f"{root_url}catalogs/{catalog}/datasets/{dataset}/datasetseries/"
+            f"{root_url}catalogs/{catalog}/datasets/{dataset}/datasetseries/"
             f"{datasetseries}/distributions/{file_format}/files/operationType/download?file={file_name}"
         )
     return f"{root_url}catalogs/{catalog}/datasets/{dataset}/datasetseries/{datasetseries}/distributions/{file_format}"
@@ -1025,3 +1027,9 @@ def _merge_responses(responses: list[dict[str, Any]]) -> dict[str, Any]:
             merged[key].extend(response[key])

     return merged
+
+
+def ensure_resources(resp: dict[str, Any]) -> None:
+    """Raise APIResponseError if 'resources' key is missing or empty in the response."""
+    if "resources" not in resp or not resp["resources"]:
+        raise APIResponseError(ValueError("No data found"))
diff --git a/py_tests/test_dataflow.py b/py_tests/test_dataflow.py
index b411bc6f..2c5a4ba0 100644
--- a/py_tests/test_dataflow.py
+++ b/py_tests/test_dataflow.py
@@ -9,12 +9,13 @@
 def test_dataflow_basic_fields() -> None:
     flow = Dataflow(
-        providerNode={"name": "CRM_DB", "nodeType": "Database"},
-        consumerNode={"name": "DWH", "nodeType": "Database"},
+        provider_node={"name": "CRM_DB", "type": "Database"},
+        consumer_node={"name": "DWH", "type": "Database"},
         description="CRM to DWH load",
-        transportType="API",
+        transport_type="API",
         frequency="DAILY",
     )
+    # camelCase attribute access works
     assert flow.providerNode is not None
     assert flow.consumerNode is not None
     assert flow.providerNode["name"] == "CRM_DB"
@@ -25,22 +26,22 @@ def test_dataflow_basic_fields() -> None:

 def test_dataflow_to_dict() -> None:
     flow = Dataflow(
-        providerNode={"name": "S3", "nodeType": "Storage"},
-        consumerNode={"name": "Analytics", "nodeType": "Dashboard"},
+        provider_node={"name": "S3", "type": "Storage"},
+        consumer_node={"name": "Analytics", "type": "Dashboard"},
         description="S3 to Analytics feed",
-        transportType="FILE TRANSFER",
+        transport_type="FILE TRANSFER",
         frequency="WEEKLY",
     )
     result = flow.to_dict()
     assert result["providerNode"]["name"] == "S3"
-    assert result["consumerNode"]["nodeType"] == "Dashboard"
+    assert result["consumerNode"]["type"] == "Dashboard"
     assert result["frequency"] == "WEEKLY"

 def test_dataflow_from_dict() -> None:
     data = {
-        "providerNode": {"name":
"App1", "nodeType": "User Tool"}, - "consumerNode": {"name": "DWH", "nodeType": "Database"}, + "providerNode": {"name": "App1", "type": "User Tool"}, + "consumerNode": {"name": "DWH", "type": "Database"}, "description": "Dict-based dataflow", "transportType": "API", "frequency": "DAILY", @@ -55,8 +56,8 @@ def test_dataflow_from_dict() -> None: def test_dataflow_from_object_series() -> None: series = pd.Series( { - "providerNode": {"name": "CRM_DB", "nodeType": "Database"}, - "consumerNode": {"name": "DWH", "nodeType": "Database"}, + "providerNode": {"name": "CRM_DB", "type": "Database"}, + "consumerNode": {"name": "DWH", "type": "Database"}, "description": "Series-based dataflow", "transportType": "API", "frequency": "DAILY", @@ -72,8 +73,8 @@ def test_dataflow_from_object_series() -> None: def test_dataflow_from_object_json() -> None: json_str = """{ - "providerNode": {"name": "SystemA", "nodeType": "App"}, - "consumerNode": {"name": "SystemB", "nodeType": "Database"}, + "providerNode": {"name": "SystemA", "type": "App"}, + "consumerNode": {"name": "SystemB", "type": "Database"}, "description": "JSON-based dataflow", "frequency": "MONTHLY" }""" @@ -88,15 +89,15 @@ def test_dataflow_from_dataframe(fusion_obj: Fusion) -> None: frame = pd.DataFrame( [ { - "providerNode": {"name": "CRM_DB", "nodeType": "Database"}, - "consumerNode": {"name": "DWH", "nodeType": "Database"}, + "providerNode": {"name": "CRM_DB", "type": "Database"}, + "consumerNode": {"name": "DWH", "type": "Database"}, "description": "Row1", "transportType": "API", "frequency": "DAILY", }, { - "providerNode": {"name": "S3", "nodeType": "Storage"}, - "consumerNode": {"name": "Analytics", "nodeType": "Dashboard"}, + "providerNode": {"name": "S3", "type": "Storage"}, + "consumerNode": {"name": "Analytics", "type": "Dashboard"}, "description": "Row2", "transportType": "FILE TRANSFER", "frequency": "WEEKLY", @@ -105,23 +106,78 @@ def test_dataflow_from_dataframe(fusion_obj: Fusion) -> None: ) flows = Dataflow.from_dataframe(frame, client=fusion_obj) assert isinstance(flows, list) - test_value = 2 - assert len(flows) == test_value + num = 2 + assert len(flows) == num assert all(isinstance(f, Dataflow) for f in flows) assert flows[0].description == "Row1" assert flows[1].frequency == "WEEKLY" - + def test_dataflow_validate_nodes_for_create_passes() -> None: flow = Dataflow( - providerNode={"name": "CRM_DB", "nodeType": "Database"}, - consumerNode={"name": "DWH", "nodeType": "Database"}, + provider_node={"name": "CRM_DB", "type": "Database"}, + consumer_node={"name": "DWH", "type": "Database"}, + connection_type="API", # required for create-time validation ) # should not raise flow._validate_nodes_for_create() def test_dataflow_validate_nodes_for_create_raises() -> None: - flow = Dataflow(providerNode=None, consumerNode=None) + flow = Dataflow(provider_node=None, consumer_node=None) with pytest.raises(ValueError, match="must be a dict"): flow._validate_nodes_for_create() + + +def test_dataflow_to_dict_drop_none_false_includes_nulls() -> None: + """to_dict with drop_none=False should keep None values and defaults.""" + flow = Dataflow( + provider_node={"name": "SRC", "type": "Database"}, + consumer_node={"name": "DST", "type": "Database"}, + description=None, # kept because drop_none=False + id=None, # kept because drop_none=False + frequency="DAILY", + ) + out = flow.to_dict(drop_none=False) # ← no exclude, so no CI brittleness + # required keys present + assert "providerNode" in out + assert "consumerNode" in out + # None-valued 
fields are retained + assert "description" in out + assert out["description"] is None + assert "id" in out + assert out["id"] is None + # defaulted list field should be present + assert "datasets" in out + assert isinstance(out["datasets"], list) + # a normal field is preserved + assert out["frequency"] == "DAILY" + + +def test_dataflow_from_dataframe_skips_invalid_rows_and_sets_client(fusion_obj: Fusion) -> None: + """from_dataframe should skip invalid rows and attach the provided client to valid ones.""" + frame = pd.DataFrame( + [ + # invalid: missing consumerNode + { + "providerNode": {"name": "OnlyProvider", "type": "Database"}, + "description": "Invalid row", + "frequency": "DAILY", + }, + # valid + { + "providerNode": {"name": "SRC", "type": "Database"}, + "consumerNode": {"name": "DST", "type": "Database"}, + "description": "Valid row", + "transportType": "API", + "frequency": "WEEKLY", + }, + ] + ) + flows = Dataflow.from_dataframe(frame, client=fusion_obj) + assert isinstance(flows, list) + assert len(flows) == 1 # invalid row skipped + assert isinstance(flows[0], Dataflow) + assert flows[0].description == "Valid row" + # client should be attached + assert flows[0].client is fusion_obj diff --git a/py_tests/test_dataset.py b/py_tests/test_dataset.py index 918bc647..651ca4e5 100644 --- a/py_tests/test_dataset.py +++ b/py_tests/test_dataset.py @@ -925,6 +925,76 @@ def test_dataset_class_from_catalog_no_product(requests_mock: requests_mock.Mock assert my_dataset.is_active is False +def test_dataset_class_from_catalog_empty_product_datasets( + requests_mock: requests_mock.Mocker, fusion_obj: Fusion +) -> None: + """Test Dataset from_catalog method when catalog has no products.""" + catalog = "my_catalog" + url = f"{fusion_obj.root_url}catalogs/{catalog}/datasets" + + expected_data = { + "resources": [ + { + "catalog": { + "@id": "my_catalog/", + "description": "my catalog", + "title": "my catalog", + "identifier": "my_catalog", + }, + "title": "Test Dataset", + "identifier": "TEST_DATASET", + "category": ["category"], + "shortAbstract": "short abstract", + "description": "description", + "frequency": "Once", + "isInternalOnlyDataset": False, + "isThirdPartyData": True, + "isRestricted": False, + "isRawData": True, + "maintainer": "maintainer", + "source": "source", + "region": ["region"], + "publisher": "publisher", + "subCategory": ["subCategory"], + "tags": ["tag1", "tag2"], + "createdDate": "2020-05-05", + "modifiedDate": "2020-05-05", + "deliveryChannel": ["API"], + "language": "English", + "status": "Available", + "type": "Source", + "containerType": "Snapshot-Full", + "snowflake": "snowflake", + "complexity": "complexity", + "isImmutable": False, + "isMnpi": False, + "isPii": False, + "isPci": False, + "isClient": False, + "isPublic": False, + "isInternal": False, + "isConfidential": False, + "isHighlyConfidential": False, + "isActive": False, + "@id": "TEST_DATASET/", + }, + ], + } + requests_mock.get(url, json=expected_data) + + # Mock empty product datasets response - this should trigger the APIResponseError + url2 = f"{fusion_obj.root_url}catalogs/{catalog}/productDatasets" + empty_data: dict[str, list[dict[str, str]]] = {"resources": []} + requests_mock.get(url2, json=empty_data) + + # This should work without raising an exception and product should be None + my_dataset = Dataset(identifier="TEST_DATASET").from_catalog(client=fusion_obj, catalog=catalog) + assert isinstance(my_dataset, Dataset) + assert my_dataset.title == "Test Dataset" + assert my_dataset.identifier == 
"TEST_DATASET" + assert my_dataset.product is None + + def test_create_dataset_from_dict(requests_mock: requests_mock.Mocker, fusion_obj: Fusion) -> None: """Test create Dataset method.""" catalog = "my_catalog" @@ -1270,7 +1340,7 @@ def test_dataset_case_switching() -> None: "publisher": "J.P. Morgan", "product": ["TEST_PRODUCT"], "subCategory": ["subCategory"], - "tags": ["tag1", "tag2"], + "tag": ["tag1", "tag2"], "createdDate": "2020-05-05", "modifiedDate": "2020-05-05", "deliveryChannel": ["API"], diff --git a/py_tests/test_fusion.py b/py_tests/test_fusion.py index a1706393..f6d35ba8 100644 --- a/py_tests/test_fusion.py +++ b/py_tests/test_fusion.py @@ -4,7 +4,7 @@ import re from io import BytesIO from pathlib import Path -from typing import Any +from typing import TYPE_CHECKING, Any from unittest.mock import MagicMock, patch import pandas as pd @@ -19,9 +19,12 @@ from fusion.exceptions import APIResponseError, FileFormatError from fusion.fusion import Fusion, logger from fusion.fusion_types import Types -from fusion.report import Report from fusion.utils import _normalise_dt_param, distribution_to_url +if TYPE_CHECKING: + from fusion.report import Report + + def test__get_canonical_root_url() -> None: from fusion.utils import _get_canonical_root_url @@ -1359,17 +1362,17 @@ def test_to_bytes_multiple_files(requests_mock: requests_mock.Mocker, fusion_obj "fileExtension": ".parquet", "identifier": "file1", "title": "File 1", - "@id": "file1" + "@id": "file1", }, { "description": "Sample file 2", "fileExtension": ".parquet", "identifier": "file2", "title": "File 2", - "@id": "file2" - } + "@id": "file2", + }, ] - } + } distri_files_url = ( f"{fusion_obj.root_url}catalogs/{catalog}/datasets/{dataset}/datasetseries/" @@ -1378,22 +1381,22 @@ def test_to_bytes_multiple_files(requests_mock: requests_mock.Mocker, fusion_obj requests_mock.get(distri_files_url, json=mock_data) file1_url = distribution_to_url( - fusion_obj.root_url, - dataset, - datasetseries, - file_format, - catalog, - is_download=True, - file_name="file1", + fusion_obj.root_url, + dataset, + datasetseries, + file_format, + catalog, + is_download=True, + file_name="file1", ) file2_url = distribution_to_url( - fusion_obj.root_url, - dataset, - datasetseries, - file_format, - catalog, - is_download=True, - file_name="file2", + fusion_obj.root_url, + dataset, + datasetseries, + file_format, + catalog, + is_download=True, + file_name="file2", ) expected_data = b"some binary data" requests_mock.get(file1_url, content=expected_data) @@ -1407,6 +1410,7 @@ def test_to_bytes_multiple_files(requests_mock: requests_mock.Mocker, fusion_obj for d in data: assert d.getvalue() == expected_data + def test_to_bytes_single_file(requests_mock: requests_mock.Mocker, fusion_obj: Fusion) -> None: catalog = "my_catalog" dataset = "my_dataset" @@ -1419,10 +1423,10 @@ def test_to_bytes_single_file(requests_mock: requests_mock.Mocker, fusion_obj: F "fileExtension": ".parquet", "identifier": "file1", "title": "File 1", - "@id": "file1" + "@id": "file1", } ] - } + } distri_files_url = ( f"{fusion_obj.root_url}catalogs/{catalog}/datasets/{dataset}/datasetseries/" @@ -1431,13 +1435,13 @@ def test_to_bytes_single_file(requests_mock: requests_mock.Mocker, fusion_obj: F requests_mock.get(distri_files_url, json=mock_data) file1_url = distribution_to_url( - fusion_obj.root_url, - dataset, - datasetseries, - file_format, - catalog, - is_download=True, - file_name="file1", + fusion_obj.root_url, + dataset, + datasetseries, + file_format, + catalog, + 
is_download=True, + file_name="file1", ) expected_data = b"some binary data" requests_mock.get(file1_url, content=expected_data) @@ -1448,19 +1452,14 @@ def test_to_bytes_single_file(requests_mock: requests_mock.Mocker, fusion_obj: F if isinstance(data, BytesIO): assert data.getbuffer() == expected_data + def test_to_bytes_with_filename(requests_mock: requests_mock.Mocker, fusion_obj: Fusion) -> None: catalog = "my_catalog" dataset = "my_dataset" datasetseries = "2020-04-04" - file_format = "csv" + file_format = "csv" file1_url = distribution_to_url( - fusion_obj.root_url, - dataset, - datasetseries, - file_format, - catalog, - is_download=True, - file_name="file1" + fusion_obj.root_url, dataset, datasetseries, file_format, catalog, is_download=True, file_name="file1" ) expected_data = b"some binary data" requests_mock.get(file1_url, content=expected_data) @@ -1471,6 +1470,7 @@ def test_to_bytes_with_filename(requests_mock: requests_mock.Mocker, fusion_obj: if isinstance(data, BytesIO): assert data.getbuffer() == expected_data + @pytest.mark.skip(reason="MUST FIX") def test_download_main(mocker: MockerFixture, fusion_obj: Fusion) -> None: catalog = "my_catalog" @@ -3007,6 +3007,7 @@ def test_list_registered_attributes_paginated_fail(requests_mock: requests_mock. with pytest.raises(requests.exceptions.HTTPError): fusion_obj.list_registered_attributes(catalog=catalog) + def test_fusion_report(fusion_obj: Fusion) -> None: """Test Fusion Report object creation using required and optional arguments.""" report = fusion_obj.report( @@ -3015,25 +3016,18 @@ def test_fusion_report(fusion_obj: Fusion) -> None: frequency="Quarterly", category="Risk Management", sub_category="Operational Risk", - data_node_id={"name": "ComplianceTable", "dataNodeType": "Table"}, + owner_node={"name": "ComplianceTable", "type": "User Tool"}, + publisher_node={"name": "ComplianceDash", "type": "Intelligent Solutions"}, + business_domain="Risk", regulatory_related=True, - domain={"name": "Risk"}, - tier_type="Tier 1", lob="Global Markets", is_bcbs239_program=True, sap_code="SAP123", region="EMEA", ) + assert report.regulatory_related is True + assert report.business_domain == "Risk" - assert isinstance(report, Report) - assert report.title == "Quarterly Risk Report" - assert report.description == "Q1 Risk report for compliance" - assert report.client == fusion_obj - assert report.domain == {"name": "Risk"} - assert report.tier_type == "Tier 1" - assert report.is_bcbs239_program is True - assert report.region == "EMEA" - assert report.data_node_id["name"] == "ComplianceTable" def test_list_indexes_summary(requests_mock: requests_mock.Mocker, fusion_obj: Fusion) -> None: @@ -3416,10 +3410,9 @@ def test_get_new_root_url_strip_version(fusion_obj: Fusion) -> None: fusion_obj.root_url = "https://fusion.jpmorgan.com/api/v1/" assert fusion_obj._get_new_root_url() == "https://fusion.jpmorgan.com" - def test_list_reports_all(fusion_obj: Fusion, requests_mock: requests_mock.Mocker) -> None: url = f"{fusion_obj._get_new_root_url()}/api/corelineage-service/v1/reports/list" - mock_data = {"content": [{"id": "rep1", "name": "Test Report", "category": "Finance", "subCategory": "Equities"}]} + mock_data = {"content": [{"id": "rep1", "title": "Test Report", "category": "Finance", "subCategory": "Equities"}]} requests_mock.post(url, json=mock_data) df = fusion_obj.list_reports() # noqa assert isinstance(df, pd.DataFrame) @@ -3430,13 +3423,14 @@ def test_list_reports_all(fusion_obj: Fusion, requests_mock: requests_mock.Mocke def 
test_list_reports_by_id(fusion_obj: Fusion, requests_mock: requests_mock.Mocker) -> None: report_id = "rep1" url = f"{fusion_obj._get_new_root_url()}/api/corelineage-service/v1/reports/{report_id}" - mock_data = {"id": "rep1", "name": "Test Report", "category": "Finance", "subCategory": "Equities"} + mock_data = {"id": "rep1", "title": "Test Report", "category": "Finance", "subCategory": "Equities"} requests_mock.get(url, json=mock_data) df = fusion_obj.list_reports(report_id=report_id) # noqa assert isinstance(df, pd.DataFrame) assert df.iloc[0]["id"] == "rep1" + def test_list_report_attributes(fusion_obj: Fusion, requests_mock: requests_mock.Mocker) -> None: report_id = "rep1" url = f"{fusion_obj._get_new_root_url()}/api/corelineage-service/v1/reports/{report_id}/reportElements" @@ -3458,7 +3452,6 @@ def test_list_report_attributes(fusion_obj: Fusion, requests_mock: requests_mock assert "id" in df.columns assert df.iloc[0]["id"] == "attr1" - def test_fusion_report_required_only(fusion_obj: Fusion) -> None: report = fusion_obj.report( title="Test Report", @@ -3466,13 +3459,13 @@ def test_fusion_report_required_only(fusion_obj: Fusion) -> None: frequency="Monthly", category="Finance", sub_category="Market", - data_node_id={"name": "Node1", "dataNodeType": "Table"}, + owner_node={"name": "Node1", "type": "User Tool"}, + publisher_node={"name": "Dash-A", "type": "Intelligent Solutions"}, + business_domain="Risk", regulatory_related=True, - domain={"name": "Risk"}, ) - assert isinstance(report, Report) assert report.title == "Test Report" - assert report.client is fusion_obj + assert report.business_domain == "Risk" def test_fusion_report_with_optional_fields(fusion_obj: Fusion) -> None: @@ -3482,12 +3475,15 @@ def test_fusion_report_with_optional_fields(fusion_obj: Fusion) -> None: frequency="Quarterly", category="Credit", sub_category="Wholesale", - data_node_id={"name": "NodeX", "dataNodeType": "View"}, + owner_node={"name": "NodeX", "type": "User Tool"}, + publisher_node={ + "name": "DashX", + "type": "Intelligent Solutions", + "publisher_node_identifier": "pub-001", + }, + business_domain="Ops", regulatory_related=False, - domain={"name": "Ops"}, - tier_type="Tier 1", lob="Banking", - alternative_id={"system": "ABC"}, sub_lob="Retail", is_bcbs239_program=True, risk_area="Liquidity", @@ -3495,8 +3491,13 @@ def test_fusion_report_with_optional_fields(fusion_obj: Fusion) -> None: sap_code="SAP001", region="EMEA", ) - assert report.lob == "Banking" - assert report.client is fusion_obj + assert report.business_domain == "Ops" + assert report.owner_node is not None + assert report.owner_node["name"] == "NodeX" + assert report.publisher_node is not None + assert report.publisher_node["publisher_node_identifier"] == "pub-001" + + @patch("fusion.report.Report.link_attributes_to_terms") @@ -3528,6 +3529,7 @@ def test_link_attributes_to_terms_response_passthrough(mock_link: MagicMock, fus assert resp is mock_resp + def test_list_distribution_files_with_max_results(fusion_obj: Fusion, requests_mock: requests_mock.Mocker) -> None: mock_data = { "resources": [ @@ -3536,15 +3538,15 @@ def test_list_distribution_files_with_max_results(fusion_obj: Fusion, requests_m "fileExtension": ".parquet", "identifier": "file1", "title": "File 1", - "@id": "file1" + "@id": "file1", }, { "description": "Sample file 2", "fileExtension": ".parquet", "identifier": "file2", "title": "File 2", - "@id": "file2" - } + "@id": "file2", + }, ] } @@ -3562,11 +3564,7 @@ def 
test_list_distribution_files_with_max_results(fusion_obj: Fusion, requests_m # Case 1: Default (max_results = -1) → return all rows df_all = fusion_obj.list_distribution_files( - dataset=dataset, - series=series, - file_format=file_format, - catalog=catalog, - max_results=-1 + dataset=dataset, series=series, file_format=file_format, catalog=catalog, max_results=-1 ) assert isinstance(df_all, pd.DataFrame) TOTAL_FILES = 2 @@ -3575,13 +3573,9 @@ def test_list_distribution_files_with_max_results(fusion_obj: Fusion, requests_m assert df_all["@id"].tolist() == ["file1", "file2"] # Case 2: Limit to max_results = 1 - MAX_RESULTS=1 + MAX_RESULTS = 1 df_limited = fusion_obj.list_distribution_files( - dataset=dataset, - series=series, - file_format=file_format, - catalog=catalog, - max_results=MAX_RESULTS + dataset=dataset, series=series, file_format=file_format, catalog=catalog, max_results=MAX_RESULTS ) assert len(df_limited) == MAX_RESULTS @@ -3589,11 +3583,7 @@ def test_list_distribution_files_with_max_results(fusion_obj: Fusion, requests_m # Case 3: Limit higher than available rows → returns all df_over_limit = fusion_obj.list_distribution_files( - dataset=dataset, - series=series, - file_format=file_format, - catalog=catalog, - max_results=10 + dataset=dataset, series=series, file_format=file_format, catalog=catalog, max_results=10 ) assert len(df_over_limit) == TOTAL_FILES assert df_over_limit["@id"].tolist() == ["file1", "file2"] @@ -3610,8 +3600,8 @@ def test_fusion_dataflow_id_only(fusion_obj: Fusion) -> None: def test_fusion_dataflow_full(fusion_obj: Fusion) -> None: """Full constructor path: provider/consumer plus optional fields.""" - provider = {"name": "CRM_DB", "nodeType": "Database"} - consumer = {"name": "DWH", "nodeType": "Database"} + provider = {"name": "CRM_DB", "type": "Database"} + consumer = {"name": "DWH", "type": "Database"} flow = fusion_obj.dataflow( provider_node=provider, consumer_node=consumer, @@ -3636,8 +3626,8 @@ def test_list_dataflows_success(requests_mock: requests_mock.Mocker, fusion_obj: server_json = { "id": flow_id, "description": "sample flow", - "providerNode": {"name": "A", "nodeType": "DB"}, - "consumerNode": {"name": "B", "nodeType": "DB"}, + "providerNode": {"name": "A", "type": "DB"}, + "consumerNode": {"name": "B", "type": "DB"}, "frequency": "Daily", } requests_mock.get(url, json=server_json, status_code=200) @@ -3662,4 +3652,4 @@ def test_list_dataflows_http_error(requests_mock: requests_mock.Mocker, fusion_o requests_mock.get(url, status_code=404) with pytest.raises(requests.exceptions.HTTPError): - fusion_obj.list_dataflows(flow_id) + fusion_obj.list_dataflows(flow_id) \ No newline at end of file diff --git a/py_tests/test_report.py b/py_tests/test_report.py index d5e94af2..62dda9a9 100644 --- a/py_tests/test_report.py +++ b/py_tests/test_report.py @@ -1,6 +1,7 @@ -"""Test file for updated report.py and reports integration""" +"""Test file for report.py and reports integration""" from pathlib import Path +from typing import Any, Optional, cast import pytest @@ -11,46 +12,57 @@ def test_report_basic_fields() -> None: report = Report( title="Quarterly Report", - data_node_id={"name": "Node1", "dataNodeType": "User Tool"}, + owner_node={"name": "Node1", "type": "User Tool"}, + publisher_node={"name": "Dash1", "type": "Intelligent Solutions"}, description="Quarterly risk analysis", frequency="Quarterly", category="Risk", sub_category="Ops", - domain={"name": "CDO"}, + business_domain="CDO", regulatory_related=True, ) assert report.title == "Quarterly 
Report" - assert report.data_node_id["name"] == "Node1" - assert report.domain["name"] == "CDO" + assert report.owner_node is not None + assert report.owner_node["name"] == "Node1" + assert report.business_domain == "CDO" def test_report_to_dict() -> None: report = Report( title="Sample Report", - data_node_id={"name": "X", "dataNodeType": "Y"}, + owner_node={"name": "X", "type": "User Tool"}, + publisher_node={ + "name": "Y", + "type": "Intelligent Solutions", + "publisher_node_identifier": "seal:app:1", + }, description="Some desc", frequency="Monthly", category="Cat", sub_category="Sub", - domain={"name": "CDO"}, + business_domain="CDO", regulatory_related=False, ) result = report.to_dict() assert result["title"] == "Sample Report" - assert result["domain"]["name"] == "CDO" + assert result["businessDomain"] == "CDO" assert result["regulatoryRelated"] is False + assert result["publisherNode"]["publisherNodeIdentifier"] == "seal:app:1" + assert result["ownerNode"]["name"] == "X" + assert result["publisherNode"]["name"] == "Y" def test_report_validation_raises() -> None: report = Report( title="", - data_node_id={"name": "X", "dataNodeType": "Y"}, description="", frequency="", category="", sub_category="", - domain={"name": ""}, + business_domain="", regulatory_related=True, + owner_node=None, + publisher_node=None, ) with pytest.raises(ValueError, match="Missing required fields"): report.validate() @@ -58,26 +70,35 @@ def test_report_validation_raises() -> None: def test_report_from_dict() -> None: data = { - "Title": "Dict Report", - "DataNodeId": {"name": "X", "dataNodeType": "Y"}, - "Description": "Dict desc", - "Frequency": "Daily", - "Category": "Cat", - "SubCategory": "Sub", - "Domain": {"name": "CDO"}, - "RegulatoryRelated": True, + "title": "Dict Report", + "description": "Dict desc", + "frequency": "Daily", + "category": "Cat", + "subCategory": "Sub", + "businessDomain": "CDO", + "regulatoryRelated": True, + "ownerNode": {"name": "OWN", "type": "User Tool"}, + "publisherNode": { + "name": "PUB", + "type": "Intelligent Solutions", + "publisherNodeIdentifier": "pid-1", + }, } report = Report.from_dict(data) assert isinstance(report, Report) assert report.title == "Dict Report" assert report.frequency == "Daily" + assert report.publisher_node is not None + assert report.publisher_node["publisher_node_identifier"] == "pid-1" def test_reports_wrapper_from_csv(tmp_path: Path, fusion_obj: Fusion) -> None: csv_data = ( "Report/Process Name,Report/Process Description,Frequency,Category," - "Sub Category,CDO Office,Application ID,Application Type,Regulatory Designated\n" - "TestReport,Test description,Monthly,Risk,Ops,CDO,App1,User Tool,Yes" + "Sub Category,businessDomain,ownerNode_name,ownerNode_type," + "publisherNode_name,publisherNode_type,publisherNode_publisherNodeIdentifier,Regulatory Designated\n" + "TestReport,Test description,Monthly,Risk,Ops,CDO,App1,User Tool," + "Dash1,Intelligent Solutions,pub-123,Yes" ) file_path = tmp_path / "test_report.csv" file_path.write_text(csv_data) @@ -87,6 +108,11 @@ def test_reports_wrapper_from_csv(tmp_path: Path, fusion_obj: Fusion) -> None: assert isinstance(reports, Reports) assert len(reports) == 1 assert reports[0].title == "TestReport" + assert reports[0].owner_node is not None + assert reports[0].owner_node["name"] == "App1" + assert reports[0].publisher_node is not None + assert reports[0].publisher_node["name"] == "Dash1" + assert reports[0].publisher_node["publisher_node_identifier"] == "pub-123" def 
test_reports_wrapper_from_object_dicts(fusion_obj: Fusion) -> None: @@ -97,9 +123,12 @@ def test_reports_wrapper_from_object_dicts(fusion_obj: Fusion) -> None: "Frequency": "Monthly", "Category": "Finance", "Sub Category": "Analysis", - "CDO Office": "CDO", - "Application ID": "AppID", - "Application Type": "User Tool", + "businessDomain": "CDO", + "ownerNode_name": "AppID", + "ownerNode_type": "User Tool", + "publisherNode_name": "DashX", + "publisherNode_type": "Intelligent Solutions", + "publisherNode_publisherNodeIdentifier": "pid-99", "Regulatory Designated": "No", } ] @@ -108,3 +137,167 @@ def test_reports_wrapper_from_object_dicts(fusion_obj: Fusion) -> None: assert isinstance(reports, Reports) assert reports[0].title == "ObjReport" assert reports[0].regulatory_related is False + assert reports[0].owner_node is not None + assert reports[0].owner_node["name"] == "AppID" + assert reports[0].publisher_node is not None + assert reports[0].publisher_node["publisher_node_identifier"] == "pid-99" + + +def test_report_update_fields_excludes_id_and_uses_path() -> None: + """PATCH should not send 'id' in body and must use /reports/{id} path.""" + report = Report( + id="r-1", + title="t", + description="d", + frequency="f", + category="c", + sub_category="s", + business_domain="bd", + regulatory_related=True, + owner_node={"name": "own", "type": "User Tool"}, + publisher_node={"name": "pub", "type": "Intelligent Solutions"}, + ) + + class _Resp: + status_code = 200 + ok = True + text = "" + content = b"" + + def raise_for_status(self) -> None: + return None + + class _Sess: + def __init__(self) -> None: + self.last_url: Optional[str] = None + self.last: Optional[dict[str, Any]] = None + + def patch(self, url: str, json: dict[str, Any]) -> _Resp: + self.last_url = url + self.last = json + return _Resp() + + class _Fusion: + def __init__(self) -> None: + self.session = _Sess() + + def _get_new_root_url(self) -> str: + return "http://unit.test" + + client = _Fusion() + report.client = cast(Fusion, client) + + report.update_fields() + + assert client.session.last is not None + assert "id" not in client.session.last # must not be in PATCH body + assert client.session.last_url is not None + assert client.session.last_url.endswith("/api/corelineage-service/v1/reports/r-1") + + +def test_report_create_excludes_id_and_sets_id() -> None: + """CREATE should not send 'id' and should set self.id from response JSON.""" + report = Report( + id="pre-set", + title="t", + description="d", + frequency="f", + category="c", + sub_category="s", + business_domain="bd", + regulatory_related=True, + owner_node={"name": "own", "type": "User Tool"}, + publisher_node={"name": "pub", "type": "Intelligent Solutions"}, + ) + + class _Resp: + status_code = 200 + ok = True + text = "" + content = b"" + + def raise_for_status(self) -> None: + return None + + def json(self) -> dict[str, Any]: + return {"id": "new-123"} + + class _Sess: + def __init__(self) -> None: + self.last_url: Optional[str] = None + self.last: Optional[dict[str, Any]] = None + + def post(self, url: str, json: dict[str, Any]) -> _Resp: + self.last_url = url + self.last = json + return _Resp() + + class _Fusion: + def __init__(self) -> None: + self.session = _Sess() + + def _get_new_root_url(self) -> str: + return "http://unit.test" + + client = _Fusion() + report.client = cast(Fusion, client) + + report.create() + + assert client.session.last is not None + assert "id" not in client.session.last # id removed from POST body + assert client.session.last_url is 
not None + assert client.session.last_url.endswith("/api/corelineage-service/v1/reports") + assert report.id == "new-123" + + +def test_report_update_excludes_id_in_body_and_uses_path() -> None: + """UPDATE should not send 'id' in body and must use /reports/{id} path.""" + report = Report( + id="abc-999", + title="t", + description="d", + frequency="f", + category="c", + sub_category="s", + business_domain="bd", + regulatory_related=False, + owner_node={"name": "own", "type": "User Tool"}, + publisher_node={"name": "pub", "type": "Intelligent Solutions"}, + ) + + class _Resp: + status_code = 200 + ok = True + text = "" + content = b"" + + def raise_for_status(self) -> None: + return None + + class _Sess: + def __init__(self) -> None: + self.last_url: Optional[str] = None + self.last: Optional[dict[str, Any]] = None + + def put(self, url: str, json: dict[str, Any]) -> _Resp: + self.last_url = url + self.last = json + return _Resp() + + class _Fusion: + def __init__(self) -> None: + self.session = _Sess() + + def _get_new_root_url(self) -> str: + return "http://unit.test" + + client = _Fusion() + report.client = cast(Fusion, client) + + report.update() + + assert client.session.last is not None + assert "id" not in client.session.last # id removed from PUT body + assert client.session.last_url is not None + assert client.session.last_url.endswith("/api/corelineage-service/v1/reports/abc-999") diff --git a/py_tests/test_report_attributes.py b/py_tests/test_report_attributes.py index eaf747ae..efd54c99 100644 --- a/py_tests/test_report_attributes.py +++ b/py_tests/test_report_attributes.py @@ -10,7 +10,7 @@ def test_report_attribute_str_repr() -> None: - attr = ReportAttribute(title="Revenue", sourceIdentifier="rev-001") + attr = ReportAttribute(title="Revenue", source_identifier="rev-001") assert isinstance(str(attr), str) assert isinstance(repr(attr), str) @@ -18,12 +18,13 @@ def test_report_attribute_str_repr() -> None: def test_report_attribute_to_dict() -> None: attr = ReportAttribute( title="Revenue", - sourceIdentifier="rev-001", + source_identifier="rev-001", description="Total revenue", - technicalDataType="decimal", + technical_data_type="decimal", path="finance/metrics", ) expected = { + "id": None, "sourceIdentifier": "rev-001", "title": "Revenue", "description": "Total revenue", @@ -34,13 +35,13 @@ def test_report_attribute_to_dict() -> None: def test_report_attribute_client_get_set(fusion_obj: Fusion) -> None: - attr = ReportAttribute(title="Revenue", sourceIdentifier="rev-001") + attr = ReportAttribute(title="Revenue", source_identifier="rev-001") attr.client = fusion_obj assert attr.client == fusion_obj def test_report_attributes_add_get_remove() -> None: - attr = ReportAttribute(title="Revenue", sourceIdentifier="rev-001") + attr = ReportAttribute(title="Revenue", source_identifier="rev-001") attrs = ReportAttributes() attrs.add_attribute(attr) assert attrs.attributes[0].title == "Revenue" @@ -57,10 +58,20 @@ def test_report_attributes_to_and_from_dict_list() -> None: "path": "finance/metrics", } ] + expected_data = [ + { + "id": None, + "title": "Revenue", + "sourceIdentifier": "rev-001", + "description": "Total revenue", + "technicalDataType": "decimal", + "path": "finance/metrics", + } + ] attrs_instance = ReportAttributes() attrs = attrs_instance.from_dict_list(data) assert isinstance(attrs, ReportAttributes) - assert attrs.to_dict() == {"attributes": data} + assert attrs.to_dict() == {"attributes": expected_data} def test_report_attributes_from_dataframe() -> None: @@ 
-140,7 +151,7 @@ def test_report_attributes_from_object_invalid_string() -> None: def test_report_attributes_to_dataframe() -> None: - attr = ReportAttribute(title="Revenue", sourceIdentifier="rev-001") + attr = ReportAttribute(title="Revenue", source_identifier="rev-001") test_df = ReportAttributes([attr]).to_dataframe() assert test_df.shape[0] == 1 assert test_df["title"].iloc[0] == "Revenue" @@ -154,10 +165,11 @@ def test_report_attributes_use_client_value_error() -> None: def test_report_attributes_create(requests_mock: requests_mock.Mocker, fusion_obj: Fusion) -> None: HTTP_OK = 200 report_id = "report_123" - url = f"{fusion_obj._get_new_root_url()}/api/corelineage-service/v1/reports/{report_id}/reportElements" + url = f"{fusion_obj._get_new_root_url()}/api/corelineage-service/v1/reports/{report_id}/report-elements" expected_payload = [ { + "id": None, "sourceIdentifier": "rev-001", "title": "Revenue", "description": None, @@ -167,10 +179,173 @@ def test_report_attributes_create(requests_mock: requests_mock.Mocker, fusion_ob ] requests_mock.post(url, json=expected_payload, status_code=HTTP_OK) - attr = ReportAttribute(title="Revenue", sourceIdentifier="rev-001") + attr = ReportAttribute(title="Revenue", source_identifier="rev-001") attrs = ReportAttributes(attributes=[attr]) attrs.client = fusion_obj response = attrs.create(report_id=report_id, return_resp_obj=True) assert isinstance(response, requests.Response) assert response.status_code == HTTP_OK + + +def test_report_attributes_update(requests_mock: requests_mock.Mocker, fusion_obj: Fusion) -> None: + """Test the update method (PUT operation).""" + HTTP_OK = 200 + report_id = "report_123" + url = f"{fusion_obj._get_new_root_url()}/api/corelineage-service/v1/reports/{report_id}/report-elements" + + expected_payload = [ + { + "id": 456, + "sourceIdentifier": "rev-001", + "description": "Updated revenue field", + "technicalDataType": "decimal", + "path": "/data/revenue", + } + ] + requests_mock.put(url, json=expected_payload, status_code=HTTP_OK) + + attr = ReportAttribute( + id=456, + title="Revenue", + source_identifier="rev-001", + description="Updated revenue field", + technical_data_type="decimal", + path="/data/revenue", + ) + attrs = ReportAttributes(attributes=[attr]) + attrs.client = fusion_obj + + response = attrs.update(report_id=report_id, return_resp_obj=True) + assert isinstance(response, requests.Response) + assert response.status_code == HTTP_OK + + # Verify the request was made with correct data + assert requests_mock.last_request is not None + assert requests_mock.last_request.json() == expected_payload + + +def test_report_attributes_update_without_id_fails(fusion_obj: Fusion) -> None: + """Test that update method fails when attribute doesn't have id.""" + attr = ReportAttribute(title="Revenue", source_identifier="rev-001") + attrs = ReportAttributes(attributes=[attr]) + attrs.client = fusion_obj + + with pytest.raises(ValueError, match="must have an 'id' field for update"): + attrs.update(report_id="report_123") + + +def test_report_attributes_update_fields(requests_mock: requests_mock.Mocker, fusion_obj: Fusion) -> None: + """Test the update_fields method (PATCH operation).""" + HTTP_OK = 200 + report_id = "report_123" + url = f"{fusion_obj._get_new_root_url()}/api/corelineage-service/v1/reports/{report_id}/report-elements" + + expected_payload = [ + { + "id": 456, + "description": "Updated revenue field", + "technicalDataType": "decimal", + } + ] + requests_mock.patch(url, json=expected_payload, 
status_code=HTTP_OK)
+
+    # Create an attribute with only the id and the fields to update (others stay None)
+    attr = ReportAttribute(
+        id=456,
+        title="Revenue",  # title is immutable, so it won't be in the PATCH payload
+        description="Updated revenue field",
+        technical_data_type="decimal",
+        # source_identifier and path are None, so they are excluded from the payload
+    )
+    attrs = ReportAttributes(attributes=[attr])
+    attrs.client = fusion_obj
+
+    response = attrs.update_fields(report_id=report_id, return_resp_obj=True)
+    assert isinstance(response, requests.Response)
+    assert response.status_code == HTTP_OK
+
+    # Verify the request was made with the correct data (only non-None updatable fields)
+    assert requests_mock.last_request is not None
+    assert requests_mock.last_request.json() == expected_payload
+
+
+def test_report_attributes_update_fields_without_id_fails(fusion_obj: Fusion) -> None:
+    """Test that the update_fields method fails when an attribute doesn't have an id."""
+    attr = ReportAttribute(title="Revenue", description="Some description")
+    attrs = ReportAttributes(attributes=[attr])
+    attrs.client = fusion_obj
+
+    with pytest.raises(ValueError, match="must have an 'id' field for update_fields"):
+        attrs.update_fields(report_id="report_123")
+
+
+def test_report_attributes_delete(requests_mock: requests_mock.Mocker, fusion_obj: Fusion) -> None:
+    """Test the delete method (DELETE operation)."""
+    HTTP_OK = 200
+    report_id = "report_123"
+    url = f"{fusion_obj._get_new_root_url()}/api/corelineage-service/v1/reports/{report_id}/report-elements"
+
+    expected_payload = [{"id": 456}]
+    requests_mock.delete(url, json=expected_payload, status_code=HTTP_OK)
+
+    attr = ReportAttribute(id=456, title="Revenue")  # only the id is needed for deletion
+    attrs = ReportAttributes(attributes=[attr])
+    attrs.client = fusion_obj
+
+    response = attrs.delete(report_id=report_id, return_resp_obj=True)
+    assert isinstance(response, requests.Response)
+    assert response.status_code == HTTP_OK
+
+    # Verify the request was made with the correct data (only the id)
+    assert requests_mock.last_request is not None
+    assert requests_mock.last_request.json() == expected_payload
+
+
+def test_report_attributes_delete_multiple(requests_mock: requests_mock.Mocker, fusion_obj: Fusion) -> None:
+    """Test the delete method with multiple attributes."""
+    HTTP_OK = 200
+    report_id = "report_123"
+    url = f"{fusion_obj._get_new_root_url()}/api/corelineage-service/v1/reports/{report_id}/report-elements"
+
+    expected_payload = [{"id": 456}, {"id": 789}]
+    requests_mock.delete(url, json=expected_payload, status_code=HTTP_OK)
+
+    attrs = ReportAttributes(
+        attributes=[ReportAttribute(id=456, title="Revenue"), ReportAttribute(id=789, title="Profit")]
+    )
+    attrs.client = fusion_obj
+
+    response = attrs.delete(report_id=report_id, return_resp_obj=True)
+    assert isinstance(response, requests.Response)
+    assert response.status_code == HTTP_OK
+
+    # Verify the request was made with the correct data
+    assert requests_mock.last_request is not None
+    assert requests_mock.last_request.json() == expected_payload
+
+
+def test_report_attributes_delete_without_id_fails(fusion_obj: Fusion) -> None:
+    """Test that the delete method fails when an attribute doesn't have an id."""
+    attr = ReportAttribute(title="Revenue", source_identifier="rev-001")
+    attrs = ReportAttributes(attributes=[attr])
+    attrs.client = fusion_obj
+
+    with pytest.raises(ValueError, match="must have an 'id' field for deletion"):
+        attrs.delete(report_id="report_123")
+
+
+def test_report_attributes_methods_without_client_fails() 
-> None:
+    """Test that all methods fail when no client is provided."""
+    attr = ReportAttribute(id=456, title="Revenue")
+    attrs = ReportAttributes(attributes=[attr])
+    # No client set
+
+    with pytest.raises(ValueError, match="A Fusion client object is required"):
+        attrs.update(report_id="report_123")
+
+    with pytest.raises(ValueError, match="A Fusion client object is required"):
+        attrs.update_fields(report_id="report_123")
+
+    with pytest.raises(ValueError, match="A Fusion client object is required"):
+        attrs.delete(report_id="report_123")
diff --git a/pyproject.toml b/pyproject.toml
index 9e91541c..43a5b3ad 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "pyfusion"
-version = "3.0.1"
+version = "3.0.2-dev0"
description = "JPMC Fusion Developer Tools"

authors = [
@@ -31,6 +31,7 @@ dependencies = [
    "pyarrow >= 11",
    "fsspec >= 2021",
    "aiohttp >= 3.7.1",
+    "tqdm >= 4.48.0",
    "rich >= 11.0.0",
    "certifi",
    "pyjwt>=2.10.1",
@@ -246,7 +247,7 @@ exclude_lines = [


[tool.bumpversion]
-current_version = "3.0.1"
+current_version = "3.0.2-dev0"
parse = '(?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(?:-(?P<release>[a-z]+)(?P<candidate>\d+))?'
serialize = [
    '{major}.{minor}.{patch}-{release}{candidate}',
diff --git a/uv.lock b/uv.lock
index a186ca56..432411e2 100644
--- a/uv.lock
+++ b/uv.lock
@@ -3221,6 +3221,7 @@ dependencies = [
    { name = "requests" },
    { name = "rich" },
    { name = "tabulate" },
+    { name = "tqdm" },
]

[package.optional-dependencies]
@@ -3355,6 +3356,7 @@ requires-dist = [
    { name = "s3fs", marker = "extra == 'aws'" },
    { name = "sseclient", marker = "extra == 'events'" },
    { name = "tabulate", specifier = ">=0.8" },
+    { name = "tqdm", specifier = ">=4.48.0" },
]
provides-extras = ["all", "doc", "aws", "gcs", "azr", "polars", "events", "embeddings"]

@@ -4501,6 +4503,18 @@ wheels = [
    { url = "https://files.pythonhosted.org/packages/bf/c2/610b0bb0e91394fcd2166a04bc8699441d1d5e965957840788b9286d0bb7/tox_uv-1.20.2-py3-none-any.whl", hash = "sha256:f50f8e7b1f0f583061eb610e3c9cbeee18938e76e10c4ba837869d488bcbdb87", size = 14540, upload-time = "2025-01-28T18:10:20.785Z" },
]

+[[package]]
+name = "tqdm"
+version = "4.67.1"
+source = { registry = "https://pypi.org/simple" }
+dependencies = [
+    { name = "colorama", marker = "sys_platform == 'win32'" },
+]
+sdist = { url = "https://files.pythonhosted.org/packages/a8/4b/29b4ef32e036bb34e4ab51796dd745cdba7ed47ad142a9f4a1eb8e0c744d/tqdm-4.67.1.tar.gz", hash = "sha256:f8aef9c52c08c13a65f30ea34f4e5aac3fd1a34959879d7e59e63027286627f2", size = 169737, upload-time = "2024-11-24T20:12:22.481Z" }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/d0/30/dc54f88dd4a2b5dc8a0279bdd7270e735851848b762aeb1c1184ed1f6b14/tqdm-4.67.1-py3-none-any.whl", hash = "sha256:26445eca388f82e72884e0d580d5464cd801a3ea01e63e5601bdff9ba6a48de2", size = 78540, upload-time = "2024-11-24T20:12:19.698Z" },
+]
+
[[package]]
name = "traitlets"
version = "5.14.3"
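
For reviewers: a minimal sketch of the report-element CRUD flow the tests above exercise. The import path for ReportAttribute/ReportAttributes and the pre-configured client are assumptions; the report id and element ids are placeholders, not real values.

    from fusion import Fusion
    from fusion.report_attributes import ReportAttribute, ReportAttributes  # import path assumed

    client = Fusion()  # assumes credentials are configured out of band

    # CREATE: new elements carry no server-assigned id yet.
    attrs = ReportAttributes(attributes=[ReportAttribute(title="Revenue", source_identifier="rev-001")])
    attrs.client = client
    attrs.create(report_id="report_123")

    # PATCH via update_fields: requires an id; only non-None updatable fields are sent.
    patch = ReportAttributes(attributes=[ReportAttribute(id=456, description="Updated revenue field")])
    patch.client = client
    patch.update_fields(report_id="report_123")

    # DELETE: only the ids end up in the request body.
    gone = ReportAttributes(attributes=[ReportAttribute(id=456, title="Revenue")])
    gone.client = client
    gone.delete(report_id="report_123")

And a quick self-check that the bump-my-version parse pattern in pyproject.toml matches the new dev version string:

    import re

    PARSE = r"(?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(?:-(?P<release>[a-z]+)(?P<candidate>\d+))?"

    m = re.fullmatch(PARSE, "3.0.2-dev0")
    assert m is not None
    assert (m["major"], m["minor"], m["patch"], m["release"], m["candidate"]) == ("3", "0", "2", "dev", "0")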