diff --git a/docs/guide/storage.rst b/docs/guide/storage.rst
index 39413208..165b9640 100644
--- a/docs/guide/storage.rst
+++ b/docs/guide/storage.rst
@@ -1,5 +1,106 @@
 The GUFE Storage System
 =======================
+Storage lifetimes
+-----------------
 
-Storage docs.
+The storage system in GUFE is heavily tied to the GUFE protocol system.
+The key concept here is that the different levels of the GUFE protocol
+system (campaign, DAG, and unit) inherently imply different lifetimes for
+the data that is created. Those different lifetimes define the stages of
+the GUFE storage system.
+
+In an abstract sense, as used by protocol developers, these three levels
+correspond to three lifetimes of data:
+
+* ``scratch``: This is temporary data that is only needed for the lifetime
+  of a :class:`.ProtocolUnit`. This data is not guaranteed to be available
+  beyond the single :class:`.ProtocolUnit` where it is created, but may be
+  reused within that :class:`.ProtocolUnit`.
+* ``shared``: This is data that is shared between different units in a
+  :class:`.ProtocolDAG`. For example, a single equilibration stage might
+  be shared between multiple production runs. The output snapshot of the
+  equilibration would be suitable as something to put in ``shared`` data.
+  This data is guaranteed to be present from when it is created until the
+  end of the :class:`.ProtocolDAG`, but is not guaranteed to exist after
+  the :class:`.ProtocolDAG` terminates.
+* ``permanent``: This is the data that will be needed beyond the scope of
+  a single rough estimate of the calculation. This could include anything
+  that an extension of the simulation would require, or things that
+  require network-scale analysis. Anything stored here will be usable
+  after the calculation has completed.
+
+The ``scratch`` area is always a local directory. However, ``shared`` and
+``permanent`` can be external (remote) resources, using the
+:class:`.ExternalResource` API.
+
+As a practical matter, the GUFE storage system can be handled with a
+:class:`.StorageManager`. This automates some aspects of the transfer
+between stages of the GUFE storage system, and simplifies the API for
+protocol authors. In detail, it provides protocol authors with
+``PathLike`` objects for ``scratch``, ``shared``, and ``permanent``. All
+three of these objects actually point to special subdirectories of the
+local scratch space for a specific unit, but they are managed by context
+managers at the executor level, which handle the process of moving objects
+from local staging directories to the actual ``shared`` and ``permanent``
+locations, which can be external resources.
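+
+As a sketch of what this looks like from the protocol author's side (the
+``run`` function below is illustrative, not a fixed API; it only assumes
+that the executor provides the three ``PathLike`` objects described
+above):
+
+.. code-block:: python
+
+    def run(scratch, shared, permanent):
+        # scratch: temporary; may disappear once this unit completes
+        (scratch / "equilibration.trr").touch()
+        # shared: available to later units in the same ProtocolDAG
+        with open(shared / "equilibrated_snapshot.gro", mode="w") as f:
+            f.write("...")
+        # permanent: survives after the whole calculation completes
+        with open(permanent / "free_energy.json", mode="w") as f:
+            f.write("{}")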
+
+
+External resource utilities
+---------------------------
+
+For flexible data storage, GUFE defines the :class:`.ExternalResource`
+API, which allows data to be stored/loaded in a way that is agnostic to
+the underlying data store, as long as the store can be represented as a
+key-value store. Within GUFE, we provide several implementations,
+including :class:`.FileStorage` and :class:`.MemoryStorage` (the latter
+primarily useful for testing). The specific ``shared`` and ``permanent``
+resources, as provided to the executor, can be instances of an
+:class:`.ExternalResource`.
+
+.. note::
+
+   The ``shared`` space must be a resource where an uploaded object is
+   immediately available; otherwise, later protocol units may fail if the
+   shared result is unavailable. This means that files or approaches
+   based on ``scp`` or ``sftp`` are fine, but things like cloud storage,
+   where the existence of a new document may take time to propagate
+   through the network, are not recommended for ``shared``.
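+
+As a minimal sketch of the key-value behavior (the method names here are
+the ones used elsewhere in this changeset; the paths and labels are
+illustrative):
+
+.. code-block:: python
+
+    from gufe.storage.externalresource import FileStorage, MemoryStorage
+
+    external = FileStorage("/tmp/external")  # or MemoryStorage() in tests
+    external.store_bytes("unit_1/result.txt", b"contents")
+    assert external.exists("unit_1/result.txt")
+    with external.load_stream("unit_1/result.txt") as f:
+        assert f.read() == b"contents"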
+
+
+Details: Management of the Storage Lifetime
+--------------------------------------------
+
+The concepts of the storage lifetimes are important for protocol authors,
+but the details of implementation are left to the specific executor. To
+facilitate correct treatment of the storage lifecycle, GUFE provides a
+few helpers. The information in this section is mostly of interest to
+authors of executors. The helpers are:
+
+* :class:`.StorageManager`: This is the overall façade interface for
+  interacting with the rest of the storage lifecycle tools. It provides
+  two methods to generate context managers: one for the
+  :class:`.ProtocolDAG` level of the lifecycle, and one for the
+  :class:`.ProtocolUnit` level of the lifecycle. This class is designed
+  for the use case that the entire DAG is run in serial within a single
+  process. Subclasses of this can be created for other execution
+  architectures, where the main logic changes would be in the methods
+  that return those context managers.
+* :class:`.StagingRegistry`: This handles the logic around staging paths
+  within a :class:`.ProtocolUnit`. Think of this as an abstract
+  representation of a local directory. Paths within it register with it,
+  and it handles deletion of the temporary local files when they are no
+  longer needed, as well as the download of remote files when necessary
+  for reading. There are two important subclasses of this:
+  :class:`.SharedStaging` for a ``shared`` resource, and
+  :class:`.PermanentStaging` for a ``permanent`` resource.
+* :class:`.StagingPath`: This represents a file within the
+  :class:`.StagingRegistry`. It contains both the key (label) used in the
+  key-value store, as well as the actual local path to the file. When its
+  ``__fspath__`` method is called, it registers itself with its
+  :class:`.StagingRegistry`, which manages it over its lifecycle.
+
+In practice, the executor uses the :class:`.StorageManager` to create a
+:class:`.DAGContextManager` at the level of a DAG, and then uses the
+:class:`.DAGContextManager` to create a context to run a unit. That
+context creates a :class:`.SharedStaging` and a :class:`.PermanentStaging`
+associated with the specific unit. Those staging directories, along with
+the scratch directory, are provided to the :class:`.ProtocolUnit`, so that
+these are the only objects protocol authors need to interact with.
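+
+Based on the lifecycle exercised in the tests, a minimal serial executor
+loop looks roughly like this (``dag.label``, ``dag.units``, ``unit.key``,
+and ``unit.run`` are stand-ins for whatever the executor iterates over;
+``attempt=0`` assumes a first attempt):
+
+.. code-block:: python
+
+    from gufe.storage.storagemanager import StorageManager
+    from gufe.storage.externalresource import FileStorage
+
+    manager = StorageManager(
+        scratch_root="working",
+        shared_root=FileStorage("shared"),
+        permanent_root=FileStorage("permanent"),
+    )
+    with manager.running_dag(dag.label) as dag_ctx:
+        for unit in dag.units:
+            with dag_ctx.running_unit(dag.label, unit.key, attempt=0) as (
+                scratch, shared, permanent
+            ):
+                unit.run(scratch, shared, permanent)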
diff --git a/gufe/storage/stagingregistry.py b/gufe/storage/stagingregistry.py
new file mode 100644
index 00000000..40b311a9
--- /dev/null
+++ b/gufe/storage/stagingregistry.py
@@ -0,0 +1,360 @@
+from __future__ import annotations
+
+from typing import Union
+from pathlib import Path
+from os import PathLike
+from .externalresource import ExternalStorage, FileStorage
+
+from gufe.utils import delete_empty_dirs
+
+import logging
+_logger = logging.getLogger(__name__)
+
+
+def _safe_to_delete_file(
+    external: ExternalStorage,
+    path: PathLike
+) -> bool:
+    """Check that deleting this file will not remove it from external"""
+    # kind of brittle: deals with internals of FileStorage
+    if isinstance(external, FileStorage):
+        root = external.root_dir
+    else:
+        return True
+
+    p = Path(path)
+    try:
+        label = str(p.relative_to(root))
+    except ValueError:
+        return True
+    return not external.exists(label)
+
+
+class StagingRegistry:
+    """Local representation of an :class:`.ExternalStorage`.
+
+    This connects objects on a local filesystem to the key-value store of a
+    (possibly remote) :class:`.ExternalStorage`. It presents a FileLike
+    interface to users, but internally (via the :class:`.StagingPath`
+    objects it contains in its registry) maps local filenames to the keys
+    (labels) for the key-value store.
+
+    1. If a local path is requested that corresponds to an existing label
+       in the :class:`.ExternalStorage`, this object will "download" the
+       contents of that key to that local path.
+
+    2. When requested, it transfers any newly created files to the
+       :class:`.ExternalStorage`.
+
+    3. It can delete all of the files it manages.
+
+    Parameters
+    ----------
+    scratch : PathLike
+        the scratch directory shared by all objects on this host
+    external : :class:`.ExternalStorage`
+        external storage resource where objects should eventually go
+    staging : PathLike
+        name of the subdirectory of scratch where staged results are
+        temporarily stored; default is '.staging'. This must be the same
+        for all units within a DAG.
+    delete_staging : bool
+        whether to delete the contents of the $SCRATCH/$STAGING
+        directory when this object is deleted
+    keep_empty_dirs : bool
+        whether to keep empty directories in the staging area during
+        cleanup
+    """
+    def __init__(
+        self,
+        scratch: PathLike,
+        external: ExternalStorage,
+        *,
+        staging: PathLike = Path(".staging"),
+        delete_staging: bool = True,
+        keep_empty_dirs: bool = False,
+    ):
+        self.external = external
+        self.scratch = Path(scratch)
+        self.delete_staging = delete_staging
+        self.keep_empty_dirs = keep_empty_dirs
+        self.staging = staging
+
+        self.registry: set[StagingPath] = set()
+        self.preexisting: set[StagingPath] = set()
+        self.staging_dir = self.scratch / staging
+        self.staging_dir.mkdir(exist_ok=True, parents=True)
+
+    def _delete_file_safe(self, file):
+        """Check if deleting this file will remove it from external."""
+        return _safe_to_delete_file(
+            external=self.external,
+            path=file
+        )
+
+    def transfer_single_file_to_external(self, held_file: StagingPath):
+        """Transfer a given file from staging into external storage."""
+        path = held_file.as_path()
+        if not path.exists():
+            _logger.info(f"Found nonexistent path {path}, not "
+                         "transferring to external storage")
+        elif path.is_dir():
+            _logger.debug(f"Found directory {path}, not "
+                          "transferring to external storage")
+        else:
+            _logger.info(f"Transferring {path} to external storage")
+            self.external.store_path(held_file.label, path)
+            return held_file
+
+        return None  # no transfer
+
+    def transfer_staging_to_external(self):
+        """Transfer all objects in the registry to external storage."""
+        return [
+            transferred
+            for file in self.registry
+            if (transferred := self.transfer_single_file_to_external(file))
+        ]
+
+    def _delete_file(self, file: StagingPath):
+        path = file.as_path()
+        if path.exists():
+            if not path.is_dir():
+                _logger.debug(f"Removing file '{file}'")
+                path.unlink()
+            else:
+                _logger.debug(
+                    f"During staging cleanup, the directory '{file}' was "
+                    "found as a staged path. This will be deleted only if "
+                    "`keep_empty_dirs` is False."
+                )
+            self.registry.remove(file)
+        else:
+            _logger.warning(
+                f"During staging cleanup, file '{file}' was marked for "
+                "deletion, but can not be found on disk."
+            )
+
+    def cleanup(self):
+        """Perform end-of-lifecycle cleanup."""
+        if self.delete_staging:
+            for file in self.registry - self.preexisting:
+                if self._delete_file_safe(file):
+                    self._delete_file(file)
+
+            if not self.keep_empty_dirs:
+                delete_empty_dirs(self.staging_dir)
+
+    def register_path(self, staging_path: StagingPath):
+        """
+        Register a :class:`.StagingPath` with this
+        :class:`.StagingRegistry`.
+
+        This marks a given path as something for this object to manage, by
+        loading it into the ``registry``. This way it is tracked such that
+        its contents can be transferred to the :class:`.ExternalStorage`
+        and such that the local copy can be deleted when it is no longer
+        needed.
+
+        If this object's :class:`.ExternalStorage` already has data for the
+        label associated with the provided :class:`.StagingPath`, then the
+        contents of that label are copied to the local path so that they
+        can be read by the user.
+
+        Parameters
+        ----------
+        staging_path: :class:`.StagingPath`
+            the path to track
+        """
+        label_exists = self.external.exists(staging_path.label)
+        fspath = staging_path.as_path()
+
+        # TODO: what if the staging path is a directory? not sure that we
+        # have a way to know that; but not sure that adding it to the
+        # registry is right either
+        if not fspath.parent.exists():
+            fspath.parent.mkdir(parents=True, exist_ok=True)
+
+        self.registry.add(staging_path)
+
+        # if this is a file that exists, bring it into our subdir
+        # NB: this happens even if you're intending to overwrite the path,
+        # which is kind of wasteful
+        if label_exists:
+            self._load_file_from_external(self.external, staging_path)
+
+    def _load_file_from_external(self, external: ExternalStorage,
+                                 staging_path: StagingPath):
+        scratch_path = self.staging_dir / staging_path.path
+        # TODO: switch this to using `get_filename` and `store_path`
+        if scratch_path.exists():
+            self.preexisting.add(staging_path)
+
+        with external.load_stream(staging_path.label) as f:
+            external_bytes = f.read()
+            ...  # TODO: check that the bytes are the same if preexisting?
+
+        scratch_path.parent.mkdir(exist_ok=True, parents=True)
+        with open(scratch_path, mode='wb') as f:
+            f.write(external_bytes)
+
+    def __truediv__(self, path: Union[PathLike, str]):
+        return StagingPath(root=self, path=path)
+
+    def __repr__(self):
+        return (
+            f"{self.__class__.__name__}('{self.scratch}', {self.external})"
+        )
+
+    def __del__(self):  # -no-cov-
+        # in case someone doesn't use this within a context manager
+        if self.staging_dir.exists():
+            self.cleanup()
+
+
+class SharedStaging(StagingRegistry):
+    """Staging for shared external storage.
+
+    This enables read-only versions to be loaded from other units.
+    """
+    def __init__(
+        self,
+        scratch: PathLike,
+        external: ExternalStorage,
+        *,
+        staging: PathLike = Path(".staging"),
+        delete_staging: bool = True,
+        keep_empty_dirs: bool = False,
+        read_only: bool = False,
+    ):
+        super().__init__(scratch, external, staging=staging,
+                         delete_staging=delete_staging,
+                         keep_empty_dirs=keep_empty_dirs)
+        self.read_only = read_only
+
+    def transfer_single_file_to_external(self, held_file: StagingPath):
+        if self.read_only:
+            _logger.debug("Read-only: Not transferring to external storage")
+            return None  # early exit
+
+        return super().transfer_single_file_to_external(held_file)
+
+    def transfer_staging_to_external(self):
+        if self.read_only:
+            _logger.debug("Read-only: Not transferring to external storage")
+            return []  # early exit; keep the list return type
+
+        return super().transfer_staging_to_external()
+
+    def register_path(self, staging_path: StagingPath):
+        label_exists = self.external.exists(staging_path.label)
+
+        if self.read_only and not label_exists:
+            raise IOError(f"Unable to create '{staging_path.label}'. File "
+                          "does not exist in external storage, and this "
+                          "staging path is read-only.")
+
+        super().register_path(staging_path)
+
+
+class PermanentStaging(StagingRegistry):
+    """Staging directory for the permanent storage.
+
+    This allows files to be downloaded from a shared
+    :class:`.ExternalStorage`.
+ """ + def __init__( + self, + scratch: PathLike, + external: ExternalStorage, + shared: ExternalStorage, + *, + staging: PathLike = Path(".staging"), + delete_staging: bool = True, + keep_empty_dirs: bool = False, + ): + super().__init__(scratch, external, staging=staging, + delete_staging=delete_staging, + keep_empty_dirs=keep_empty_dirs) + self.shared = shared + + def _delete_file_safe(self, file): + shared_safe = _safe_to_delete_file( + external=self.shared, + path=file + ) + return shared_safe and super()._delete_file_safe(file) + + def transfer_single_file_to_external(self, held_file: StagingPath): + # if we can't find it locally, we load it from shared storage + path = held_file.as_path() + if not path.exists(): + self._load_file_from_external(self.shared, held_file) + + super().transfer_single_file_to_external(held_file) + + +class StagingPath: + """PathLike object linking local path with label for external storage. + + On creation, this registers with a :class:`.StagingRegistry` that will + manage the local path and transferring data with its + :class:`.ExternalStorage`. + + This object can always be used as a FileLike (using, e.g., the standard + ``open`` builtin). This requires that a staged path that exists on an + external resource be downloaded into a local file when it is referenced. + + For a representation of a file that does not require the download (for + example, when deserializing results that point to files) instead use + :class:`.ExternalFile`. + """ + def __init__(self, root: StagingRegistry, + path: Union[PathLike, str]): + self.root = root + self.path = Path(path) + + def register(self): + """Register this path with its StagingRegistry. + + If a file associated with this path exists in an external storage, + it will be downloaded to the staging area as part of registration. + """ + self.root.register_path(self) + + def __truediv__(self, path: Union[PathLike, str]): + return StagingPath(self.root, self.path / path) + + def __eq__(self, other): + return (isinstance(other, StagingPath) + and self.root == other.root + and self.path == other.path) + + def __hash__(self): + return hash((self.root, self.path)) + + def as_path(self): + """Return the pathlib.Path where this is staged""" + return Path(self._fspath) + + @property + def _fspath(self): + return str(self.root.staging_dir / self.path) + + def __fspath__(self): + self.register() + return self._fspath + + @property + def label(self) -> str: + """Label used in :class:`.ExternalStorage` for this path""" + return str(self.path) + + def __repr__(self): + return f"StagingPath('{self._fspath}')" + + # TODO: how much of the pathlib.Path interface do we want to wrap? 
diff --git a/gufe/storage/storagemanager.py b/gufe/storage/storagemanager.py
new file mode 100644
index 00000000..ac030cb8
--- /dev/null
+++ b/gufe/storage/storagemanager.py
@@ -0,0 +1,136 @@
+from __future__ import annotations
+from os import PathLike
+from pathlib import Path
+from contextlib import contextmanager
+import shutil
+
+from gufe.utils import delete_empty_dirs
+
+from .externalresource import ExternalStorage
+from .stagingregistry import SharedStaging, PermanentStaging
+from .stagingregistry import StagingPath  # typing
+
+
+class StorageManager:
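+    """Manage the storage lifecycle for the units of a single DAG.
+
+    This owns the :class:`.SharedStaging` and :class:`.PermanentStaging`
+    for a DAG, hands per-unit ``scratch``, ``shared``, and ``permanent``
+    locations to the executor via the ``running_dag`` and ``running_unit``
+    context managers, and performs the transfers and cleanup appropriate
+    to the end of each lifecycle stage. Labels default to the form
+    ``{dag_label}/{unit_label}_attempt_{attempt}`` (see ``make_label``).
+    """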
+    def __init__(
+        self,
+        scratch_root: PathLike,
+        shared_root: ExternalStorage,
+        permanent_root: ExternalStorage,
+        *,
+        keep_scratch: bool = False,
+        keep_staging: bool = False,
+        keep_shared: bool = False,
+        keep_empty_dirs: bool = False,
+        staging: PathLike = Path(".staging"),
+    ):
+        self.scratch_root = Path(scratch_root)
+        self.shared_root = shared_root
+        self.permanent_root = permanent_root
+        self.keep_scratch = keep_scratch
+        self.keep_staging = keep_staging
+        self.keep_shared = keep_shared
+        self.staging = staging
+        self.keep_empty_dirs = keep_empty_dirs
+
+        # these are used to track what files can be deleted from shared if
+        # keep_shared is False
+        self.shared_xfer: set[StagingPath] = set()
+        self.permanent_xfer: set[StagingPath] = set()
+
+        self.permanent_staging = PermanentStaging(
+            scratch=self.scratch_root,
+            external=self.permanent_root,
+            shared=self.shared_root,
+            staging=self.staging,
+            keep_empty_dirs=keep_empty_dirs,
+        )
+
+        self.shared_staging = SharedStaging(
+            scratch=self.scratch_root,
+            external=self.shared_root,
+            staging=self.staging,
+            keep_empty_dirs=keep_empty_dirs,
+        )
+
+    def make_label(self, dag_label, unit_label, attempt, **kwargs):
+        """Create the label for a specific unit attempt.
+
+        The specific executor may change this by making a very simple
+        adapter subclass and overriding this method, which can take
+        arbitrary additional kwargs that may tie it to a specific executor.
+        """
+        return f"{dag_label}/{unit_label}_attempt_{attempt}"
+
+    @property
+    def _scratch_base(self):
+        return self.scratch_root / "scratch"
+
+    def _scratch_loc(self, dag_label, unit_label, attempt, **kwargs):
+        label = self.make_label(dag_label, unit_label, attempt)
+        return self._scratch_base / label
+
+    @contextmanager
+    def running_dag(self, dag_label):
+        # TODO: remove (or use) dag_label
+        try:
+            yield self
+        finally:
+            # clean up after DAG completes
+            self.permanent_staging.transfer_staging_to_external()
+
+            if not self.keep_staging:
+                self.permanent_staging.cleanup()
+
+            if not self.keep_shared:
+                # we'd like to do something like loop over
+                # self.shared_xfer - self.permanent_xfer; however,
+                # StagingPaths have different staging registries. This
+                # gives the set of paths we do want to delete
+                perm_xfer_paths = {p.as_path() for p in self.permanent_xfer}
+                shared_xfer_to_delete = {
+                    p for p in self.shared_xfer
+                    if p.as_path() not in perm_xfer_paths
+                }
+
+                for file in shared_xfer_to_delete:
+                    self.shared_root.delete(file.label)
+
+                for file in self.permanent_xfer:
+                    if self.shared_root != self.permanent_root:
+                        self.shared_root.delete(file.label)
+
+            if not self.keep_empty_dirs:
+                delete_empty_dirs(self._scratch_base, delete_root=False)
+
+    @contextmanager
+    def running_unit(self, dag_label, unit_label, **kwargs):
+        scratch = self._scratch_loc(dag_label, unit_label, **kwargs)
+        label = self.make_label(dag_label, unit_label, **kwargs)
+        scratch.mkdir(parents=True, exist_ok=True)
+        shared = self.shared_staging / label
+        permanent = self.permanent_staging / label
+        try:
+            yield scratch, shared, permanent
+        finally:
+            # clean up after unit
+
+            # track the files that were in shared so that we can delete
+            # them at the end of the DAG if required
+            shared_xfers = self.shared_staging.transfer_staging_to_external()
+            self.shared_xfer.update(set(shared_xfers))
+
+            # everything in permanent should also be in shared
+            for file in self.permanent_staging.registry:
+                self.shared_staging.transfer_single_file_to_external(file)
+                self.permanent_xfer.add(file)
+
+            if not self.keep_scratch:
+                shutil.rmtree(scratch)
+
+            if not self.keep_staging:
+                self.shared_staging.cleanup()
diff --git a/gufe/tests/storage/test_stagingregistry.py b/gufe/tests/storage/test_stagingregistry.py
new file mode 100644
index 00000000..d5e5d71e
--- /dev/null
+++ b/gufe/tests/storage/test_stagingregistry.py
@@ -0,0 +1,366 @@
+import pytest
+from unittest import mock
+import logging
+
+import os
+import pathlib
+
+from gufe.storage.externalresource import MemoryStorage, FileStorage
+from gufe.storage.stagingregistry import (
+    SharedStaging, PermanentStaging, _safe_to_delete_file,
+    delete_empty_dirs,  # TODO: move to appropriate place
+)
+
+
+@pytest.fixture
+def root(tmp_path):
+    external = MemoryStorage()
+    external.store_bytes("old_unit/data.txt", b"foo")
+    root = SharedStaging(
+        scratch=tmp_path,
+        external=external,
+        delete_staging=False
+    )
+    return root
+
+
+@pytest.fixture
+def root_with_contents(root):
+    # file staged but not yet shipped to external
+    with open(root / "new_unit/data.txt", mode='wb') as f:
+        f.write(b"bar")
+
+    return root
+
+
+@pytest.fixture
+def read_only_with_overwritten(root_with_contents):
+    read_only = SharedStaging(
+        scratch=root_with_contents.scratch,
+        external=root_with_contents.external,
+        staging=root_with_contents.staging,
+        delete_staging=root_with_contents.delete_staging,
+        read_only=True
+    )
+    filename = (read_only / "old_unit/data.txt").as_path()
+    assert not filename.exists()
+    staged = read_only / "old_unit/data.txt"
+    assert not filename.exists()
+    staged.__fspath__()
+    assert filename.exists()
+    with open(staged, mode='w') as f:
+        f.write("changed")
+
+    return read_only, staged
+
+
+@pytest.fixture
+def permanent(tmp_path):
+    shared = MemoryStorage()
+    shared.store_bytes("old_unit/data.txt", b"foo")
+    perm = PermanentStaging(
+        scratch=tmp_path / "final",
+        external=MemoryStorage(),
+        shared=shared,
+        delete_staging=True
+    )
+    return perm
+
+
+@pytest.mark.parametrize('rel_path', [
+    "bar", "baz", "../bar"
+])
+def test_safe_to_delete_file(tmp_path, rel_path):
+    external = FileStorage(tmp_path / "foo")
+    external.store_bytes("bar", b"")
+    ext_loc = tmp_path / "foo" / "bar"
+    assert ext_loc.exists()
+
+    staged = external.root_dir / rel_path
+    is_safe = (rel_path != "bar")
+    assert _safe_to_delete_file(external, staged) is is_safe
+
+
+def test_safe_to_delete_file_not_filestorage(tmp_path):
+    external = MemoryStorage()
+    external.store_bytes("bar", b"")
+    staging = tmp_path / "bar"
+    assert _safe_to_delete_file(external, staging)
+
+
+def test_delete_empty_dirs(tmp_path):
+    base = tmp_path / "tmp"
+    paths = [
+        base / "foo" / "qux" / "qux.txt",
+    ]
+    dirs = [
+        base / "foo" / "bar" / "baz",
+        base / "quux",
+    ]
+    for directory in dirs:
+        directory.mkdir(parents=True, exist_ok=True)
+
+    for path in paths:
+        path.parent.mkdir(parents=True, exist_ok=True)
+        path.touch()
+
+    delete_empty_dirs(base)
+    for path in paths:
+        assert path.exists()
+
+    for directory in dirs:
+        assert not directory.exists()
+
+    assert not (base / "foo" / "bar").exists()
+
+
+@pytest.mark.parametrize('delete_root', [True, False])
+def test_delete_empty_dirs_delete_root(tmp_path, delete_root):
+    base = tmp_path / "tmp"
+    dirs = [
+        base / "foo" / "bar" / "baz",
+        base / "quux",
+    ]
+    for directory in dirs:
+        directory.mkdir(parents=True, exist_ok=True)
+
+    delete_empty_dirs(base, delete_root=delete_root)
+
+    for directory in dirs:
+        assert not directory.exists()
+
+    assert not (base / "foo" / "bar").exists()
+    assert base.exists() is not delete_root
+
+
+class TestSharedStaging:
+    def test_repr(self, root):
+        r = repr(root)
+        assert r.startswith("SharedStaging")
+        assert "MemoryStorage" in r
+
+    def test_fspath_fail(self, root):
+        # ensure that we get an error on os.path.join (or really, anything
+        # that hits os.fspath)
+        with pytest.raises(TypeError):
+            os.path.join(root, "filename.txt")
+
+    @pytest.mark.parametrize('pathlist', [
+        ['file.txt'], ['dir', 'file.txt']
+    ])
+    def test_path(self, root, pathlist):
+        path = root
+        for p in pathlist:
+            path = path / p
+
+        inner_path = os.sep.join(pathlist)
+        actual_path = root.staging_dir / inner_path
+
+        assert pathlib.Path(path) == actual_path
+
+    def test_read_old(self, root):
+        # When the file doesn't exist locally, it should be pulled down
+        # the first time that we register the path.
+
+        # initial conditions, without touching StagingRegistry/StagingPath
+        label = "old_unit/data.txt"
+        on_filesystem = root.scratch / root.staging / "old_unit/data.txt"
+        assert not on_filesystem.exists()
+        assert root.external.exists(label)
+
+        # when we create the specific StagingPath, it registers and
+        # "downloads" the file
+        filepath = root / "old_unit/data.txt"
+        assert filepath.as_path() == on_filesystem
+
+        assert not on_filesystem.exists()
+        filepath.register()
+        assert on_filesystem.exists()
+
+        # let's just be sure we can read in the data as desired
+        with open(filepath, mode='rb') as f:
+            assert f.read() == b"foo"
+
+    def test_write_new(self, root):
+        label = "new_unit/somefile.txt"
+        on_filesystem = root.scratch / root.staging / "new_unit/somefile.txt"
+        assert not on_filesystem.exists()
+        with open(root / "new_unit/somefile.txt", mode='wb') as f:
+            f.write(b"testing")
+
+        # this has been written to disk in scratch, but not yet saved to
+        # external storage
+        assert on_filesystem.exists()
+        assert not root.external.exists(label)
+
+    @pytest.mark.xfail  # Need test that read-only errors on new files
+    def test_write_old_fail(self, root):
+        old_staging = root._get_other_shared("old_unit")
+        staged = old_staging / "foo.txt"
+        with pytest.raises(IOError, match="read-only"):
+            staged.__fspath__()
+
+    def test_transfer_to_external(self, root_with_contents):
+        path = list(root_with_contents.registry)[0]  # only 1
+        assert not root_with_contents.external.exists(path.label)
+
+        root_with_contents.transfer_staging_to_external()
+        assert root_with_contents.external.exists(path.label)
+
+        with root_with_contents.external.load_stream(path.label) as f:
+            assert f.read() == b"bar"
+
+    def test_transfer_to_external_no_file(self, root, caplog):
+        with mock.patch.object(root, 'register_path'):
+            nonfile = root / "old_unit/does_not_exist.txt"
+        # ensure that we've set this up correctly
+        assert nonfile not in root.registry
+        logger_name = "gufe.storage.stagingregistry"
+        caplog.set_level(logging.INFO, logger=logger_name)
+        root.transfer_single_file_to_external(nonfile)
+        assert len(caplog.records) == 1
+        record = caplog.records[0]
+        assert "nonexistent" in record.msg
+
+    def test_transfer_to_external_directory(self, root, caplog):
+        directory = root / "old_unit/directory"
+        with open(directory / "file.txt", mode='w') as f:
+            f.write("foo")
+
+        logger_name = "gufe.storage.stagingregistry"
+        caplog.set_level(logging.DEBUG, logger=logger_name)
+        root.transfer_single_file_to_external(directory)
+        assert len(caplog.records) == 1
+        record = caplog.records[0]
+        assert "Found directory" in record.msg
+        assert "not transferring" in record.msg
+
+    def test_single_file_transfer_read_only(self,
+                                            read_only_with_overwritten,
+                                            caplog):
+        read_only, staged = read_only_with_overwritten
+        with read_only.external.load_stream("old_unit/data.txt") as f:
+            old_contents = f.read()
+
+        assert old_contents == b"foo"
+        logger_name = "gufe.storage.stagingregistry"
+        caplog.set_level(logging.DEBUG, logger=logger_name)
+        read_only.transfer_single_file_to_external(staged)
+        assert len(caplog.records) == 1
+        record = caplog.records[0]
+        assert "Read-only:" in record.msg
+        with read_only.external.load_stream("old_unit/data.txt") as f:
+            new_contents = f.read()
+        assert old_contents == new_contents
+
+    def test_transfer_read_only(self, read_only_with_overwritten, caplog):
+        read_only, staged = read_only_with_overwritten
+        with read_only.external.load_stream("old_unit/data.txt") as f:
+            old_contents = f.read()
+
+        assert old_contents == b"foo"
+        logger_name = "gufe.storage.stagingregistry"
+        caplog.set_level(logging.DEBUG, logger=logger_name)
+        read_only.transfer_staging_to_external()
+        assert len(caplog.records) == 1
+        record = caplog.records[0]
+        assert "Read-only:" in record.msg
+        with read_only.external.load_stream("old_unit/data.txt") as f:
+            new_contents = f.read()
+        assert old_contents == new_contents
+
+    def test_cleanup(self, root_with_contents):
+        root_with_contents.delete_staging = True  # slightly naughty
+        path = (root_with_contents / "new_unit/data.txt").as_path()
+        assert path.exists()
+        root_with_contents.cleanup()
+        assert not path.exists()
+
+    def test_cleanup_missing(self, root, caplog):
+        root.delete_staging = True
+        file = root / "old_unit/foo.txt"
+        file.register()
+        assert file in root.registry
+        assert not pathlib.Path(file).exists()
+        logger_name = "gufe.storage.stagingregistry"
+        caplog.set_level(logging.WARNING, logger=logger_name)
+        root.cleanup()
+        assert len(caplog.records) == 1
+        record = caplog.records[0]
+        assert "can not be found on disk" in record.msg
+
+    def test_cleanup_directory(self, root, caplog):
+        root.delete_staging = True
+        dirname = root / "old_unit"
+        assert dirname not in root.registry
+        dirname.register()
+        assert dirname in root.registry
+
+        assert not pathlib.Path(dirname).exists()
+        file = dirname / "foo.txt"
+        file.register()
+        # directory is created when something in the directory is registered
+        assert pathlib.Path(dirname).exists()
+        logger_name = "gufe.storage.stagingregistry"
+        caplog.set_level(logging.DEBUG, logger=logger_name)
+        root.cleanup()
+        assert "During staging cleanup, the directory" in caplog.text
+
+    def test_register_cleanup_preexisting_file(self, root):
+        filename = (root / "new_unit/foo.txt").as_path()
+        filename.parent.mkdir(parents=True, exist_ok=True)
+        filename.touch()
+        root.external.store_bytes("new_unit/foo.txt", b"")
+        assert len(root.registry) == 0
+        assert len(root.preexisting) == 0
+        staging = root / "new_unit/foo.txt"
+        assert staging.label == "new_unit/foo.txt"
+        assert len(root.registry) == 0
+        assert len(root.preexisting) == 0
+        staging.__fspath__()
+        assert len(root.registry) == 1
+        assert len(root.preexisting) == 1
+
+        assert filename.exists()
+        root.cleanup()
+        assert filename.exists()
+
+
+class TestPermanentStaging:
+    @pytest.mark.parametrize('is_safe', [True, False])
+    def test_delete_file_safe(self, tmp_path, is_safe):
+        staging = ".staging" if is_safe else ""
+        scratch_root_dir = tmp_path / "final"
+
+        # create a file in the external storage
+        external = FileStorage(scratch_root_dir)
+        external.store_bytes("foo.txt", b"foo")
+        external_file_loc = external.root_dir / "foo.txt"
+        assert external_file_loc.exists()
+
+        permanent = PermanentStaging(
+            scratch=scratch_root_dir,
+            external=MemoryStorage(),
+            shared=external,
+            staging=staging,
+            delete_staging=True
+        )
+        my_file = permanent / "foo.txt"
+
+        # double check that we set things up correctly
+        assert (str(external_file_loc) != my_file._fspath) is is_safe
+
+        # test the code
+        assert permanent._delete_file_safe(my_file) is is_safe
+
+    def test_load_missing_for_transfer(self, permanent):
+        fname = (permanent / "old_unit/data.txt").as_path()
+        assert not fname.exists()
+        staging = permanent / "old_unit/data.txt"
+        staging.__fspath__()
+        assert not fname.exists()
+        assert permanent.external._data == {}
+        permanent.transfer_staging_to_external()
+        assert fname.exists()
+        assert permanent.external._data == {"old_unit/data.txt": b"foo"}
diff --git a/gufe/tests/storage/test_storagemanager.py b/gufe/tests/storage/test_storagemanager.py
new file mode 100644
index 00000000..779d5552
--- /dev/null
+++ b/gufe/tests/storage/test_storagemanager.py
@@ -0,0 +1,297 @@
+import pytest
+from gufe.storage.storagemanager import StorageManager
+from gufe.storage.externalresource import MemoryStorage, FileStorage
+from pathlib import Path
+
+
+@pytest.fixture
+def storage_manager_std(tmp_path):
+    return StorageManager(
+        scratch_root=tmp_path / "working",
+        shared_root=MemoryStorage(),
+        permanent_root=MemoryStorage(),
+    )
+
+
+@pytest.fixture
+def dag_units():
+    class Unit1:
+        key = "unit1"
+
+        def run(self, scratch, shared, permanent):
+            (scratch / "foo.txt").touch()
+            with open(shared / "bar.txt", mode='w') as f:
+                f.write("bar was written")
+            with open(permanent / "baz.txt", mode='w') as f:
+                f.write("baz was written")
+
+            return "done 1"
+
+    class Unit2:
+        key = "unit2"
+
+        def run(self, scratch, shared, permanent):
+            (scratch / "foo2.txt").touch()
+            # TODO: this will change; the inputs should include a way to
+            # get the previous shared unit label
+            prev_shared = shared.root / "dag/unit1_attempt_0"
+            with open(prev_shared / "bar.txt", mode='r') as f:
+                bar = f.read()
+
+            # note that you can open a file from permanent as if it were
+            # from shared -- everything in permanent is in shared
+            with open(prev_shared / "baz.txt", mode='r') as f:
+                baz = f.read()
+
+            return {"bar": bar, "baz": baz}
+
+    return [Unit1(), Unit2()]
+
+
+class LifecycleHarness:
+    @pytest.fixture
+    def storage_manager(self, tmp_path):
+        raise NotImplementedError()
+
+    @staticmethod
+    def get_files_dict(storage_manager):
+        root = storage_manager.scratch_root
+        staging = storage_manager.staging
+        return {
+            "foo": root / "scratch/dag/unit1_attempt_0/foo.txt",
+            "foo2": root / "scratch/dag/unit2_attempt_0/foo2.txt",
+            "bar": root / staging / "dag/unit1_attempt_0/bar.txt",
+            "baz": root / staging / "dag/unit1_attempt_0/baz.txt",
+        }
+
+    def test_lifecycle(self, storage_manager, dag_units, tmp_path):
+        results = []
+        dag_label = "dag"
+        with storage_manager.running_dag(dag_label) as dag_ctx:
+            for unit in dag_units:
+                label = f"{dag_label}/{unit.key}"
+                with dag_ctx.running_unit(dag_label, unit.key, attempt=0) as (
+                    scratch, shared, perm
+                ):
+                    results.append(unit.run(scratch, shared, perm))
+                    self.in_unit_asserts(storage_manager, label)
+                self.after_unit_asserts(storage_manager, label)
+        self.after_dag_asserts(storage_manager)
+        assert results == [
+            "done 1",
+            {"bar": "bar was written", "baz": "baz was written"}
+        ]
+
+    def _in_unit_existing_files(self, unit_label):
+        raise NotImplementedError()
+
+    def _after_unit_existing_files(self, unit_label):
+        raise NotImplementedError()
+
+    def _after_dag_existing_files(self):
+        raise NotImplementedError()
+
+    @staticmethod
+    def assert_existing_files(files_dict, existing):
+        for file in existing:
+            assert files_dict[file].exists()
+
+        for file in set(files_dict) - existing:
+            assert not files_dict[file].exists()
+
+    def _in_staging_shared(self, unit_label, in_after):
+        """
+        Extra files to expect when the staging directory overlaps with the
+        shared storage, i.e., when the shared root reports staged files
+        among its contents.
+        """
+        return set()
+
+    def _in_staging_permanent(self, unit_label, in_after):
+        """
+        Extra files to expect when the staging directory overlaps with the
+        permanent storage, i.e., when the permanent root reports staged
+        files among its contents.
+        """
+ """ + return set() + + def in_unit_asserts(self, storage_manager, unit_label): + # check that shared and permanent are correct + shared_root = storage_manager.shared_root + permanent_root = storage_manager.permanent_root + expected_in_shared = { + "dag/unit1": set(), + "dag/unit2": {"dag/unit1_attempt_0/bar.txt", + "dag/unit1_attempt_0/baz.txt"} + }[unit_label] | self._in_staging_shared(unit_label, "in") + assert set(shared_root.iter_contents()) == expected_in_shared + + expected_in_permanent = self._in_staging_permanent(unit_label, "in") + assert set(permanent_root.iter_contents()) == expected_in_permanent + + # manager-specific check for files + files_dict = self.get_files_dict(storage_manager) + existing = self._in_unit_existing_files(unit_label) + self.assert_existing_files(files_dict, existing) + + def after_unit_asserts(self, storage_manager, unit_label): + shared_root = storage_manager.shared_root + permanent_root = storage_manager.permanent_root + shared_extras = self._in_staging_shared(unit_label, "after") + permanent_extras = self._in_staging_permanent(unit_label, "after") + expected_in_shared = {"dag/unit1_attempt_0/bar.txt", + "dag/unit1_attempt_0/baz.txt"} + expected_in_shared |= shared_extras + assert set(shared_root.iter_contents()) == expected_in_shared + assert set(permanent_root.iter_contents()) == permanent_extras + + # manager-specific check for files + files_dict = self.get_files_dict(storage_manager) + existing = self._after_unit_existing_files(unit_label) + self.assert_existing_files(files_dict, existing) + + def after_dag_asserts(self, storage_manager): + permanent_root = storage_manager.permanent_root + permanent_extras = self._in_staging_permanent('dag/unit2', "after") + # shared still contains everything it had; but this isn't something + # we guarantee, so we don't actually test for it, but we could with + # this: + # shared_root = storage_manager.shared_root + # shared_extras = self._in_staging_shared('dag/unit2', "after") + # expected_in_shared = {"dag/unit1/bar.txt", "dag/unit1/baz.txt"} + # expected_in_shared |= shared_extras + # assert set(shared_root.iter_contents()) == expected_in_shared + expected_in_permanent = ({"dag/unit1_attempt_0/baz.txt"} + | permanent_extras) + assert set(permanent_root.iter_contents()) == expected_in_permanent + + # manager-specific check for files + files_dict = self.get_files_dict(storage_manager) + existing = self._after_dag_existing_files() + self.assert_existing_files(files_dict, existing) + + +class TestStandardStorageManager(LifecycleHarness): + @pytest.fixture + def storage_manager(self, storage_manager_std): + return storage_manager_std + + def _in_unit_existing_files(self, unit_label): + return { + "dag/unit1": {'bar', 'baz', 'foo'}, + "dag/unit2": {'foo2', 'baz', 'bar'}, + # bar was deleted, but gets brought back in unit2 + }[unit_label] + + def _after_unit_existing_files(self, unit_label): + # Same for both units because unit2 doesn't add anything to + # shared/permanent; in this one, only files staged for permanent + # should remain + return {'baz'} + + def _after_dag_existing_files(self): + return set() + + +class TestKeepScratchAndStagingStorageManager(LifecycleHarness): + @pytest.fixture + def storage_manager(self, tmp_path): + return StorageManager( + scratch_root=tmp_path / "working", + shared_root=MemoryStorage(), + permanent_root=MemoryStorage(), + keep_scratch=True, + keep_staging=True + ) + + @staticmethod + def files_after_unit(unit_label): + unit1 = {'bar', 'baz', 'foo'} + unit2 = {'foo2', 'baz'} + 
+        return {
+            'dag/unit1': unit1,
+            'dag/unit2': unit1 | unit2
+        }[unit_label]
+
+    def _in_unit_existing_files(self, unit_label):
+        return self.files_after_unit(unit_label)
+
+    def _after_unit_existing_files(self, unit_label):
+        return self.files_after_unit(unit_label)
+
+    def _after_dag_existing_files(self):
+        return self.files_after_unit('dag/unit2')
+
+
+class TestStagingOverlapsSharedStorageManager(LifecycleHarness):
+    @pytest.fixture
+    def storage_manager(self, tmp_path):
+        root = tmp_path / "working"
+        return StorageManager(
+            scratch_root=root,
+            shared_root=FileStorage(root),
+            permanent_root=MemoryStorage(),
+            staging="",
+        )
+
+    def _in_unit_existing_files(self, unit_label):
+        return {
+            "dag/unit1": {'foo', 'bar', 'baz'},
+            "dag/unit2": {'foo2', 'bar', 'baz'},
+        }[unit_label]
+
+    def _after_unit_existing_files(self, unit_label):
+        # same for both; all files come from unit 1
+        return {"bar", "baz"}
+
+    def _after_dag_existing_files(self):
+        # these get deleted because we don't keep shared here
+        return set()
+
+    def _in_staging_shared(self, unit_label, in_after):
+        bar = "dag/unit1_attempt_0/bar.txt"
+        baz = "dag/unit1_attempt_0/baz.txt"
+        foo = "scratch/dag/unit1_attempt_0/foo.txt"
+        foo2 = "scratch/dag/unit2_attempt_0/foo2.txt"
+        return {
+            ("dag/unit1", "in"): {bar, baz, foo},
+            ("dag/unit1", "after"): {bar, baz},
+            ("dag/unit2", "in"): {bar, baz, foo2},
+            ("dag/unit2", "after"): {baz}
+        }[unit_label, in_after]
+
+
+class TestStagingOverlapsPermanentStorageManager(LifecycleHarness):
+    @pytest.fixture
+    def storage_manager(self, tmp_path):
+        root = tmp_path / "working"
+        return StorageManager(
+            scratch_root=root,
+            permanent_root=FileStorage(root),
+            shared_root=MemoryStorage(),
+            staging="",
+        )
+
+    def _in_unit_existing_files(self, unit_label):
+        return {
+            "dag/unit1": {'foo', 'bar', 'baz'},
+            "dag/unit2": {"foo2", "baz", "bar"},  # bar is resurrected
+        }[unit_label]
+
+    def _after_dag_existing_files(self):
+        return {"baz"}
+
+    def _in_staging_permanent(self, unit_label, in_after):
+        bar = "dag/unit1_attempt_0/bar.txt"
+        baz = "dag/unit1_attempt_0/baz.txt"
+        foo = "scratch/dag/unit1_attempt_0/foo.txt"
+        foo2 = "scratch/dag/unit2_attempt_0/foo2.txt"
+        return {
+            ("dag/unit1", "in"): {bar, baz, foo},
+            ("dag/unit1", "after"): {baz},
+            ("dag/unit2", "in"): {baz, foo2, bar},  # bar is resurrected
+            ("dag/unit2", "after"): {baz}
+        }[unit_label, in_after]
+
+    def _after_unit_existing_files(self, unit_label):
+        # same for both; all files come from unit 1
+        return {"baz"}
diff --git a/gufe/utils.py b/gufe/utils.py
index f9d3b0ff..519835d5 100644
--- a/gufe/utils.py
+++ b/gufe/utils.py
@@ -4,6 +4,12 @@
 import io
 import warnings
 
+from os import PathLike, rmdir
+import pathlib
+
+import logging
+_logger = logging.getLogger(__name__)
+
 
 class ensure_filelike:
     """Context manager to convert pathlike or filelike to filelike.
@@ -52,3 +58,24 @@
     def __exit__(self, type, value, traceback):
         if self.do_close:
            self.context.close()
+
+
+def delete_empty_dirs(root: PathLike, delete_root: bool = True):
+    """Delete all empty directories.
+
+    Repeats so that directories that only contained empty directories also
+    get deleted.
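+
+    For example, given a hypothetical tree where ``base/a/b`` and
+    ``base/c`` are empty, ``delete_empty_dirs(base, delete_root=False)``
+    removes ``a``, ``b``, and ``c`` but leaves ``base`` itself in place.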
+ """ + root = pathlib.Path(root) + + def find_empty_dirs(directory): + if not (paths := list(directory.iterdir())): + return [directory] + directories = [p for p in paths if p.is_dir()] + return sum([find_empty_dirs(d) for d in directories], []) + + while root.exists() and (empties := find_empty_dirs(root)): + if empties == [root] and not delete_root: + return + for directory in empties: + _logger.debug(f"Removing '{directory}'") + rmdir(directory)