From b5b41db389178105e9c9508bf1f36196e55bc1ae Mon Sep 17 00:00:00 2001 From: Thingus Date: Fri, 1 Aug 2025 13:01:00 +0100 Subject: [PATCH] Volume mount impl in progress --- src/flowpytertask/flowpytertask.py | 28 +++++++++++++++++----------- tests/dags/example_dag.py | 3 +++ tests/test_flowpyter_operator.py | 9 +++++++++ 3 files changed, 29 insertions(+), 11 deletions(-) diff --git a/src/flowpytertask/flowpytertask.py b/src/flowpytertask/flowpytertask.py index 8fea520..0992baa 100644 --- a/src/flowpytertask/flowpytertask.py +++ b/src/flowpytertask/flowpytertask.py @@ -199,37 +199,42 @@ def _reserved_param_check(nb_params, mount_params): f"'dagrun_data_dir' and 'shared_data_dir" ) + @staticmethod + def _str_to_mount(source: str, target: str, read_only: bool = False): + if source.startswith("volume:"): + source = source.replace("volume:", "", 1) + mount_type = "volume" + else: + mount_type = "bind" + return Mount(source=source, target=target, type=mount_type, read_only=read_only) + def _setup_mounts(self): mount_params = {} self.mounts = [ - Mount( + self._str_to_mount( source=str(self.host_notebook_dir), target=str(self.CONTAINER_NOTEBOOK_DIR), - type="bind", read_only=True, ), - Mount( + self._str_to_mount( source=str(self.host_notebook_out_dir), target=str(self.CONTAINER_NOTEBOOK_OUT_DIR), - type="bind", ), ] if self.host_dagrun_data_dir: self.mounts.append( - Mount( + self._str_to_mount( source=str(self.host_dagrun_data_dir), target=str(self.CONTAINER_DAGRUN_DATA_DIR), - type="bind", ) ) mount_params["dagrun_data_dir"] = str(self.CONTAINER_DAGRUN_DATA_DIR) if self.host_shared_data_dir: self.mounts.append( - Mount( + self._str_to_mount( source=str(self.host_shared_data_dir), target=str(self.CONTAINER_SHARED_DATA_DIR), - type="bind", ) ) mount_params["shared_data_dir"] = str(self.CONTAINER_SHARED_DATA_DIR) @@ -243,14 +248,15 @@ def _setup_mounts(self): ) container_path = f"/opt/airflow/{name}" self.mounts.append( - Mount( - source=host_path, target=container_path, type="bind", read_only=True + self._str_to_mount( + source=host_path, + target=container_path, ) ) mount_params[name] = container_path mount_string = "\n".join(f"{m['Source']} to {m['Target']}" for m in self.mounts) - self.log.info(f"Mounts:\n {mount_string}") + self.log.info(f"self._str_to_mounts:\n {mount_string}") return mount_params def _setup_notebook_paths( diff --git a/tests/dags/example_dag.py b/tests/dags/example_dag.py index d4b2078..f9c4e6d 100644 --- a/tests/dags/example_dag.py +++ b/tests/dags/example_dag.py @@ -29,5 +29,8 @@ nb_params={"artifact_in": "my_artifact.txt"}, task_id="read_task", ) + volume_mount_task = FlowpyterOperator( + notebook_name="test_nb.ipynb", task_id="test_task" + ) first_task glue_task >> read_task diff --git a/tests/test_flowpyter_operator.py b/tests/test_flowpyter_operator.py index 28d957b..9a4ac58 100644 --- a/tests/test_flowpyter_operator.py +++ b/tests/test_flowpyter_operator.py @@ -37,6 +37,15 @@ def default_folder_params(tmp_out_dir, tmp_dagrun_data_dir, tmp_shared_data_dir) "host_shared_data_dir": str(tmp_shared_data_dir), } +@pytest.fixture() +def volume_mount_folder_params(): + yield { + "host_notebook_dir":"volume:host_notebook_dir", + "host_dagrun_data_dir":"volume:host_dagrun_data_dir", + "host_notebook_out_dir":"volume:host_notebook_out_dir", + "host_shared_data_dir":"volume:host_shared_data_dir", + } + @pytest.fixture def base_dag(dag_setup, default_folder_params):