diff --git a/client/ayon_core/host/__init__.py b/client/ayon_core/host/__init__.py index da1237c739..b252b03d76 100644 --- a/client/ayon_core/host/__init__.py +++ b/client/ayon_core/host/__init__.py @@ -4,6 +4,8 @@ from .interfaces import ( IWorkfileHost, + WorkfileInfo, + PublishedWorkfileInfo, ILoadHost, IPublishHost, INewPublisher, @@ -16,6 +18,8 @@ "HostBase", "IWorkfileHost", + "WorkfileInfo", + "PublishedWorkfileInfo", "ILoadHost", "IPublishHost", "INewPublisher", diff --git a/client/ayon_core/host/host.py b/client/ayon_core/host/host.py index 5a29de6cd7..3333cf3778 100644 --- a/client/ayon_core/host/host.py +++ b/client/ayon_core/host/host.py @@ -1,7 +1,7 @@ import os import logging import contextlib -from abc import ABC, abstractproperty +from abc import ABC, abstractmethod # NOTE can't import 'typing' because of issues in Maya 2020 # - shiboken crashes on 'typing' module import @@ -92,7 +92,8 @@ def log(self): self._log = logging.getLogger(self.__class__.__name__) return self._log - @abstractproperty + @property + @abstractmethod def name(self): """Host name.""" diff --git a/client/ayon_core/host/interfaces/__init__.py b/client/ayon_core/host/interfaces/__init__.py new file mode 100644 index 0000000000..379d8555fb --- /dev/null +++ b/client/ayon_core/host/interfaces/__init__.py @@ -0,0 +1,20 @@ +from .exceptions import MissingMethodsError +from .workfiles import IWorkfileHost, WorkfileInfo, PublishedWorkfileInfo +from .interfaces import ( + IPublishHost, + INewPublisher, + ILoadHost, +) + + +__all__ = ( + "MissingMethodsError", + + "IWorkfileHost", + "WorkfileInfo", + "PublishedWorkfileInfo", + + "IPublishHost", + "INewPublisher", + "ILoadHost", +) diff --git a/client/ayon_core/host/interfaces/exceptions.py b/client/ayon_core/host/interfaces/exceptions.py new file mode 100644 index 0000000000..c6b4cef4b4 --- /dev/null +++ b/client/ayon_core/host/interfaces/exceptions.py @@ -0,0 +1,15 @@ +class MissingMethodsError(ValueError): + """Exception when host miss some required methods for specific workflow. + + Args: + host (HostBase): Host implementation where are missing methods. + missing_methods (list[str]): List of missing methods. + """ + + def __init__(self, host, missing_methods): + joined_missing = ", ".join( + ['"{}"'.format(item) for item in missing_methods] + ) + super().__init__( + f"Host \"{host.name}\" miss methods {joined_missing}" + ) diff --git a/client/ayon_core/host/interfaces.py b/client/ayon_core/host/interfaces/interfaces.py similarity index 50% rename from client/ayon_core/host/interfaces.py rename to client/ayon_core/host/interfaces/interfaces.py index c077dfeae9..a41dffe92a 100644 --- a/client/ayon_core/host/interfaces.py +++ b/client/ayon_core/host/interfaces/interfaces.py @@ -1,28 +1,6 @@ -from abc import ABC, abstractmethod +from abc import abstractmethod - -class MissingMethodsError(ValueError): - """Exception when host miss some required methods for specific workflow. - - Args: - host (HostBase): Host implementation where are missing methods. - missing_methods (list[str]): List of missing methods. - """ - - def __init__(self, host, missing_methods): - joined_missing = ", ".join( - ['"{}"'.format(item) for item in missing_methods] - ) - host_name = getattr(host, "name", None) - if not host_name: - try: - host_name = host.__file__.replace("\\", "/").split("/")[-3] - except Exception: - host_name = str(host) - message = ( - "Host \"{}\" miss methods {}".format(host_name, joined_missing) - ) - super(MissingMethodsError, self).__init__(message) +from .exceptions import MissingMethodsError class ILoadHost: @@ -105,181 +83,6 @@ def ls(self): return self.get_containers() -class IWorkfileHost(ABC): - """Implementation requirements to be able use workfile utils and tool.""" - - @staticmethod - def get_missing_workfile_methods(host): - """Look for missing methods on "old type" host implementation. - - Method is used for validation of implemented functions related to - workfiles. Checks only existence of methods. - - Args: - Union[ModuleType, HostBase]: Object of host where to look for - required methods. - - Returns: - list[str]: Missing method implementations for workfiles workflow. - """ - - if isinstance(host, IWorkfileHost): - return [] - - required = [ - "open_file", - "save_file", - "current_file", - "has_unsaved_changes", - "file_extensions", - "work_root", - ] - missing = [] - for name in required: - if not hasattr(host, name): - missing.append(name) - return missing - - @staticmethod - def validate_workfile_methods(host): - """Validate methods of "old type" host for workfiles workflow. - - Args: - Union[ModuleType, HostBase]: Object of host to validate. - - Raises: - MissingMethodsError: If there are missing methods on host - implementation. - """ - - missing = IWorkfileHost.get_missing_workfile_methods(host) - if missing: - raise MissingMethodsError(host, missing) - - @abstractmethod - def get_workfile_extensions(self): - """Extensions that can be used as save. - - Questions: - This could potentially use 'HostDefinition'. - """ - - return [] - - @abstractmethod - def save_workfile(self, dst_path=None): - """Save currently opened scene. - - Args: - dst_path (str): Where the current scene should be saved. Or use - current path if 'None' is passed. - """ - - pass - - @abstractmethod - def open_workfile(self, filepath): - """Open passed filepath in the host. - - Args: - filepath (str): Path to workfile. - """ - - pass - - @abstractmethod - def get_current_workfile(self): - """Retrieve path to current opened file. - - Returns: - str: Path to file which is currently opened. - None: If nothing is opened. - """ - - return None - - def workfile_has_unsaved_changes(self): - """Currently opened scene is saved. - - Not all hosts can know if current scene is saved because the API of - DCC does not support it. - - Returns: - bool: True if scene is saved and False if has unsaved - modifications. - None: Can't tell if workfiles has modifications. - """ - - return None - - def work_root(self, session): - """Modify workdir per host. - - Default implementation keeps workdir untouched. - - Warnings: - We must handle this modification with more sophisticated way - because this can't be called out of DCC so opening of last workfile - (calculated before DCC is launched) is complicated. Also breaking - defined work template is not a good idea. - Only place where it's really used and can make sense is Maya. There - workspace.mel can modify subfolders where to look for maya files. - - Args: - session (dict): Session context data. - - Returns: - str: Path to new workdir. - """ - - return session["AYON_WORKDIR"] - - # --- Deprecated method names --- - def file_extensions(self): - """Deprecated variant of 'get_workfile_extensions'. - - Todo: - Remove when all usages are replaced. - """ - return self.get_workfile_extensions() - - def save_file(self, dst_path=None): - """Deprecated variant of 'save_workfile'. - - Todo: - Remove when all usages are replaced. - """ - - self.save_workfile(dst_path) - - def open_file(self, filepath): - """Deprecated variant of 'open_workfile'. - - Todo: - Remove when all usages are replaced. - """ - - return self.open_workfile(filepath) - - def current_file(self): - """Deprecated variant of 'get_current_workfile'. - - Todo: - Remove when all usages are replaced. - """ - - return self.get_current_workfile() - - def has_unsaved_changes(self): - """Deprecated variant of 'workfile_has_unsaved_changes'. - - Todo: - Remove when all usages are replaced. - """ - - return self.workfile_has_unsaved_changes() - - class IPublishHost: """Functions related to new creation system in new publisher. diff --git a/client/ayon_core/host/interfaces/workfiles.py b/client/ayon_core/host/interfaces/workfiles.py new file mode 100644 index 0000000000..f416d19aa0 --- /dev/null +++ b/client/ayon_core/host/interfaces/workfiles.py @@ -0,0 +1,654 @@ +from __future__ import annotations +import os +import platform +import shutil +from abc import abstractmethod +from dataclasses import dataclass, asdict +import typing +from typing import Optional, Any + +import ayon_api +import arrow + +if typing.TYPE_CHECKING: + from ayon_core.pipeline import Anatomy + + +@dataclass +class WorkfileInfo: + filepath: str + rootless_path: str + file_size: Optional[float] + file_created: Optional[float] + file_modified: Optional[float] + workfile_entity_id: Optional[str] + description: str + created_by: Optional[str] + updated_by: Optional[str] + available: bool + + @classmethod + def new(cls, filepath, rootless_path, available, workfile_entity): + file_size = file_modified = file_created = None + if filepath and os.path.exists(filepath): + filestat = os.stat(filepath) + file_size = filestat.st_size + file_created = filestat.st_ctime + file_modified = filestat.st_mtime + + if workfile_entity is None: + workfile_entity = {} + + attrib = {} + if workfile_entity: + attrib = workfile_entity["attrib"] + + return cls( + filepath=filepath, + rootless_path=rootless_path, + file_size=file_size, + file_created=file_created, + file_modified=file_modified, + workfile_entity_id=workfile_entity.get("id"), + description=attrib.get("description") or "", + created_by=workfile_entity.get("createdBy"), + updated_by=workfile_entity.get("updatedBy"), + available=available, + ) + + def to_data(self): + """Converts file item to data. + + Returns: + dict[str, Any]: Workfile item data. + + """ + return asdict(self) + + @classmethod + def from_data(cls, data): + """Converts data to workfile item. + + Args: + data (dict[str, Any]): Workfile item data. + + Returns: + WorkfileInfo: File item. + + """ + return WorkfileInfo(**data) + + +@dataclass +class PublishedWorkfileInfo: + project_name: str + folder_id: str + task_id: Optional[str] + representation_id: str + filepath: str + created_at: float + author: str + available: bool + file_size: Optional[float] + file_created: Optional[float] + file_modified: Optional[float] + + @classmethod + def new( + cls, + project_name: str, + folder_id: str, + task_id: Optional[str], + repre_entity: dict[str, Any], + filepath: str, + author: str, + available: bool, + file_size: Optional[float], + file_modified: Optional[float], + file_created: Optional[float], + ): + created_at = arrow.get(repre_entity["createdAt"]).to("local") + + return cls( + project_name=project_name, + folder_id=folder_id, + task_id=task_id, + representation_id=repre_entity["id"], + filepath=filepath, + created_at=created_at.float_timestamp, + author=author, + available=available, + file_size=file_size, + file_created=file_created, + file_modified=file_modified, + ) + + def to_data(self): + """Converts file item to data. + + Returns: + dict[str, Any]: Workfile item data. + + """ + return asdict(self) + + @classmethod + def from_data(cls, data): + """Converts data to workfile item. + + Args: + data (dict[str, Any]): Workfile item data. + + Returns: + WorkfileInfo: File item. + + """ + return WorkfileInfo(**data) + + +class IWorkfileHost: + """Implementation requirements to be able use workfile utils and tool.""" + + @abstractmethod + def save_workfile(self, dst_path: Optional[str] = None): + """Save currently opened scene. + + Args: + dst_path (str): Where the current scene should be saved. Or use + current path if 'None' is passed. + + """ + pass + + @abstractmethod + def open_workfile(self, filepath: str): + """Open passed filepath in the host. + + Args: + filepath (str): Path to workfile. + + """ + pass + + @abstractmethod + def get_current_workfile(self) -> Optional[str]: + """Retrieve path to current opened file. + + Returns: + Optional[str]: Path to file which is currently opened. None if + nothing is opened. + + """ + return None + + def workfile_has_unsaved_changes(self) -> Optional[bool]: + """Currently opened scene is saved. + + Not all hosts can know if current scene is saved because the API of + DCC does not support it. + + Returns: + Optional[bool]: True if scene is saved and False if has unsaved + modifications. None if can't tell if workfiles has + modifications. + + """ + return None + + def get_workfile_extensions(self) -> list[str]: + """Extensions that can be used as save workfile. + + Notes: + Method may not be used if 'list_workfiles' and + 'list_published_workfiles' are re-implemented with different + logic. + + Returns: + list[str]: List of extensions that can be used for saving. + + """ + return [] + + def save_workfile_with_context( + self, + filepath: str, + folder_entity: dict[str, Any] = None, + task_entity: dict[str, Any] = None, + ): + """Save current workfile with context. + + Notes: + Should this method care about context change? + + Args: + filepath (str): Where the current scene should be saved. + folder_entity (dict[str, Any]): Folder entity. + task_entity (dict[str, Any]): Task entity. + + """ + self.save_workfile(filepath) + + def open_workfile_with_context( + self, + filepath: str, + folder_entity: dict[str, Any], + task_entity: dict[str, Any], + ): + """Open passed filepath in the host with context. + + This function should be used to open workfile in different context. + + Notes: + Should this method care about context change? + + Args: + filepath (str): Path to workfile. + folder_entity (dict[str, Any]): Folder id. + task_entity (dict[str, Any]): Task id. + + """ + self.open_workfile(filepath) + + def list_workfiles( + self, + project_name: str, + folder_entity: dict[str, Any], + task_entity: dict[str, Any], + project_entity: Optional[dict[str, Any]] = None, + workfile_entities: Optional[list[dict[str, Any]]] = None, + template_key: Optional[str] = None, + project_settings: Optional[dict[str, Any]] = None, + anatomy: Optional["Anatomy"] = None, + ) -> list[WorkfileInfo]: + """List workfiles in the given folder. + + NOTES: + - Better method name? + - This method is pre-implemented as the logic can be shared across + 95% of host integrations. Ad-hoc implementation to give host + integration workfile api functionality. + - Should this method also handle workfiles based on workfile entities? + + Args: + project_name (str): Name of project. + folder_entity (dict[str, Any]): Folder entity. + task_entity (dict[str, Any]): Task entity. + project_entity (Optional[dict[str, Any]]): Project entity. + workfile_entities (Optional[list[dict[str, Any]]]): Workfile + entities. + template_key (Optional[str]): Template key. + project_settings (Optional[dict[str, Any]]): Project settings. + anatomy (Anatomy): Project anatomy. + + Returns: + list[WorkfileInfo]: List of workfiles. + + """ + from ayon_core.pipeline import Anatomy + from ayon_core.pipeline.template_data import get_template_data + from ayon_core.pipeline.workfile import get_workdir_with_workdir_data + + extensions = self.get_workfile_extensions() + if not extensions: + return [] + + if project_entity is None: + project_entity = ayon_api.get_project(project_name) + + if workfile_entities is None: + task_id = task_entity["id"] + workfile_entities = list(ayon_api.get_workfiles_info( + project_name, task_ids=[task_id] + )) + + if anatomy is None: + anatomy = Anatomy(project_name, project_entity=project_entity) + + workfile_entities_by_path = { + workfile_entity["path"]: workfile_entity + for workfile_entity in workfile_entities + } + + workdir_data = get_template_data( + project_entity, + folder_entity, + task_entity, + host_name=self.name, + ) + workdir = get_workdir_with_workdir_data( + workdir_data, + project_name, + anatomy=anatomy, + template_key=template_key, + project_settings=project_settings, + ) + + if platform.system().lower() == "windows": + rootless_workdir = workdir.replace("\\", "/") + else: + rootless_workdir = workdir + + used_roots = workdir.used_values.get("root") + if used_roots: + used_root_name = next(iter(used_roots)) + root_value = used_roots[used_root_name] + workdir_end = rootless_workdir[len(root_value):].lstrip("/") + rootless_workdir = f"{{root[{used_root_name}]}}/{workdir_end}" + + filenames = [] + if os.path.exists(workdir): + filenames = list(os.listdir(workdir)) + + items = [] + for filename in filenames: + filepath = os.path.join(workdir, filename) + # TODO add 'default' support for folders + ext = os.path.splitext(filepath)[1].lower() + if ext not in extensions: + continue + + rootless_path = f"{rootless_workdir}/{filename}" + workfile_entity = workfile_entities_by_path.pop( + rootless_path, None + ) + items.append(WorkfileInfo.new( + filepath, rootless_path, True, workfile_entity + )) + + for workfile_entity in workfile_entities_by_path.values(): + # Workfile entity is not in the filesystem + # but it is in the database + rootless_path = workfile_entity["path"] + filepath = anatomy.fill_root(rootless_path) + items.append(WorkfileInfo.new( + filepath, rootless_path, False, workfile_entity + )) + + return items + + def list_published_workfiles( + self, + project_name: str, + folder_id: str, + anatomy: Optional["Anatomy"] = None, + version_entities: Optional[list[dict[str, Any]]] = None, + repre_entities: Optional[list[dict[str, Any]]] = None, + ) -> list[PublishedWorkfileInfo]: + """List published workfiles for given folder. + + Default implementation looks for products with 'workfile' + product type. + + Pre-fetched entities have mandatory fields to be fetched. + - Version: 'id', 'author', 'taskId' + - Representation: 'id', 'versionId', 'files' + + Args: + project_name (str): Project name. + folder_id (str): Folder id. + anatomy (Anatomy): Project anatomy. + version_entities (Optional[list[dict[str, Any]]]): Pre-fetched + version entities. + repre_entities (Optional[list[dict[str, Any]]]): Pre-fetched + representation entities. + + Returns: + list[PublishedWorkfileInfo]: Published workfile information for + given context. + + """ + from ayon_core.pipeline import Anatomy + + # Get all representations of the folder + ( + version_entities, + repre_entities + ) = self._fetch_workfile_entities( + project_name, + folder_id, + version_entities, + repre_entities, + ) + if not repre_entities: + return [] + + if anatomy is None: + anatomy = Anatomy(project_name) + + versions_by_id = { + version_entity["id"]: version_entity + for version_entity in version_entities + } + extensions = { + ext.lstrip(".") + for ext in self.get_workfile_extensions() + } + items = [] + for repre_entity in repre_entities: + version_id = repre_entity["versionId"] + version_entity = versions_by_id[version_id] + task_id = version_entity["taskId"] + + # Filter by extension + workfile_path = None + for repre_file in repre_entity["files"]: + ext = ( + os.path.splitext(repre_file["name"])[1] + .lower() + .lstrip(".") + ) + if ext in extensions: + workfile_path = repre_file["path"] + break + + if not workfile_path: + continue + + try: + workfile_path = workfile_path.format(root=anatomy.roots) + except Exception as exc: + print(f"Failed to format workfile path: {exc}") + + is_available = False + file_size = file_modified = file_created = None + if workfile_path and os.path.exists(workfile_path): + filestat = os.stat(workfile_path) + is_available = True + file_size = filestat.st_size + file_created = filestat.st_ctime + file_modified = filestat.st_mtime + + workfile_item = PublishedWorkfileInfo.new( + project_name, + folder_id, + task_id, + repre_entity, + workfile_path, + version_entity["author"], + is_available, + file_size, + file_created, + file_modified, + ) + items.append(workfile_item) + + return items + + def copy_workfile( + self, + src_path: str, + dst_path: str, + folder_entity: dict[str, Any], + task_entity: dict[str, Any], + open_workfile: bool = False, + ): + """Save workfile path with target folder and task context. + + It is expected that workfile is saved to current project, but can be + copied from other project. + + Args: + src_path (str): Path to the source scene. + dst_path (str): Where the scene should be saved. + folder_entity (dict[str, Any]): Folder entity. + task_entity (dict[str, Any]): Task entity. + open_workfile (bool): Open workfile when copied. + + """ + # TODO We might need option to open file once copied as some DCC might + # actually need to open the workfile to copy it. + dst_dir = os.path.dirname(dst_path) + if not os.path.exists(dst_dir): + os.makedirs(dst_dir, exist_ok=True) + shutil.copy(src_path, dst_path) + if open_workfile: + self.open_workfile_with_context( + dst_path, + folder_entity, + task_entity, + ) + + def copy_workfile_representation( + self, + src_project_name: str, + src_representation_entity: dict[str, Any], + dst_path: str, + folder_entity: dict[str, Any], + task_entity: dict[str, Any], + open_workfile: bool = False, + anatomy: Optional[Anatomy] = None, + src_representation_path: Optional[str] = None, + ): + """Copy workfile representation. + + Use representation as source for the workfile. + + Args: + src_project_name (str): Project name. + src_representation_entity (dict[str, Any]): Representation + entity. + dst_path (str): Where the scene should be saved. + folder_entity (dict[str, Any): Folder entity. + task_entity (dict[str, Any]): Task entity. + open_workfile (bool): Open workfile when copied. + anatomy (Optional[Anatomy]): Project anatomy. + src_representation_path (Optional[str]): Representation path. + + """ + # TODO We might need option to open file once copied as some DCC might + # actually need to open the workfile to copy it. + from ayon_core.pipeline import Anatomy + from ayon_core.pipeline.load import ( + get_representation_path_with_anatomy + ) + + if src_representation_path is None: + if anatomy is None: + anatomy = Anatomy(src_project_name) + src_representation_path = get_representation_path_with_anatomy( + src_representation_entity, + anatomy, + ) + + self.copy_workfile( + src_representation_path, + dst_path, + folder_entity, + task_entity, + open_workfile=open_workfile, + ) + + # --- Deprecated method names --- + def file_extensions(self): + """Deprecated variant of 'get_workfile_extensions'. + + Todo: + Remove when all usages are replaced. + """ + return self.get_workfile_extensions() + + def save_file(self, dst_path=None): + """Deprecated variant of 'save_workfile'. + + Todo: + Remove when all usages are replaced. + """ + + self.save_workfile(dst_path) + + def open_file(self, filepath): + """Deprecated variant of 'open_workfile'. + + Todo: + Remove when all usages are replaced. + """ + + return self.open_workfile(filepath) + + def current_file(self): + """Deprecated variant of 'get_current_workfile'. + + Todo: + Remove when all usages are replaced. + """ + + return self.get_current_workfile() + + def has_unsaved_changes(self): + """Deprecated variant of 'workfile_has_unsaved_changes'. + + Todo: + Remove when all usages are replaced. + """ + + return self.workfile_has_unsaved_changes() + + def _fetch_workfile_entities( + self, + project_name: str, + folder_id: str, + version_entities: Optional[list[dict[str, Any]]], + repre_entities: Optional[list[dict[str, Any]]], + ) -> tuple[ + list[dict[str, Any]], + list[dict[str, Any]] + ]: + if repre_entities is not None and version_entities is None: + # Get versions of representations + version_ids = {r["versionId"] for r in repre_entities} + version_entities = list(ayon_api.get_versions( + project_name, + version_ids=version_ids, + fields={"id", "author", "taskId"}, + )) + + if version_entities is None: + # Get product entities of folder + product_entities = ayon_api.get_products( + project_name, + folder_ids={folder_id}, + product_types={"workfile"}, + fields={"id", "name"} + ) + + version_entities = [] + product_ids = {product["id"] for product in product_entities} + if product_ids: + # Get version docs of products with their families + version_entities = list(ayon_api.get_versions( + project_name, + product_ids=product_ids, + fields={"id", "author", "taskId"}, + )) + + # Fetch representations of filtered versions and add filter for + # extension + if repre_entities is None: + repre_entities = [] + if version_entities: + repre_entities = list(ayon_api.get_representations( + project_name, + version_ids={v["id"] for v in version_entities} + )) + + return version_entities, repre_entities diff --git a/client/ayon_core/pipeline/context_tools.py b/client/ayon_core/pipeline/context_tools.py index 66556bbb35..0c6e86ef4b 100644 --- a/client/ayon_core/pipeline/context_tools.py +++ b/client/ayon_core/pipeline/context_tools.py @@ -10,7 +10,7 @@ from pyblish.lib import MessageHandler from ayon_core import AYON_CORE_ROOT -from ayon_core.host import HostBase +from ayon_core.host import HostBase, IWorkfileHost from ayon_core.lib import ( is_in_tests, initialize_ayon_connection, @@ -505,37 +505,64 @@ def get_current_context_custom_workfile_template(project_settings=None): ) -def change_current_context(folder_entity, task_entity, template_key=None): +def change_current_context( + folder_entity, + task_entity, + template_key=None, + workdir=None, + anatomy=None, + project_entity=None, + project_settings=None, +): """Update active Session to a new task work area. This updates the live Session to a different task under folder. + Notes: + This function does a lot of things related to workfiles which + extends arguments options a lot. + We might want to implement 'set_current_context' on host integration + instead. But `AYON_WORKDIR`, which is related to 'IWorkfileHost', + would not be available in that case which might be break some + logic. + Args: folder_entity (Dict[str, Any]): Folder entity to set. task_entity (Dict[str, Any]): Task entity to set. - template_key (Union[str, None]): Prepared template key to be used for + template_key (Optional[str]): Prepared template key to be used for workfile template in Anatomy. + workdir (Optional[str]): Workdir to set. + anatomy (Optional[Anatomy]): Anatomy object used for workdir + calculation. + project_entity (Optional[dict[str, Any]]): Project entity used for + workdir calculation. + project_settings (Optional[dict[str, Any]]): Project settings used for + workdir calculation. Returns: Dict[str, str]: The changed key, values in the current Session. - """ - project_name = get_current_project_name() - workdir = None + """ + host = registered_host() + project_name = host.get_current_project_name() folder_path = None task_name = None if folder_entity: folder_path = folder_entity["path"] if task_entity: task_name = task_entity["name"] - project_entity = ayon_api.get_project(project_name) - host_name = get_current_host_name() + + if isinstance(host, IWorkfileHost) and workdir is None and folder_entity: + if project_entity is None: + project_entity = ayon_api.get_project(project_name) workdir = get_workdir( project_entity, folder_entity, task_entity, - host_name, - template_key=template_key + host.name, + anatomy=anatomy, + template_key=template_key, + project_settings=project_settings, ) envs = { diff --git a/client/ayon_core/pipeline/workfile/__init__.py b/client/ayon_core/pipeline/workfile/__init__.py index aa7e150bca..cc081d676b 100644 --- a/client/ayon_core/pipeline/workfile/__init__.py +++ b/client/ayon_core/pipeline/workfile/__init__.py @@ -4,6 +4,8 @@ get_workdir_with_workdir_data, get_workdir, + get_last_workfile_with_version_from_paths, + get_last_workfile_from_paths, get_last_workfile_with_version, get_last_workfile, @@ -11,12 +13,19 @@ get_custom_workfile_template_by_string_context, create_workdir_extra_folders, + + get_comments_from_workfile_paths, ) from .utils import ( should_use_last_workfile_on_launch, should_open_workfiles_tool_on_launch, MissingWorkdirError, + + open_workfile, + save_current_workfile_to, + copy_and_open_workfile, + copy_and_open_workfile_representation, ) from .build_workfile import BuildWorkfile @@ -37,6 +46,8 @@ "get_workdir_with_workdir_data", "get_workdir", + "get_last_workfile_with_version_from_paths", + "get_last_workfile_from_paths", "get_last_workfile_with_version", "get_last_workfile", @@ -45,10 +56,17 @@ "create_workdir_extra_folders", + "get_comments_from_workfile_paths", + "should_use_last_workfile_on_launch", "should_open_workfiles_tool_on_launch", "MissingWorkdirError", + "open_workfile", + "save_current_workfile_to", + "copy_and_open_workfile", + "copy_and_open_workfile_representation", + "BuildWorkfile", "discover_workfile_build_plugins", diff --git a/client/ayon_core/pipeline/workfile/path_resolving.py b/client/ayon_core/pipeline/workfile/path_resolving.py index 9b2fe25199..ac915060eb 100644 --- a/client/ayon_core/pipeline/workfile/path_resolving.py +++ b/client/ayon_core/pipeline/workfile/path_resolving.py @@ -1,3 +1,4 @@ +from __future__ import annotations import os import re import copy @@ -12,6 +13,7 @@ Logger, StringTemplate, ) +from ayon_core.lib.path_templates import TemplateResult from ayon_core.pipeline import version_start, Anatomy from ayon_core.pipeline.template_data import get_template_data @@ -111,7 +113,7 @@ def get_workdir_with_workdir_data( anatomy=None, template_key=None, project_settings=None -): +) -> TemplateResult: """Fill workdir path from entered data and project's anatomy. It is possible to pass only project's name instead of project's anatomy but @@ -131,8 +133,8 @@ def get_workdir_with_workdir_data( Returns: TemplateResult: Workdir path. - """ + """ if not anatomy: anatomy = Anatomy(project_name) @@ -174,8 +176,8 @@ def get_workdir( is stored under `AYON_HOST_NAME` key. anatomy (Anatomy): Optional argument. Anatomy object is created using project name from `project_entity`. It is preferred to pass this - argument as initialization of a new Anatomy object may be time - consuming. + argument as initialization of a new Anatomy object may be + time-consuming. template_key (str): Key of work templates in anatomy templates. Default value is defined in `get_workdir_with_workdir_data`. project_settings(Dict[str, Any]): Prepared project settings for @@ -207,12 +209,12 @@ def get_workdir( ) -def get_last_workfile_with_version( - workdir, file_template, fill_data, extensions +def get_last_workfile_with_version_from_paths( + filepaths, file_template, template_data, extensions ): """Return last workfile version. - Usign workfile template and it's filling data find most possible last + Using workfile template and it's filling data find most possible last version of workfile which was created for the context. Functionality is fully based on knowing which keys are optional or what @@ -222,50 +224,43 @@ def get_last_workfile_with_version( last workfile. Args: - workdir (str): Path to dir where workfiles are stored. + filepaths (list[str]): Workfile paths. file_template (str): Template of file name. - fill_data (Dict[str, Any]): Data for filling template. + template_data (Dict[str, Any]): Data for filling template. extensions (Iterable[str]): All allowed file extensions of workfile. Returns: - Tuple[Union[str, None], Union[int, None]]: Last workfile with version + tuple[Union[str, None], Union[int, None]]: Last workfile with version if there is any workfile otherwise None for both. - """ - if not os.path.exists(workdir): + """ + if not filepaths: return None, None dotted_extensions = set() for ext in extensions: if not ext.startswith("."): - ext = ".{}".format(ext) - dotted_extensions.add(ext) - - # Fast match on extension - filenames = [ - filename - for filename in os.listdir(workdir) - if os.path.splitext(filename)[-1] in dotted_extensions - ] + ext = f".{ext}" + dotted_extensions.add(re.escape(ext)) # Build template without optionals, version to digits only regex # and comment to any definable value. # Escape extensions dot for regex - regex_exts = [ - "\\" + ext - for ext in dotted_extensions - ] - ext_expression = "(?:" + "|".join(regex_exts) + ")" - - # Replace `.{ext}` with `{ext}` so we are sure there is not dot at the end - file_template = re.sub(r"\.?{ext}", ext_expression, file_template) - # Replace optional keys with optional content regex - file_template = re.sub(r"<.*?>", r".*?", file_template) - # Replace `{version}` with group regex - file_template = re.sub(r"{version.*?}", r"([0-9]+)", file_template) - file_template = re.sub(r"{comment.*?}", r".+?", file_template) + ext_expression = "(?:" + "|".join(dotted_extensions) + ")" + + for pattern, replacement in ( + # Replace `.{ext}` with `{ext}` so we are sure dot is not at the end + (r"\.?{ext}", ext_expression), + # Replace optional keys with optional content regex + (r"<.*?>", r".*?"), + # Replace `{version}` with group regex + (r"{version.*?}", r"([0-9]+)"), + (r"{comment.*?}", r".+?"), + ): + file_template = re.sub(pattern, replacement, file_template) + file_template = StringTemplate.format_strict_template( - file_template, fill_data + file_template, template_data ) # Match with ignore case on Windows due to the Windows @@ -278,42 +273,141 @@ def get_last_workfile_with_version( # Get highest version among existing matching files version = None - output_filenames = [] - for filename in sorted(filenames): + output_filepaths = [] + for filepath in sorted(filepaths): + filename = os.path.basename(filepath) match = re.match(file_template, filename, **kwargs) if not match: continue if not match.groups(): - output_filenames.append(filename) + output_filepaths.append(filename) continue file_version = int(match.group(1)) if version is None or file_version > version: - output_filenames[:] = [] + output_filepaths.clear() version = file_version if file_version == version: - output_filenames.append(filename) + output_filepaths.append(filepath) + + output_filepath = None + last_time = None + for _output_filepath in output_filepaths: + mod_time = None + if os.path.exists(_output_filepath): + mod_time = os.path.getmtime(_output_filepath) + if ( + last_time is None + or (mod_time is not None and last_time < mod_time) + ): + output_filepath = _output_filepath + last_time = mod_time + + return output_filepath, version + + +def get_last_workfile_from_paths( + filepaths: list[str], + file_template: str, + template_data: dict[str, Any], + extensions: set[str], +): + """Return last workfile filename. + + Returns file with version 1 if there is not workfile yet. + + Args: + filepaths (list[str]): Paths to workfiles. + file_template (str): Template of file name. + template_data (dict[str, Any]): Data for filling template. + extensions (set[str]): All allowed file extensions of workfile. + + Returns: + Optional[str]: Last or first workfile as filename of full path + to filename. + + """ + filepath, _version = get_last_workfile_with_version_from_paths( + filepaths, file_template, template_data, extensions + ) + return filepath + + +def _filter_dir_files_by_ext( + dirpath: str, + extensions: set[str], +): + """Filter files by extensions. + + Args: + dirpath (str): List of file paths. + extensions (set[str]): Set of file extensions. + + Returns: + tuple[list[str], set[str]]: Filtered list of file paths. + + """ + dotted_extensions = set() + for ext in extensions: + if not ext.startswith("."): + ext = f".{ext}" + dotted_extensions.add(ext) + filtered_paths = [ + os.path.join(dirpath, filename) + for filename in os.listdir(dirpath) + if os.path.splitext(filename)[-1] in dotted_extensions + ] + return filtered_paths, dotted_extensions + + +def get_last_workfile_with_version( + workdir, file_template, fill_data, extensions +): + """Return last workfile version. + + Usign workfile template and it's filling data find most possible last + version of workfile which was created for the context. + + Functionality is fully based on knowing which keys are optional or what + values are expected as value. - output_filename = None - if output_filenames: - if len(output_filenames) == 1: - output_filename = output_filenames[0] - else: - last_time = None - for _output_filename in output_filenames: - full_path = os.path.join(workdir, _output_filename) - mod_time = os.path.getmtime(full_path) - if last_time is None or last_time < mod_time: - output_filename = _output_filename - last_time = mod_time + The last modified file is used if more files can be considered as + last workfile. + + Args: + workdir (str): Path to dir where workfiles are stored. + file_template (str): Template of file name. + fill_data (Dict[str, Any]): Data for filling template. + extensions (Iterable[str]): All allowed file extensions of workfile. + + Returns: + Tuple[Union[str, None], Union[int, None]]: Last workfile with version + if there is any workfile otherwise None for both. + """ + + if not os.path.exists(workdir): + return None, None + + filepaths, dotted_extensions = _filter_dir_files_by_ext( + workdir, extensions + ) - return output_filename, version + return get_last_workfile_with_version_from_paths( + filepaths, + file_template, + fill_data, + dotted_extensions, + ) def get_last_workfile( - workdir, file_template, fill_data, extensions, full_path=False + workdir: str, + file_template: str, + fill_data: dict[str, Any], + extensions: set[str], + full_path: bool = False ): """Return last workfile filename. @@ -324,17 +418,23 @@ def get_last_workfile( file_template (str): Template of file name. fill_data (Dict[str, Any]): Data for filling template. extensions (Iterable[str]): All allowed file extensions of workfile. - full_path (Optional[bool]): Full path to file is returned if + full_path (bool): Full path to file is returned if set to True. Returns: str: Last or first workfile as filename of full path to filename. """ - filename, _version = get_last_workfile_with_version( - workdir, file_template, fill_data, extensions + filepaths, dotted_extensions = _filter_dir_files_by_ext( + workdir, extensions + ) + filepath = get_last_workfile_from_paths( + filepaths, + file_template, + fill_data, + dotted_extensions ) - if filename is None: + if filepath is None: data = copy.deepcopy(fill_data) data["version"] = version_start.get_versioning_start( data["project"]["name"], @@ -348,11 +448,11 @@ def get_last_workfile( data["ext"] = extensions[0] data["ext"] = data["ext"].lstrip(".") filename = StringTemplate.format_strict_template(file_template, data) + filepath = os.path.join(workdir, filename) if full_path: - return os.path.normpath(os.path.join(workdir, filename)) - - return filename + return os.path.normpath(filepath) + return os.path.basename(filepath) def get_custom_workfile_template( @@ -562,3 +662,100 @@ def create_workdir_extra_folders( fullpath = os.path.join(workdir, subfolder) if not os.path.exists(fullpath): os.makedirs(fullpath) + + +class CommentMatcher: + """Use anatomy and work file data to parse comments from filenames. + + Args: + extensions (set[str]): Set of extensions. + file_template (StringTemplate): Workfile file template. + data (dict[str, Any]): Data to fill the template with. + + """ + def __init__( + self, + extensions: set[str], + file_template: StringTemplate, + data: dict[str, Any] + ): + self._fname_regex = None + + if "{comment}" not in file_template: + # Don't look for comment if template doesn't allow it + return + + # Create a regex group for extensions + any_extension = "(?:{})".format( + "|".join(re.escape(ext.lstrip(".")) for ext in extensions) + ) + + # Use placeholders that will never be in the filename + temp_data = copy.deepcopy(data) + temp_data["comment"] = "<>" + temp_data["version"] = "<>" + temp_data["ext"] = "<>" + + fname_pattern = re.escape( + file_template.format_strict(temp_data) + ) + + # Replace comment and version with something we can match with regex + replacements = ( + ("<>", r"(?P.+)"), + ("<>", r"[0-9]+"), + ("<>", any_extension), + ) + for src, dest in replacements: + fname_pattern = fname_pattern.replace(re.escape(src), dest) + + # Match from beginning to end of string to be safe + self._fname_regex = re.compile(f"^{fname_pattern}$") + + def parse_comment(self, filename: str) -> Optional[str]: + """Parse the {comment} part from a filename""" + if self._fname_regex: + match = self._fname_regex.match(filename) + if match: + return match.group("comment") + return None + + +def get_comments_from_workfile_paths( + filepaths: list[str], + extensions: set[str], + file_template: StringTemplate, + template_data: dict[str, Any], + current_filename: Optional[str] = None, +) -> tuple[list[str], str]: + """Collect comments from workfile filenames. + + Based on 'current_filename' is also returned "current comment". + + Args: + filepaths (list[str]): List of filepaths to parse. + extensions (set[str]): Set of file extensions. + file_template (StringTemplate): Workfile file template. + template_data (dict[str, Any]): Data to fill the template with. + current_filename (str): Filename to check for current comment. + + Returns: + tuple[list[str], str]: List of comments and the current comment. + + """ + current_comment = "" + if not filepaths: + return [], current_comment + + matcher = CommentMatcher(extensions, file_template, template_data) + + comment_hints = set() + for filepath in filepaths: + filename = os.path.basename(filepath) + comment = matcher.parse_comment(filename) + if comment: + comment_hints.add(comment) + if filename == current_filename: + current_comment = comment + + return list(comment_hints), current_comment diff --git a/client/ayon_core/pipeline/workfile/utils.py b/client/ayon_core/pipeline/workfile/utils.py index 25be061dec..a7a1436522 100644 --- a/client/ayon_core/pipeline/workfile/utils.py +++ b/client/ayon_core/pipeline/workfile/utils.py @@ -1,6 +1,24 @@ -from ayon_core.lib import filter_profiles +from __future__ import annotations +import os +import platform +import uuid +import typing +from typing import Optional, Any + +import ayon_api +from ayon_api.operations import OperationsSession + +from ayon_core.lib import filter_profiles, emit_event, get_ayon_username from ayon_core.settings import get_project_settings +from .path_resolving import ( + create_workdir_extra_folders, + get_workfile_template_key, +) + +if typing.TYPE_CHECKING: + from ayon_core.pipeline import Anatomy + class MissingWorkdirError(Exception): """Raised when accessing a work directory not found on disk.""" @@ -124,3 +142,625 @@ def should_open_workfiles_tool_on_launch( if output is None: return default_output return output + + +def _get_event_context_data( + project_name: str, + folder_entity: dict[str, Any], + task_entity: dict[str, Any], + host_name: str, +): + return { + "project_name": project_name, + "folder_id": folder_entity["id"], + "folder_path": folder_entity["path"], + "task_id": task_entity["id"], + "task_name": task_entity["name"], + "host_name": host_name, + } + + +def save_workfile_info( + project_name: str, + task_id: str, + rootless_path: str, + host_name: str, + version: Optional[int], + comment: Optional[str], + description: Optional[str], + username: Optional[str] = None, + workfile_entities: Optional[list[dict[str, Any]]] = None, +): + # TODO create pipeline function for this + if workfile_entities is None: + workfile_entities = list(ayon_api.get_workfiles_info( + project_name, + task_ids=[task_id], + )) + + workfile_entity = next( + ( + _ent + for _ent in workfile_entities + if _ent["path"] == rootless_path + ), + None + ) + + if username is None: + username = get_ayon_username() + + if not workfile_entity: + return _create_workfile_info_entity( + project_name, + task_id, + host_name, + rootless_path, + username, + version, + comment, + description, + ) + + data = {} + for key, value in ( + ("host_name", host_name), + ("version", version), + ("comment", comment), + ): + if value is not None: + data[key] = value + + old_data = workfile_entity["data"] + + changed_data = {} + for key, value in data.items(): + if key not in old_data or old_data[key] != value: + changed_data[key] = value + + update_data = {} + if changed_data: + update_data["data"] = changed_data + + old_description = workfile_entity["attrib"].get("description") + if description is not None and old_description != description: + update_data["attrib"] = {"description": description} + workfile_entity["attrib"]["description"] = description + + # Automatically fix 'createdBy' and 'updatedBy' fields + # NOTE both fields were not automatically filled by server + # until 1.1.3 release. + if workfile_entity.get("createdBy") is None: + update_data["createdBy"] = username + workfile_entity["createdBy"] = username + + if workfile_entity.get("updatedBy") != username: + update_data["updatedBy"] = username + workfile_entity["updatedBy"] = username + + if not update_data: + return + + session = OperationsSession() + session.update_entity( + project_name, + "workfile", + workfile_entity["id"], + update_data, + ) + session.commit() + return workfile_entity + + +def open_workfile( + filepath: str, + folder_entity: dict[str, Any], + task_entity: dict[str, Any], +): + from ayon_core.pipeline.context_tools import ( + registered_host, change_current_context + ) + + # Trigger before save event + host = registered_host() + context = host.get_current_context() + project_name = context["project_name"] + current_folder_path = context["folder_path"] + current_task_name = context["task_name"] + host_name = host.name + + # TODO move to workfiles pipeline + event_data = _get_event_context_data( + project_name, folder_entity, task_entity, host_name + ) + event_data["filepath"] = filepath + + emit_event("workfile.open.before", event_data, source="workfiles.tool") + + # Change context + if ( + folder_entity["path"] != current_folder_path + or task_entity["name"] != current_task_name + ): + change_current_context( + project_name, + folder_entity, + task_entity, + workdir=os.path.dirname(filepath) + ) + + host.open_workfile_with_context( + filepath, + folder_entity, + task_entity, + ) + + emit_event("workfile.open.after", event_data, source="workfiles.tool") + + +def save_current_workfile_to( + workfile_path: str, + folder_entity: dict[str, Any], + task_entity: dict[str, Any], + version: Optional[int], + comment: Optional[str] = None, + description: Optional[str] = None, + source: Optional[str] = None, + rootless_path: Optional[str] = None, + workfile_entities: Optional[list[dict[str, Any]]] = None, + username: Optional[str] = None, + project_entity: Optional[dict[str, Any]] = None, + project_settings: Optional[dict[str, Any]] = None, + anatomy: Optional["Anatomy"] = None, +) -> dict[str, Any]: + """Save current workfile to new location or context. + + Args: + workfile_path (str): Destination workfile path. + folder_entity (dict[str, Any]): Target folder entity. + task_entity (dict[str, Any]): Target task entity. + version (Optional[int]): Workfile version. + comment (optional[str]): Workfile comment. + description (Optional[str]): Workfile description. + source (Optional[str]): Source of the save action. + rootless_path (Optional[str]): Rootless path of the workfile. Is + calculated if not passed in. + workfile_entities (Optional[list[dict[str, Any]]]): List of workfile + username (Optional[str]): Username of the user saving the workfile. + Current user is used if not passed. + project_entity (Optional[dict[str, Any]]): Project entity used for + rootless path calculation. + project_settings (Optional[dict[str, Any]]): Project settings used for + rootless path calculation. + anatomy (Optional[Anatomy]): Project anatomy used for rootless + path calculation. + + Returns: + dict[str, Any]: Workfile info entity. + + """ + print("save_current_workfile_to") + return _save_workfile( + None, + None, + None, + None, + workfile_path, + folder_entity, + task_entity, + version, + comment, + description, + source, + rootless_path, + workfile_entities, + username, + project_entity, + project_settings, + anatomy, + ) + + +def copy_and_open_workfile( + src_workfile_path: str, + workfile_path: str, + folder_entity: dict[str, Any], + task_entity: dict[str, Any], + version: Optional[int], + comment: Optional[str] = None, + description: Optional[str] = None, + source: Optional[str] = None, + rootless_path: Optional[str] = None, + workfile_entities: Optional[list[dict[str, Any]]] = None, + username: Optional[str] = None, + project_entity: Optional[dict[str, Any]] = None, + project_settings: Optional[dict[str, Any]] = None, + anatomy: Optional["Anatomy"] = None, +) -> dict[str, Any]: + """Copy workfile to new location and open it. + + Args: + src_workfile_path (str): Source workfile path. + workfile_path (str): Destination workfile path. + folder_entity (dict[str, Any]): Target folder entity. + task_entity (dict[str, Any]): Target task entity. + version (Optional[int]): Workfile version. + comment (optional[str]): Workfile comment. + description (Optional[str]): Workfile description. + source (Optional[str]): Source of the save action. + rootless_path (Optional[str]): Rootless path of the workfile. Is + calculated if not passed in. + workfile_entities (Optional[list[dict[str, Any]]]): List of workfile + username (Optional[str]): Username of the user saving the workfile. + Current user is used if not passed. + project_entity (Optional[dict[str, Any]]): Project entity used for + rootless path calculation. + project_settings (Optional[dict[str, Any]]): Project settings used for + rootless path calculation. + anatomy (Optional[Anatomy]): Project anatomy used for rootless + path calculation. + + Returns: + dict[str, Any]: Workfile info entity. + + """ + print("copy_and_open_workfile") + return _save_workfile( + src_workfile_path, + None, + None, + None, + workfile_path, + folder_entity, + task_entity, + version, + comment, + description, + source, + rootless_path, + workfile_entities, + username, + project_entity, + project_settings, + anatomy, + ) + + +def copy_and_open_workfile_representation( + project_name: str, + representation_id: str, + workfile_path: str, + folder_entity: dict[str, Any], + task_entity: dict[str, Any], + version: Optional[int], + comment: Optional[str] = None, + description: Optional[str] = None, + source: Optional[str] = None, + rootless_path: Optional[str] = None, + representation_entity: Optional[dict[str, Any]] = None, + representation_path: Optional[str] = None, + workfile_entities: Optional[list[dict[str, Any]]] = None, + username: Optional[str] = None, + project_entity: Optional[dict[str, Any]] = None, + project_settings: Optional[dict[str, Any]] = None, + anatomy: Optional["Anatomy"] = None, +) -> dict[str, Any]: + """Copy workfile to new location and open it. + + Args: + project_name (str): Project name where representation is stored. + representation_id (str): Source representation id. + workfile_path (str): Destination workfile path. + folder_entity (dict[str, Any]): Target folder entity. + task_entity (dict[str, Any]): Target task entity. + version (Optional[int]): Workfile version. + comment (optional[str]): Workfile comment. + description (Optional[str]): Workfile description. + source (Optional[str]): Source of the save action. + rootless_path (Optional[str]): Rootless path of the workfile. Is + calculated if not passed in. + workfile_entities (Optional[list[dict[str, Any]]]): List of workfile + username (Optional[str]): Username of the user saving the workfile. + Current user is used if not passed. + project_entity (Optional[dict[str, Any]]): Project entity used for + rootless path calculation. + project_settings (Optional[dict[str, Any]]): Project settings used for + rootless path calculation. + anatomy (Optional[Anatomy]): Project anatomy used for rootless + path calculation. + + Returns: + dict[str, Any]: Workfile info entity. + + """ + print("copy_and_open_workfile_representation") + if representation_entity is None: + representation_entity = ayon_api.get_representation_by_id( + project_name, + representation_id, + ) + + return _save_workfile( + None, + project_name, + representation_entity, + representation_path, + workfile_path, + folder_entity, + task_entity, + version, + comment, + description, + source, + rootless_path, + workfile_entities, + username, + project_entity, + project_settings, + anatomy, + ) + + +def _save_workfile( + src_workfile_path: Optional[str], + representation_project_name: Optional[str], + representation_entity: Optional[dict[str, Any]], + representation_path: Optional[str], + workfile_path: str, + folder_entity: dict[str, Any], + task_entity: dict[str, Any], + version: Optional[int], + comment: Optional[str], + description: Optional[str], + source: Optional[str], + rootless_path: Optional[str], + workfile_entities: Optional[list[dict[str, Any]]], + username: Optional[str], + project_entity: Optional[dict[str, Any]], + project_settings: Optional[dict[str, Any]], + anatomy: Optional["Anatomy"], +) -> dict[str, Any]: + """Function used to save workfile to new location and context. + + Because the functionality for 'save_current_workfile_to' and + 'copy_and_open_workfile' is currently the same, except for used + function on host it is easier to create this wrapper function. + + Args: + src_workfile_path (Optional[str]): Source workfile path. + representation_entity (Optional[dict[str, Any]]): Representation used + as source for workfile. + workfile_path (str): Destination workfile path. + folder_entity (dict[str, Any]): Target folder entity. + task_entity (dict[str, Any]): Target task entity. + version (Optional[int]): Workfile version. + comment (optional[str]): Workfile comment. + description (Optional[str]): Workfile description. + source (Optional[str]): Source of the save action. + rootless_path (Optional[str]): Rootless path of the workfile. Is + calculated if not passed in. + workfile_entities (Optional[list[dict[str, Any]]]): List of workfile + username (Optional[str]): Username of the user saving the workfile. + Current user is used if not passed. + project_entity (Optional[dict[str, Any]]): Project entity used for + rootless path calculation. + project_settings (Optional[dict[str, Any]]): Project settings used for + rootless path calculation. + anatomy (Optional[Anatomy]): Project anatomy used for rootless + path calculation. + + Returns: + dict[str, Any]: Workfile info entity. + + """ + from ayon_core.pipeline.context_tools import ( + registered_host, change_current_context + ) + + # Trigger before save event + host = registered_host() + context = host.get_current_context() + project_name = context["project_name"] + current_folder_path = context["folder_path"] + current_task_name = context["task_name"] + + task_name = task_entity["name"] + task_type = task_entity["taskType"] + task_id = task_entity["id"] + host_name = host.name + + workdir, filename = os.path.split(workfile_path) + + # QUESTION should the data be different for 'before' and 'after'? + event_data = _get_event_context_data( + project_name, folder_entity, task_entity, host_name + ) + event_data.update({ + "filename": filename, + "workdir_path": workdir, + }) + + emit_event("workfile.save.before", event_data, source=source) + + # Change context + if ( + folder_entity["path"] != current_folder_path + or task_entity["name"] != current_task_name + ): + change_current_context( + folder_entity, + task_entity, + workdir=workdir, + anatomy=anatomy, + project_entity=project_entity, + project_settings=project_settings, + ) + + if src_workfile_path: + host.copy_workfile( + src_workfile_path, + workfile_path, + folder_entity, + task_entity, + open_workfile=True, + ) + elif representation_entity: + host.copy_workfile_representation( + representation_project_name, + representation_entity, + workfile_path, + folder_entity, + task_entity, + open_workfile=True, + src_representation_path=representation_path, + anatomy=anatomy, + ) + else: + host.save_workfile_with_context( + workfile_path, + folder_entity, + task_entity, + open_workfile=True, + ) + + if not description: + description = None + + if not comment: + comment = None + + if rootless_path is None: + rootless_path = _find_rootless_path( + workfile_path, + project_name, + task_type, + host_name, + project_entity, + project_settings, + anatomy, + ) + + # It is not possible to create workfile infor without rootless path + workfile_info = None + if rootless_path: + if platform.system().lower() == "windows": + rootless_path = rootless_path.replace("\\", "/") + + workfile_info = save_workfile_info( + project_name, + task_id, + rootless_path, + host_name, + version, + comment, + description, + username=username, + workfile_entities=workfile_entities, + ) + + # Create extra folders + create_workdir_extra_folders( + workdir, + host.name, + task_entity["taskType"], + task_name, + project_name + ) + + # Trigger after save events + emit_event("workfile.save.after", event_data, source=source) + return workfile_info + + +def _find_rootless_path( + workfile_path: str, + project_name: str, + task_type: str, + host_name: str, + project_entity: Optional[dict[str, Any]] = None, + project_settings: Optional[dict[str, Any]] = None, + anatomy: Optional["Anatomy"] = None, +) -> str: + """Find rootless workfile path.""" + if anatomy is None: + from ayon_core.pipeline import Anatomy + + anatomy = Anatomy(project_name, project_entity=project_entity) + template_key = get_workfile_template_key( + project_name, + task_type, + host_name, + project_settings=project_settings + ) + dir_template = anatomy.get_template_item( + "work", template_key, "directory" + ) + result = dir_template.format({"root": anatomy.roots}) + used_root = result.used_values.get("root") + rootless_path = str(workfile_path) + if platform.system().lower() == "windows": + rootless_path = rootless_path.replace("\\", "/") + + root_key = root_value = None + if used_root is not None: + root_key, root_value = next(iter(used_root.items())) + if platform.system().lower() == "windows": + root_value = root_value.replace("\\", "/") + + if root_value and rootless_path.startswith(root_value): + rootless_path = rootless_path[len(root_value):].lstrip("/") + rootless_path = f"{{root[{root_key}]}}/{rootless_path}" + else: + success, result = anatomy.find_root_template_from_path(rootless_path) + if success: + rootless_path = result + return rootless_path + + +def _create_workfile_info_entity( + project_name: str, + task_id: str, + host_name: str, + rootless_path: str, + username: str, + version: Optional[int], + comment: Optional[str], + description: Optional[str], +) -> dict[str, Any]: + extension = os.path.splitext(rootless_path)[1] + + attrib = {} + for key, value in ( + ("extension", extension), + ("description", description), + ): + if value is not None: + attrib[key] = value + + data = {} + for key, value in ( + ("host_name", host_name), + ("version", version), + ("comment", comment), + ): + if value is not None: + data[key] = value + + workfile_info = { + "id": uuid.uuid4().hex, + "path": rootless_path, + "taskId": task_id, + "attrib": attrib, + "data": data, + # TODO remove 'createdBy' and 'updatedBy' fields when server is + # or above 1.1.3 . + "createdBy": username, + "updatedBy": username, + } + + session = OperationsSession() + session.create_entity( + project_name, "workfile", workfile_info + ) + session.commit() + return workfile_info diff --git a/client/ayon_core/tools/workfiles/abstract.py b/client/ayon_core/tools/workfiles/abstract.py index 152ca33d99..863d6bb9bc 100644 --- a/client/ayon_core/tools/workfiles/abstract.py +++ b/client/ayon_core/tools/workfiles/abstract.py @@ -4,76 +4,6 @@ from ayon_core.style import get_default_entity_icon_color -class WorkfileInfo: - """Information about workarea file with possible additional from database. - - Args: - folder_id (str): Folder id. - task_id (str): Task id. - filepath (str): Filepath. - filesize (int): File size. - creation_time (float): Creation time (timestamp). - modification_time (float): Modification time (timestamp). - created_by (Union[str, none]): User who created the file. - updated_by (Union[str, none]): User who last updated the file. - note (str): Note. - """ - - def __init__( - self, - folder_id, - task_id, - filepath, - filesize, - creation_time, - modification_time, - created_by, - updated_by, - note, - ): - self.folder_id = folder_id - self.task_id = task_id - self.filepath = filepath - self.filesize = filesize - self.creation_time = creation_time - self.modification_time = modification_time - self.created_by = created_by - self.updated_by = updated_by - self.note = note - - def to_data(self): - """Converts WorkfileInfo item to data. - - Returns: - dict[str, Any]: Folder item data. - """ - - return { - "folder_id": self.folder_id, - "task_id": self.task_id, - "filepath": self.filepath, - "filesize": self.filesize, - "creation_time": self.creation_time, - "modification_time": self.modification_time, - "created_by": self.created_by, - "updated_by": self.updated_by, - "note": self.note, - } - - @classmethod - def from_data(cls, data): - """Re-creates WorkfileInfo item from data. - - Args: - data (dict[str, Any]): Workfile info item data. - - Returns: - WorkfileInfo: Workfile info item. - """ - - return cls(**data) - - class FolderItem: """Item representing folder entity on a server. @@ -87,8 +17,8 @@ class FolderItem: label (str): Folder label. icon_name (str): Name of icon from font awesome. icon_color (str): Hex color string that will be used for icon. - """ + """ def __init__( self, entity_id, parent_id, name, label, icon_name, icon_color ): @@ -104,8 +34,8 @@ def to_data(self): Returns: dict[str, Any]: Folder item data. - """ + """ return { "entity_id": self.entity_id, "parent_id": self.parent_id, @@ -124,8 +54,8 @@ def from_data(cls, data): Returns: FolderItem: Folder item. - """ + """ return cls(**data) @@ -144,8 +74,8 @@ class TaskItem: parent_id (str): Parent folder id. icon_name (str): Name of icon from font awesome. icon_color (str): Hex color string that will be used for icon. - """ + """ def __init__( self, task_id, name, task_type, parent_id, icon_name, icon_color ): @@ -163,8 +93,8 @@ def id(self): Returns: str: Task id. - """ + """ return self.task_id @property @@ -173,8 +103,8 @@ def label(self): Returns: str: Label of task item. - """ + """ if self._label is None: self._label = "{} ({})".format(self.name, self.task_type) return self._label @@ -184,8 +114,8 @@ def to_data(self): Returns: dict[str, Any]: Task item data. - """ + """ return { "task_id": self.task_id, "name": self.name, @@ -204,114 +134,9 @@ def from_data(cls, data): Returns: TaskItem: Task item. - """ - return cls(**data) - - -class FileItem: - """File item that represents a file. - - Can be used for both Workarea and Published workfile. Workarea file - will always exist on disk which is not the case for Published workfile. - - Args: - dirpath (str): Directory path of file. - filename (str): Filename. - modified (float): Modified timestamp. - created_by (Optional[str]): Username. - representation_id (Optional[str]): Representation id of published - workfile. - filepath (Optional[str]): Prepared filepath. - exists (Optional[bool]): If file exists on disk. - """ - - def __init__( - self, - dirpath, - filename, - modified, - created_by=None, - updated_by=None, - representation_id=None, - filepath=None, - exists=None - ): - self.filename = filename - self.dirpath = dirpath - self.modified = modified - self.created_by = created_by - self.updated_by = updated_by - self.representation_id = representation_id - self._filepath = filepath - self._exists = exists - - @property - def filepath(self): - """Filepath of file. - - Returns: - str: Full path to a file. - """ - - if self._filepath is None: - self._filepath = os.path.join(self.dirpath, self.filename) - return self._filepath - - @property - def exists(self): - """File is available. - - Returns: - bool: If file exists on disk. - """ - - if self._exists is None: - self._exists = os.path.exists(self.filepath) - return self._exists - - def to_data(self): - """Converts file item to data. - - Returns: - dict[str, Any]: File item data. """ - - return { - "filename": self.filename, - "dirpath": self.dirpath, - "modified": self.modified, - "created_by": self.created_by, - "representation_id": self.representation_id, - "filepath": self.filepath, - "exists": self.exists, - } - - @classmethod - def from_data(cls, data): - """Re-creates file item from data. - - Args: - data (dict[str, Any]): File item data. - - Returns: - FileItem: File item. - """ - - required_keys = { - "filename", - "dirpath", - "modified", - "representation_id" - } - missing_keys = required_keys - set(data.keys()) - if missing_keys: - raise KeyError("Missing keys: {}".format(missing_keys)) - - return cls(**{ - key: data[key] - for key in required_keys - }) + return cls(**data) class WorkareaFilepathResult: @@ -323,8 +148,8 @@ class WorkareaFilepathResult: exists (bool): True if file exists. filepath (str): Filepath. If not provided it will be constructed from root and filename. - """ + """ def __init__(self, root, filename, exists, filepath=None): if not filepath and root and filename: filepath = os.path.join(root, filename) @@ -341,8 +166,8 @@ def is_host_valid(self): Returns: bool: True if host is valid. - """ + """ pass @abstractmethod @@ -353,8 +178,8 @@ def get_workfile_extensions(self): Returns: Iterable[str]: List of extensions. - """ + """ pass @abstractmethod @@ -363,8 +188,8 @@ def is_save_enabled(self): Returns: bool: True if save is enabled. - """ + """ pass @abstractmethod @@ -373,8 +198,8 @@ def set_save_enabled(self, enabled): Args: enabled (bool): Enable save workfile when True. - """ + """ pass @@ -386,6 +211,7 @@ def get_host_name(self): Returns: str: Name of host. + """ pass @@ -395,8 +221,8 @@ def get_current_project_name(self): Returns: str: Name of project. - """ + """ pass @abstractmethod @@ -406,8 +232,8 @@ def get_current_folder_id(self): Returns: Union[str, None]: Folder id or None if host does not have any context. - """ + """ pass @abstractmethod @@ -417,8 +243,8 @@ def get_current_task_name(self): Returns: Union[str, None]: Task name or None if host does not have any context. - """ + """ pass @abstractmethod @@ -428,8 +254,8 @@ def get_current_workfile(self): Returns: Union[str, None]: Path to workfile or None if host does not have opened specific file. - """ + """ pass @property @@ -439,8 +265,8 @@ def project_anatomy(self): Returns: Anatomy: Project anatomy. - """ + """ pass @property @@ -450,8 +276,8 @@ def project_settings(self): Returns: dict[str, Any]: Project settings. - """ + """ pass @abstractmethod @@ -463,8 +289,8 @@ def get_project_entity(self, project_name): Returns: dict[str, Any]: Project entity data. - """ + """ pass @abstractmethod @@ -477,8 +303,8 @@ def get_folder_entity(self, project_name, folder_id): Returns: dict[str, Any]: Folder entity data. - """ + """ pass @abstractmethod @@ -491,10 +317,24 @@ def get_task_entity(self, project_name, task_id): Returns: dict[str, Any]: Task entity data. + """ + pass + + @abstractmethod + def get_workfile_entities(self, task_id: str): + """Workfile entities for given task. + Args: + task_id (str): Task id. + + Returns: + list[dict[str, Any]]: List of workfile entities. + + """ pass + @abstractmethod def emit_event(self, topic, data=None, source=None): """Emit event. @@ -502,8 +342,8 @@ def emit_event(self, topic, data=None, source=None): topic (str): Event topic used for callbacks filtering. data (Optional[dict[str, Any]]): Event data. source (Optional[str]): Event source. - """ + """ pass @@ -530,8 +370,8 @@ def register_event_callback(self, topic, callback): topic (str): Name of topic. callback (Callable): Callback that will be called when event is triggered. - """ + """ pass @abstractmethod @@ -592,8 +432,8 @@ def get_workfile_extensions(self): Returns: List[str]: File extensions that can be used as workfile for current host. - """ + """ pass # Selection information @@ -603,8 +443,8 @@ def get_selected_folder_id(self): Returns: Union[str, None]: Folder id or None if no folder is selected. - """ + """ pass @abstractmethod @@ -616,8 +456,8 @@ def set_selected_folder(self, folder_id): Args: folder_id (Union[str, None]): Folder id or None if no folder is selected. - """ + """ pass @abstractmethod @@ -626,8 +466,8 @@ def get_selected_task_id(self): Returns: Union[str, None]: Task id or None if no folder is selected. - """ + """ pass @abstractmethod @@ -649,8 +489,8 @@ def set_selected_task(self, task_id, task_name): is selected. task_name (Union[str, None]): Task name or None if no task is selected. - """ + """ pass @abstractmethod @@ -659,18 +499,22 @@ def get_selected_workfile_path(self): Returns: Union[str, None]: Selected workfile path. - """ + """ pass @abstractmethod - def set_selected_workfile_path(self, path): + def set_selected_workfile_path( + self, rootless_path, path, workfile_entity_id + ): """Change selected workfile path. Args: + rootless_path (Union[str, None]): Selected workfile rootless path. path (Union[str, None]): Selected workfile path. - """ + workfile_entity_id (Union[str, None]): Workfile entity id. + """ pass @abstractmethod @@ -680,8 +524,8 @@ def get_selected_representation_id(self): Returns: Union[str, None]: Representation id or None if no representation is selected. - """ + """ pass @abstractmethod @@ -691,8 +535,8 @@ def set_selected_representation_id(self, representation_id): Args: representation_id (Union[str, None]): Selected workfile representation id. - """ + """ pass def get_selected_context(self): @@ -700,8 +544,8 @@ def get_selected_context(self): Returns: dict[str, Union[str, None]]: Selected context. - """ + """ return { "folder_id": self.get_selected_folder_id(), "task_id": self.get_selected_task_id(), @@ -737,8 +581,8 @@ def set_expected_selection( files UI element. representation_id (Optional[str]): Representation id. Used for published filed UI element. - """ + """ pass @abstractmethod @@ -750,8 +594,8 @@ def get_expected_selection_data(self): Returns: dict[str, Any]: Expected selection data. - """ + """ pass @abstractmethod @@ -760,8 +604,8 @@ def expected_folder_selected(self, folder_id): Args: folder_id (str): Folder id which was selected. - """ + """ pass @abstractmethod @@ -771,8 +615,8 @@ def expected_task_selected(self, folder_id, task_name): Args: folder_id (str): Folder id under which task is. task_name (str): Task name which was selected. - """ + """ pass @abstractmethod @@ -785,8 +629,8 @@ def expected_representation_selected( folder_id (str): Folder id under which representation is. task_name (str): Task name under which representation is. representation_id (str): Representation id which was selected. - """ + """ pass @abstractmethod @@ -797,8 +641,8 @@ def expected_workfile_selected(self, folder_id, task_name, workfile_name): folder_id (str): Folder id under which workfile is. task_name (str): Task name under which workfile is. workfile_name (str): Workfile filename which was selected. - """ + """ pass @abstractmethod @@ -823,8 +667,8 @@ def get_folder_items(self, project_name, sender): Returns: list[FolderItem]: Minimum possible information needed for visualisation of folder hierarchy. - """ + """ pass @abstractmethod @@ -843,8 +687,8 @@ def get_task_items(self, project_name, folder_id, sender): Returns: list[TaskItem]: Minimum possible information needed for visualisation of tasks. - """ + """ pass @abstractmethod @@ -853,8 +697,8 @@ def has_unsaved_changes(self): Returns: bool: Has unsaved changes. - """ + """ pass @abstractmethod @@ -867,8 +711,8 @@ def get_workarea_dir_by_context(self, folder_id, task_id): Returns: str: Workarea directory. - """ + """ pass @abstractmethod @@ -881,9 +725,9 @@ def get_workarea_file_items(self, folder_id, task_name, sender=None): sender (Optional[str]): Who requested workarea file items. Returns: - list[FileItem]: List of workarea file items. - """ + list[WorkfileInfo]: List of workarea file items. + """ pass @abstractmethod @@ -899,8 +743,8 @@ def get_workarea_save_as_data(self, folder_id, task_id): Returns: dict[str, Any]: Data for Save As operation. - """ + """ pass @abstractmethod @@ -925,12 +769,12 @@ def fill_workarea_filepath( Returns: WorkareaFilepathResult: Result of the operation. - """ + """ pass @abstractmethod - def get_published_file_items(self, folder_id, task_id): + def get_published_file_items(self, folder_id: str, task_id: str): """Get published file items. Args: @@ -938,44 +782,52 @@ def get_published_file_items(self, folder_id, task_id): task_id (Union[str, None]): Task id. Returns: - list[FileItem]: List of published file items. - """ + list[PublishedWorkfileInfo]: List of published file items. + """ pass @abstractmethod - def get_workfile_info(self, folder_id, task_name, filepath): + def get_workfile_info(self, folder_id, task_id, rootless_path): """Workfile info from database. Args: folder_id (str): Folder id. - task_name (str): Task id. - filepath (str): Workfile path. + task_id (str): Task id. + rootless_path (str): Workfile path. Returns: - Union[WorkfileInfo, None]: Workfile info or None if was passed + Optional[WorkfileInfo]: Workfile info or None if was passed invalid context. - """ + """ pass @abstractmethod - def save_workfile_info(self, folder_id, task_name, filepath, note): + def save_workfile_info( + self, + task_id, + rootless_path, + version=None, + comment=None, + description=None, + ): """Save workfile info to database. At this moment the only information which can be saved about - workfile is 'note'. + workfile is 'description'. - When 'note' is 'None' it is only validated if workfile info exists, - and if not then creates one with empty note. + If value of 'version', 'comment' or 'description' is 'None' it is not + added/updated to entity. Args: - folder_id (str): Folder id. - task_name (str): Task id. - filepath (str): Workfile path. - note (Union[str, None]): Note. - """ + task_id (str): Task id. + rootless_path (str): Rootless workfile path. + version (Optional[int]): Version of workfile. + comment (Optional[str]): User's comment (subversion). + description (Optional[str]): Workfile description. + """ pass # General commands @@ -985,8 +837,8 @@ def reset(self): Triggers 'controller.reset.started' event at the beginning and 'controller.reset.finished' at the end. - """ + """ pass # Controller actions @@ -998,8 +850,8 @@ def open_workfile(self, folder_id, task_id, filepath): folder_id (str): Folder id. task_id (str): Task id. filepath (str): Workfile path. - """ + """ pass @abstractmethod @@ -1013,22 +865,27 @@ def save_as_workfile( self, folder_id, task_id, + rootless_workdir, workdir, filename, - template_key, - artist_note, + version, + comment, + description, ): """Save current state of workfile to workarea. Args: folder_id (str): Folder id. task_id (str): Task id. - workdir (str): Workarea directory. + rootless_workdir (str): Workarea directory. filename (str): Workarea filename. template_key (str): Template key used to get the workdir and filename. - """ + version (Optional[int]): Version of workfile. + comment (Optional[str]): User's comment (subversion). + description (Optional[str]): Workfile description. + """ pass @abstractmethod @@ -1040,8 +897,10 @@ def copy_workfile_representation( task_id, workdir, filename, - template_key, - artist_note, + rootless_workdir, + version, + comment, + description, ): """Action to copy published workfile representation to workarea. @@ -1055,23 +914,40 @@ def copy_workfile_representation( task_id (str): Task id. workdir (str): Workarea directory. filename (str): Workarea filename. - template_key (str): Template key. - artist_note (str): Artist note. - """ + rootless_workdir (str): Rootless workdir. + version (int): Workfile version. + comment (str): User's comment (subversion). + description (str): Description note. + """ pass @abstractmethod - def duplicate_workfile(self, src_filepath, workdir, filename, artist_note): + def duplicate_workfile( + self, + folder_id, + task_id, + src_filepath, + rootless_workdir, + workdir, + filename, + description, + version, + comment + ): """Duplicate workfile. Workfiles is not opened when done. Args: + folder_id (str): Folder id. + task_id (str): Task id. src_filepath (str): Source workfile path. + rootless_workdir (str): Rootless workdir. workdir (str): Destination workdir. filename (str): Destination filename. - artist_note (str): Artist note. + version (int): Workfile version. + comment (str): User's comment (subversion). + description (str): Workfile description. """ - pass diff --git a/client/ayon_core/tools/workfiles/control.py b/client/ayon_core/tools/workfiles/control.py index 3a7459da0c..ab6b12e4f4 100644 --- a/client/ayon_core/tools/workfiles/control.py +++ b/client/ayon_core/tools/workfiles/control.py @@ -1,19 +1,13 @@ import os -import shutil import ayon_api from ayon_core.host import IWorkfileHost -from ayon_core.lib import Logger, emit_event +from ayon_core.lib import Logger from ayon_core.lib.events import QueuedEventSystem from ayon_core.settings import get_project_settings from ayon_core.pipeline import Anatomy, registered_host -from ayon_core.pipeline.context_tools import ( - change_current_context, - get_current_host_name, - get_global_context, -) -from ayon_core.pipeline.workfile import create_workdir_extra_folders +from ayon_core.pipeline.context_tools import get_global_context from ayon_core.tools.common_models import ( HierarchyModel, @@ -140,12 +134,7 @@ def __init__(self, host=None): if host is None: host = registered_host() - host_is_valid = False - if host is not None: - missing_methods = ( - IWorkfileHost.get_missing_workfile_methods(host) - ) - host_is_valid = len(missing_methods) == 0 + host_is_valid = isinstance(host, IWorkfileHost) self._host = host self._host_is_valid = host_is_valid @@ -182,7 +171,7 @@ def _create_users_model(self): return UsersModel(self) def _create_workfiles_model(self): - return WorkfilesModel(self) + return WorkfilesModel(self._host, self) def _create_expected_selection_obj(self): return WorkfilesToolExpectedSelection(self) @@ -293,28 +282,14 @@ def get_user_items_by_name(self): # Host information def get_workfile_extensions(self): - host = self._host - if isinstance(host, IWorkfileHost): - return host.get_workfile_extensions() - return host.file_extensions() + return self._host.get_workfile_extensions() def has_unsaved_changes(self): - host = self._host - if isinstance(host, IWorkfileHost): - return host.workfile_has_unsaved_changes() - return host.has_unsaved_changes() + return self._host.workfile_has_unsaved_changes() # Current context def get_host_name(self): - host = self._host - if isinstance(host, IWorkfileHost): - return host.name - return get_current_host_name() - - def _get_host_current_context(self): - if hasattr(self._host, "get_current_context"): - return self._host.get_current_context() - return get_global_context() + return self._host.name def get_current_project_name(self): return self._current_project_name @@ -326,10 +301,7 @@ def get_current_task_name(self): return self._current_task_name def get_current_workfile(self): - host = self._host - if isinstance(host, IWorkfileHost): - return host.get_current_workfile() - return host.current_file() + return self._workfiles_model.get_current_workfile() # Selection information def get_selected_folder_id(self): @@ -350,8 +322,12 @@ def set_selected_task(self, task_id, task_name): def get_selected_workfile_path(self): return self._selection_model.get_selected_workfile_path() - def set_selected_workfile_path(self, path): - self._selection_model.set_selected_workfile_path(path) + def set_selected_workfile_path( + self, rootless_path, path, workfile_entity_id + ): + self._selection_model.set_selected_workfile_path( + rootless_path, path, workfile_entity_id + ) def get_selected_representation_id(self): return self._selection_model.get_selected_representation_id() @@ -424,7 +400,7 @@ def get_workarea_dir_by_context(self, folder_id, task_id): def get_workarea_file_items(self, folder_id, task_name, sender=None): task_id = self._get_task_id(folder_id, task_name) return self._workfiles_model.get_workarea_file_items( - folder_id, task_id, task_name + folder_id, task_id ) def get_workarea_save_as_data(self, folder_id, task_id): @@ -450,28 +426,34 @@ def fill_workarea_filepath( ) def get_published_file_items(self, folder_id, task_id): - task_name = None - if task_id: - task = self.get_task_entity( - self.get_current_project_name(), task_id - ) - task_name = task.get("name") - return self._workfiles_model.get_published_file_items( - folder_id, task_name) + folder_id, task_id + ) - def get_workfile_info(self, folder_id, task_name, filepath): - task_id = self._get_task_id(folder_id, task_name) + def get_workfile_info(self, folder_id, task_id, rootless_path): return self._workfiles_model.get_workfile_info( - folder_id, task_id, filepath + folder_id, task_id, rootless_path ) - def save_workfile_info(self, folder_id, task_name, filepath, note): - task_id = self._get_task_id(folder_id, task_name) + def save_workfile_info( + self, + task_id, + rootless_path, + version=None, + comment=None, + description=None, + ): self._workfiles_model.save_workfile_info( - folder_id, task_id, filepath, note + task_id, + rootless_path, + version, + comment, + description, ) + def get_workfile_entities(self, task_id): + return self._workfiles_model.get_workfile_entities(task_id) + def reset(self): if not self._host_is_valid: self._emit_event("controller.reset.started") @@ -509,6 +491,7 @@ def reset(self): self._projects_model.reset() self._hierarchy_model.reset() + self._workfiles_model.reset() if not expected_folder_id: expected_folder_id = folder_id @@ -528,53 +511,30 @@ def reset(self): # Controller actions def open_workfile(self, folder_id, task_id, filepath): - self._emit_event("open_workfile.started") - - failed = False - try: - self._open_workfile(folder_id, task_id, filepath) - - except Exception: - failed = True - self.log.warning("Open of workfile failed", exc_info=True) - - self._emit_event( - "open_workfile.finished", - {"failed": failed}, - ) + self._workfiles_model.open_workfile(folder_id, task_id, filepath) def save_current_workfile(self): - current_file = self.get_current_workfile() - self._host_save_workfile(current_file) + self._workfiles_model.save_current_workfile() def save_as_workfile( self, folder_id, task_id, + rootless_workdir, workdir, filename, - template_key, - artist_note, + version, + comment, + description, ): - self._emit_event("save_as.started") - - failed = False - try: - self._save_as_workfile( - folder_id, - task_id, - workdir, - filename, - template_key, - artist_note=artist_note, - ) - except Exception: - failed = True - self.log.warning("Save as failed", exc_info=True) - - self._emit_event( - "save_as.finished", - {"failed": failed}, + self._workfiles_model.save_as_workfile( + folder_id, + task_id, + rootless_workdir, + filename, + version, + comment, + description, ) def copy_workfile_representation( @@ -585,64 +545,48 @@ def copy_workfile_representation( task_id, workdir, filename, - template_key, - artist_note, + rootless_workdir, + version, + comment, + description, ): - self._emit_event("copy_representation.started") - - failed = False - try: - self._save_as_workfile( - folder_id, - task_id, - workdir, - filename, - template_key, - artist_note, - src_filepath=representation_filepath - ) - except Exception: - failed = True - self.log.warning( - "Copy of workfile representation failed", exc_info=True - ) - - self._emit_event( - "copy_representation.finished", - {"failed": failed}, + self._workfiles_model.copy_workfile_representation( + representation_id, + representation_filepath, + folder_id, + task_id, + workdir, + filename, + rootless_workdir, + version, + comment, + description, ) - def duplicate_workfile(self, src_filepath, workdir, filename, artist_note): - self._emit_event("workfile_duplicate.started") - - failed = False - try: - dst_filepath = os.path.join(workdir, filename) - shutil.copy(src_filepath, dst_filepath) - except Exception: - failed = True - self.log.warning("Duplication of workfile failed", exc_info=True) - - self._emit_event( - "workfile_duplicate.finished", - {"failed": failed}, + def duplicate_workfile( + self, + folder_id, + task_id, + src_filepath, + rootless_workdir, + workdir, + filename, + version, + comment, + description + ): + self._workfiles_model.duplicate_workfile( + folder_id, + task_id, + src_filepath, + rootless_workdir, + workdir, + filename, + version, + comment, + description, ) - # Helper host methods that resolve 'IWorkfileHost' interface - def _host_open_workfile(self, filepath): - host = self._host - if isinstance(host, IWorkfileHost): - host.open_workfile(filepath) - else: - host.open_file(filepath) - - def _host_save_workfile(self, filepath): - host = self._host - if isinstance(host, IWorkfileHost): - host.save_workfile(filepath) - else: - host.save_file(filepath) - def _emit_event(self, topic, data=None): self.emit_event(topic, data, "controller") @@ -657,6 +601,11 @@ def _get_task_id(self, folder_id, task_name, sender=None): return None return task_item.id + def _get_host_current_context(self): + if hasattr(self._host, "get_current_context"): + return self._host.get_current_context() + return get_global_context() + # Expected selection # - expected selection is used to restore selection after refresh # or when current context should be used @@ -665,123 +614,3 @@ def _trigger_expected_selection_changed(self): "expected_selection_changed", self._expected_selection.get_expected_selection_data(), ) - - def _get_event_context_data( - self, project_name, folder_id, task_id, folder=None, task=None - ): - if folder is None: - folder = self.get_folder_entity(project_name, folder_id) - if task is None: - task = self.get_task_entity(project_name, task_id) - return { - "project_name": project_name, - "folder_id": folder_id, - "folder_path": folder["path"], - "task_id": task_id, - "task_name": task["name"], - "host_name": self.get_host_name(), - } - - def _open_workfile(self, folder_id, task_id, filepath): - project_name = self.get_current_project_name() - event_data = self._get_event_context_data( - project_name, folder_id, task_id - ) - event_data["filepath"] = filepath - - emit_event("workfile.open.before", event_data, source="workfiles.tool") - - # Change context - task_name = event_data["task_name"] - if ( - folder_id != self.get_current_folder_id() - or task_name != self.get_current_task_name() - ): - self._change_current_context(project_name, folder_id, task_id) - - self._host_open_workfile(filepath) - - emit_event("workfile.open.after", event_data, source="workfiles.tool") - - def _save_as_workfile( - self, - folder_id: str, - task_id: str, - workdir: str, - filename: str, - template_key: str, - artist_note: str, - src_filepath=None, - ): - # Trigger before save event - project_name = self.get_current_project_name() - folder = self.get_folder_entity(project_name, folder_id) - task = self.get_task_entity(project_name, task_id) - task_name = task["name"] - - # QUESTION should the data be different for 'before' and 'after'? - event_data = self._get_event_context_data( - project_name, folder_id, task_id, folder, task - ) - event_data.update({ - "filename": filename, - "workdir_path": workdir, - }) - - emit_event("workfile.save.before", event_data, source="workfiles.tool") - - # Create workfiles root folder - if not os.path.exists(workdir): - self.log.debug("Initializing work directory: %s", workdir) - os.makedirs(workdir) - - # Change context - if ( - folder_id != self.get_current_folder_id() - or task_name != self.get_current_task_name() - ): - self._change_current_context( - project_name, folder_id, task_id, template_key - ) - - # Save workfile - dst_filepath = os.path.join(workdir, filename) - if src_filepath: - shutil.copyfile(src_filepath, dst_filepath) - self._host_open_workfile(dst_filepath) - else: - self._host_save_workfile(dst_filepath) - - # Make sure workfile info exists - if not artist_note: - artist_note = None - self.save_workfile_info( - folder_id, task_name, dst_filepath, note=artist_note - ) - - # Create extra folders - create_workdir_extra_folders( - workdir, - self.get_host_name(), - task["taskType"], - task_name, - project_name - ) - - # Trigger after save events - emit_event("workfile.save.after", event_data, source="workfiles.tool") - - def _change_current_context( - self, project_name, folder_id, task_id, template_key=None - ): - # Change current context - folder_entity = self.get_folder_entity(project_name, folder_id) - task_entity = self.get_task_entity(project_name, task_id) - change_current_context( - folder_entity, - task_entity, - template_key=template_key - ) - self._current_folder_id = folder_entity["id"] - self._current_folder_path = folder_entity["path"] - self._current_task_name = task_entity["name"] diff --git a/client/ayon_core/tools/workfiles/models/selection.py b/client/ayon_core/tools/workfiles/models/selection.py index 2f0896842d..9a6440b2a1 100644 --- a/client/ayon_core/tools/workfiles/models/selection.py +++ b/client/ayon_core/tools/workfiles/models/selection.py @@ -62,7 +62,9 @@ def set_selected_task(self, task_id, task_name): def get_selected_workfile_path(self): return self._workfile_path - def set_selected_workfile_path(self, path): + def set_selected_workfile_path( + self, rootless_path, path, workfile_entity_id + ): if path == self._workfile_path: return @@ -72,9 +74,11 @@ def set_selected_workfile_path(self, path): { "project_name": self._controller.get_current_project_name(), "path": path, + "rootless_path": rootless_path, "folder_id": self._folder_id, "task_name": self._task_name, "task_id": self._task_id, + "workfile_entity_id": workfile_entity_id, }, self.event_source ) diff --git a/client/ayon_core/tools/workfiles/models/workfiles.py b/client/ayon_core/tools/workfiles/models/workfiles.py index cc034571f3..4f5fb9890d 100644 --- a/client/ayon_core/tools/workfiles/models/workfiles.py +++ b/client/ayon_core/tools/workfiles/models/workfiles.py @@ -1,13 +1,26 @@ +from __future__ import annotations import os -import re import copy import uuid +import platform +import typing +from typing import Optional, Any -import arrow import ayon_api from ayon_api.operations import OperationsSession -from ayon_core.lib import get_ayon_username +from ayon_core.lib import ( + get_ayon_username, + NestedCacheItem, + CacheItem, + Logger, +) +from ayon_core.host import ( + HostBase, + IWorkfileHost, + WorkfileInfo, + PublishedWorkfileInfo, +) from ayon_core.pipeline.template_data import ( get_template_data, get_task_template_data, @@ -16,316 +29,394 @@ from ayon_core.pipeline.workfile import ( get_workdir_with_workdir_data, get_workfile_template_key, - get_last_workfile_with_version, + get_last_workfile_with_version_from_paths, + get_comments_from_workfile_paths, + open_workfile, + save_current_workfile_to, + copy_and_open_workfile, + copy_and_open_workfile_representation, ) from ayon_core.pipeline.version_start import get_versioning_start from ayon_core.tools.workfiles.abstract import ( WorkareaFilepathResult, - FileItem, - WorkfileInfo, + AbstractWorkfilesBackend, ) -_NOT_SET = object() - - -class CommentMatcher(object): - """Use anatomy and work file data to parse comments from filenames. - - Args: - extensions (set[str]): Set of extensions. - file_template (AnatomyStringTemplate): File template. - data (dict[str, Any]): Data to fill the template with. - - """ - def __init__(self, extensions, file_template, data): - self.fname_regex = None - - if "{comment}" not in file_template: - # Don't look for comment if template doesn't allow it - return - - # Create a regex group for extensions - any_extension = "(?:{})".format( - "|".join(re.escape(ext.lstrip(".")) for ext in extensions) - ) +if typing.TYPE_CHECKING: + from ayon_core.pipeline import Anatomy - # Use placeholders that will never be in the filename - temp_data = copy.deepcopy(data) - temp_data["comment"] = "<>" - temp_data["version"] = "<>" - temp_data["ext"] = "<>" - - fname_pattern = file_template.format_strict(temp_data) - fname_pattern = re.escape(fname_pattern) - - # Replace comment and version with something we can match with regex - replacements = { - "<>": "(.+)", - "<>": "[0-9]+", - "<>": any_extension, - } - for src, dest in replacements.items(): - fname_pattern = fname_pattern.replace(re.escape(src), dest) - - # Match from beginning to end of string to be safe - fname_pattern = "^{}$".format(fname_pattern) - - self.fname_regex = re.compile(fname_pattern) +_NOT_SET = object() - def parse_comment(self, filepath): - """Parse the {comment} part from a filename""" - if not self.fname_regex: - return - fname = os.path.basename(filepath) - match = self.fname_regex.match(fname) - if match: - return match.group(1) +class HostType(HostBase, IWorkfileHost): + pass -class WorkareaModel: - """Workfiles model looking for workfiles in workare folder. +class WorkfilesModel: + """Workfiles model.""" - Workarea folder is usually task and host specific, defined by - anatomy templates. Is looking for files with extensions defined - by host integration. - """ + def __init__( + self, + host: HostType, + controller: AbstractWorkfilesBackend + ): + self._host: HostType = host + self._controller: AbstractWorkfilesBackend = controller - def __init__(self, controller): - self._controller = controller + self._log = Logger.get_logger("WorkfilesModel") extensions = None if controller.is_host_valid(): extensions = controller.get_workfile_extensions() - self._extensions = extensions + self._extensions: Optional[set[str]] = extensions + + self._current_username = _NOT_SET + + # Workarea self._base_data = None self._fill_data_by_folder_id = {} self._task_data_by_folder_id = {} self._workdir_by_context = {} + self._workarea_file_items_mapping = {} + self._workarea_file_items_cache = NestedCacheItem( + levels=1, default_factory=list + ) - @property - def project_name(self): - return self._controller.get_current_project_name() + # Published workfiles + self._repre_by_id = {} + self._published_workfile_items_cache = NestedCacheItem( + levels=1, default_factory=list + ) + + # Entities + self._workfile_entities_by_task_id = {} def reset(self): self._base_data = None self._fill_data_by_folder_id = {} self._task_data_by_folder_id = {} + self._workdir_by_context = {} + self._workarea_file_items_mapping = {} + self._workarea_file_items_cache.reset() - def _get_base_data(self): - if self._base_data is None: - base_data = get_template_data( - ayon_api.get_project(self.project_name) - ) - base_data["app"] = self._controller.get_host_name() - self._base_data = base_data - return copy.deepcopy(self._base_data) + self._repre_by_id = {} + self._published_workfile_items_cache.reset() - def _get_folder_data(self, folder_id): - fill_data = self._fill_data_by_folder_id.get(folder_id) - if fill_data is None: - folder = self._controller.get_folder_entity( - self.project_name, folder_id - ) - fill_data = get_folder_template_data(folder, self.project_name) - self._fill_data_by_folder_id[folder_id] = fill_data - return copy.deepcopy(fill_data) + self._workfile_entities_by_task_id = {} - def _get_task_data(self, project_entity, folder_id, task_id): - task_data = self._task_data_by_folder_id.setdefault(folder_id, {}) - if task_id not in task_data: - task = self._controller.get_task_entity( - self.project_name, task_id - ) - if task: - task_data[task_id] = get_task_template_data( - project_entity, task) - return copy.deepcopy(task_data[task_id]) + # Host functionality + def get_current_workfile(self): + return self._host.get_current_workfile() - def _prepare_fill_data(self, folder_id, task_id): - if not folder_id or not task_id: - return {} + def open_workfile(self, folder_id, task_id, filepath): + self._emit_event("open_workfile.started") - base_data = self._get_base_data() - project_name = base_data["project"]["name"] - folder_data = self._get_folder_data(folder_id) - project_entity = self._controller.get_project_entity(project_name) - task_data = self._get_task_data(project_entity, folder_id, task_id) + failed = False + try: + self._open_workfile(folder_id, task_id, filepath) - base_data.update(folder_data) - base_data.update(task_data) + except Exception: + failed = True + self._log.warning("Open of workfile failed", exc_info=True) - return base_data + self._emit_event( + "open_workfile.finished", + {"failed": failed}, + ) - def get_workarea_dir_by_context(self, folder_id, task_id): - if not folder_id or not task_id: - return None - folder_mapping = self._workdir_by_context.setdefault(folder_id, {}) - workdir = folder_mapping.get(task_id) - if workdir is not None: - return workdir + def save_current_workfile(self): + current_file = self.get_current_workfile() + self._host.save_workfile(current_file) - workdir_data = self._prepare_fill_data(folder_id, task_id) + def save_as_workfile( + self, + folder_id, + task_id, + rootless_workdir, + workdir, + filename, + version, + comment, + description, + ): + self._emit_event("save_as.started") - workdir = get_workdir_with_workdir_data( - workdir_data, - self.project_name, - anatomy=self._controller.project_anatomy, + filepath = os.path.join(workdir, filename) + rootless_path = f"{rootless_workdir}/{filename}" + project_name = self._controller.get_current_project_name() + folder_entity = self._controller.get_folder_entity( + project_name, folder_id ) - folder_mapping[task_id] = workdir - return workdir + task_entity = self._controller.get_task_entity( + project_name, task_id + ) + workfile_entities = self.get_workfile_entities(task_id) + failed = False + try: + workfile_info = save_current_workfile_to( + filepath, + folder_entity, + task_entity, + version, + comment, + description, + source="workfiles.tool", + rootless_path=rootless_path, + workfile_entities=workfile_entities, + username=self._get_current_username(), + project_entity=self._controller.get_project_entity( + project_name + ), + project_settings=self._controller.project_settings, + anatomy=self._controller.project_anatomy, + ) + self._update_workfile_info( + task_id, rootless_path, description, workfile_info + ) + self._update_current_context( + folder_id, folder_entity["path"], task_entity["name"] + ) - def get_file_items(self, folder_id, task_id, task_name): - items = [] - if not folder_id or not task_id: - return items + except Exception: + failed = True + self._log.warning("Save as failed", exc_info=True) - workdir = self.get_workarea_dir_by_context(folder_id, task_id) - if not os.path.exists(workdir): - return items + self._emit_event( + "save_as.finished", + {"failed": failed}, + ) - for filename in os.listdir(workdir): - # We want to support both files and folders. e.g. Silhoutte uses - # folders as its project files. So we do not check whether it is - # a file or not. - filepath = os.path.join(workdir, filename) + def copy_workfile_representation( + self, + representation_id, + representation_filepath, + folder_id, + task_id, + workdir, + filename, + rootless_workdir, + version, + comment, + description, + ): + self._emit_event("copy_representation.started") - ext = os.path.splitext(filename)[1].lower() - if ext not in self._extensions: - continue + project_name = self._project_name + folder_entity = self._controller.get_folder_entity( + self._project_name, folder_id + ) + task_entity = self._controller.get_task_entity( + self._project_name, task_id + ) + repre_entity = self._repre_by_id.get(representation_id) + dst_filepath = os.path.join(workdir, filename) + rootless_path = f"{rootless_workdir}/{filename}" - workfile_info = self._controller.get_workfile_info( - folder_id, task_name, filepath + failed = False + try: + workfile_info = copy_and_open_workfile_representation( + project_name, + representation_id, + dst_filepath, + folder_entity, + task_entity, + version=version, + comment=comment, + description=description, + rootless_path=rootless_path, + representation_entity=repre_entity, + representation_path=representation_filepath, + workfile_entities=self.get_workfile_entities(task_id), + username=self._get_current_username(), + project_entity=self._controller.get_project_entity( + project_name + ), + project_settings=self._controller.project_settings, + anatomy=self._controller.project_anatomy, + ) + self._update_workfile_info( + task_id, rootless_path, description, workfile_info + ) + self._update_current_context( + folder_id, folder_entity["path"], task_entity["name"] ) - modified = os.path.getmtime(filepath) - items.append(FileItem( - workdir, - filename, - modified, - workfile_info.created_by, - workfile_info.updated_by, - )) - return items - def _get_template_key(self, fill_data): - task_type = fill_data.get("task", {}).get("type") - # TODO cache - return get_workfile_template_key( - self.project_name, - task_type, - self._controller.get_host_name(), + except Exception: + failed = True + self._log.warning( + "Copy of workfile representation failed", exc_info=True + ) + + self._emit_event( + "copy_representation.finished", + {"failed": failed}, ) - def _get_last_workfile_version( - self, workdir, file_template, fill_data, extensions + def duplicate_workfile( + self, + folder_id, + task_id, + src_filepath, + rootless_workdir, + workdir, + filename, + version, + comment, + description ): - """ + self._emit_event("workfile_duplicate.started") - Todos: - Validate if logic of this function is correct. It does return - last version + 1 which might be wrong. + project_name = self._controller.get_current_project_name() + project_entity = self._controller.get_project_entity(project_name) + folder_entity = self._controller.get_folder_entity( + project_name, folder_id + ) + task_entity = self._controller.get_task_entity(project_name, task_id) + workfile_entities = self.get_workfile_entities(task_id) + rootless_path = f"{rootless_workdir}/{filename}" + workfile_path = os.path.join(workdir, filename) + failed = False + try: + copy_and_open_workfile( + src_filepath, + workfile_path, + folder_entity, + task_entity, + version, + comment, + description, + source="workfiles.tool", + rootless_path=rootless_path, + workfile_entities=workfile_entities, + username=self._get_current_username(), + project_entity=project_entity, + project_settings=self._controller.project_settings, + anatomy=self._controller.project_anatomy, + ) - Args: - workdir (str): Workdir path. - file_template (str): File template. - fill_data (dict[str, Any]): Fill data. - extensions (set[str]): Extensions. + except Exception: + failed = True + self._log.warning("Duplication of workfile failed", exc_info=True) - Returns: - int: Next workfile version. + self._emit_event( + "workfile_duplicate.finished", + {"failed": failed}, + ) - """ - version = get_last_workfile_with_version( - workdir, file_template, fill_data, extensions - )[1] + def get_workfile_entities(self, task_id: str): + if not task_id: + return [] + workfile_entities = self._workfile_entities_by_task_id.get(task_id) + if workfile_entities is None: + workfile_entities = list(ayon_api.get_workfiles_info( + self._project_name, + task_ids=[task_id], + )) + self._workfile_entities_by_task_id[task_id] = workfile_entities + return workfile_entities - if version is None: - task_info = fill_data.get("task", {}) - version = get_versioning_start( - self.project_name, - self._controller.get_host_name(), - task_name=task_info.get("name"), - task_type=task_info.get("type"), - product_type="workfile", - project_settings=self._controller.project_settings, - ) - else: - version += 1 - return version + def get_workfile_info( + self, + folder_id: Optional[str], + task_id: Optional[str], + rootless_path: Optional[str] + ): + if not folder_id or not task_id or not rootless_path: + return None + + mapping = self._workarea_file_items_mapping.get(task_id) + if mapping is None: + self._cache_file_items(folder_id, task_id) + mapping = self._workarea_file_items_mapping[task_id] + return mapping.get(rootless_path) - def _get_comments_from_root( + def save_workfile_info( self, - file_template, - extensions, - fill_data, - root, - current_filename, + task_id: str, + rootless_path: str, + version: Optional[int], + comment: Optional[str], + description: Optional[str], ): - """Get comments from root directory. + self._save_workfile_info( + task_id, + rootless_path, + version, + comment, + description, + ) + + self._update_file_description( + task_id, rootless_path, description + ) + + def get_workarea_dir_by_context( + self, folder_id: str, task_id: str + ) -> Optional[str]: + """Workarea dir for passed context. + + The directory path is based on project anatomy templates. Args: - file_template (AnatomyStringTemplate): File template. - extensions (set[str]): Extensions. - fill_data (dict[str, Any]): Fill data. - root (str): Root directory. - current_filename (str): Current filename. + folder_id (str): Folder id. + task_id (str): Task id. Returns: - Tuple[list[str], Union[str, None]]: Comment hints and current - comment. + Optional[str]: Workarea dir path or None for invalid context. """ - current_comment = None - filenames = [] - if root and os.path.exists(root): - for filename in os.listdir(root): - path = os.path.join(root, filename) - if not os.path.isfile(path): - continue - - ext = os.path.splitext(filename)[-1].lower() - if ext in extensions: - filenames.append(filename) - - if not filenames: - return [], current_comment - - matcher = CommentMatcher(extensions, file_template, fill_data) - - comment_hints = set() - for filename in filenames: - comment = matcher.parse_comment(filename) - if comment: - comment_hints.add(comment) - if filename == current_filename: - current_comment = comment - - return list(comment_hints), current_comment - - def _get_workdir(self, anatomy, template_key, fill_data): - directory_template = anatomy.get_template_item( - "work", template_key, "directory" + if not folder_id or not task_id: + return None + folder_mapping = self._workdir_by_context.setdefault(folder_id, {}) + workdir = folder_mapping.get(task_id) + if workdir is not None: + return workdir + + workdir_data = self._prepare_fill_data(folder_id, task_id) + + workdir = get_workdir_with_workdir_data( + workdir_data, + self._project_name, + anatomy=self._controller.project_anatomy, ) - return directory_template.format_strict(fill_data).normalized() + folder_mapping[task_id] = workdir + return workdir + + def get_workarea_file_items(self, folder_id, task_id): + """Workfile items for passed context from workarea. + + Args: + folder_id (Optional[str]): Folder id. + task_id (Optional[str]): Task id. + + Returns: + list[WorkfileInfo]: List of file items matching workarea of passed + context. - def get_workarea_save_as_data(self, folder_id, task_id): + """ + return self._cache_file_items(folder_id, task_id) + + def get_workarea_save_as_data( + self, folder_id: Optional[str], task_id: Optional[str] + ) -> dict[str, Any]: folder_entity = None task_entity = None if folder_id: folder_entity = self._controller.get_folder_entity( - self.project_name, folder_id + self._project_name, folder_id ) if folder_entity and task_id: task_entity = self._controller.get_task_entity( - self.project_name, task_id + self._project_name, task_id ) - if not folder_entity or not task_entity: + if not folder_entity or not task_entity or self._extensions is None: return { "template_key": None, "template_has_version": None, "template_has_comment": None, "ext": None, "workdir": None, + "rootless_workdir": None, "comment": None, "comment_hints": None, "last_version": None, @@ -349,6 +440,17 @@ def get_workarea_save_as_data(self, folder_id, task_id): workdir = self._get_workdir(anatomy, template_key, fill_data) + rootless_workdir = workdir + if platform.system().lower() == "windows": + rootless_workdir = rootless_workdir.replace("\\", "/") + + used_roots = workdir.used_values.get("root") + if used_roots: + used_root_name = next(iter(used_roots)) + root_value = used_roots[used_root_name] + workdir_end = rootless_workdir[len(root_value):].lstrip("/") + rootless_workdir = f"{{root[{used_root_name}]}}/{workdir_end}" + file_template = anatomy.get_template_item( "work", template_key, "file" ) @@ -357,15 +459,20 @@ def get_workarea_save_as_data(self, folder_id, task_id): template_has_version = "{version" in file_template_str template_has_comment = "{comment" in file_template_str - comment_hints, comment = self._get_comments_from_root( - file_template, + file_items = self.get_workarea_file_items(folder_id, task_id) + filepaths = [ + item.filepath + for item in file_items + ] + comment_hints, comment = get_comments_from_workfile_paths( + filepaths, extensions, + file_template, fill_data, - workdir, current_filename, ) last_version = self._get_last_workfile_version( - workdir, file_template_str, fill_data, extensions + filepaths, file_template_str, fill_data, extensions ) return { @@ -374,6 +481,7 @@ def get_workarea_save_as_data(self, folder_id, task_id): "template_has_comment": template_has_comment, "ext": current_ext, "workdir": workdir, + "rootless_workdir": rootless_workdir, "comment": comment, "comment_hints": comment_hints, "last_version": last_version, @@ -382,13 +490,13 @@ def get_workarea_save_as_data(self, folder_id, task_id): def fill_workarea_filepath( self, - folder_id, - task_id, - extension, - use_last_version, - version, - comment, - ): + folder_id: str, + task_id: str, + extension: str, + use_last_version: bool, + version: int, + comment: str, + ) -> WorkareaFilepathResult: """Fill workarea filepath based on context. Args: @@ -415,8 +523,16 @@ def fill_workarea_filepath( ) if use_last_version: + file_items = self.get_workarea_file_items(folder_id, task_id) + filepaths = [ + item.filepath + for item in file_items + ] version = self._get_last_workfile_version( - workdir, file_template.template, fill_data, self._extensions + filepaths, + file_template.template, + fill_data, + self._extensions ) fill_data["version"] = version fill_data["ext"] = extension.lstrip(".") @@ -439,374 +555,449 @@ def fill_workarea_filepath( exists ) + def get_published_file_items( + self, folder_id: str, task_id: str + ) -> list[PublishedWorkfileInfo]: + """Published workfiles for passed context. -class WorkfileEntitiesModel: - """Workfile entities model. + Args: + folder_id (str): Folder id. + task_id (str): Task id. - Args: - control (AbstractWorkfileController): Controller object. - """ + Returns: + list[PublishedWorkfileInfo]: List of files for published workfiles. - def __init__(self, controller): - self._controller = controller - self._cache = {} - self._items = {} - self._current_username = _NOT_SET + """ + if not folder_id: + return [] + + cache = self._published_workfile_items_cache[folder_id] + if not cache.is_valid: + project_name = self._project_name + anatomy = self._controller.project_anatomy + + product_entities = ayon_api.get_products( + project_name, + folder_ids={folder_id}, + product_types={"workfile"}, + fields={"id", "name"} + ) - def _get_workfile_info_identifier( - self, folder_id, task_id, rootless_path - ): - return "_".join([folder_id, task_id, rootless_path]) + version_entities = [] + product_ids = {product["id"] for product in product_entities} + if product_ids: + # Get version docs of products with their families + version_entities = list(ayon_api.get_versions( + project_name, + product_ids=product_ids, + fields={"id", "author", "taskId"}, + )) + + repre_entities = [] + if version_entities: + repre_entities = list(ayon_api.get_representations( + project_name, + version_ids={v["id"] for v in version_entities} + )) + + self._repre_by_id.update({ + repre_entity["id"]: repre_entity + for repre_entity in repre_entities + }) + + cache.update_data(self._host.list_published_workfiles( + project_name, + folder_id, + anatomy, + )) - def _get_rootless_path(self, filepath): - anatomy = self._controller.project_anatomy + items = cache.get_data() - workdir, filename = os.path.split(filepath) - _, rootless_dir = anatomy.find_root_template_from_path(workdir) - return "/".join([ - os.path.normpath(rootless_dir).replace("\\", "/"), - filename - ]) + if task_id: + items = [ + item + for item in items + if item.task_id == task_id + ] + return items - def _prepare_workfile_info_item( - self, folder_id, task_id, workfile_info, filepath - ): - note = "" - created_by = None - updated_by = None - if workfile_info: - note = workfile_info["attrib"].get("description") or "" - created_by = workfile_info.get("createdBy") - updated_by = workfile_info.get("updatedBy") - - filestat = os.stat(filepath) - return WorkfileInfo( - folder_id, - task_id, - filepath, - filesize=filestat.st_size, - creation_time=filestat.st_ctime, - modification_time=filestat.st_mtime, - created_by=created_by, - updated_by=updated_by, - note=note - ) - - def _get_workfile_info(self, folder_id, task_id, identifier): - workfile_info = self._cache.get(identifier) - if workfile_info is not None: - return workfile_info - - for workfile_info in ayon_api.get_workfiles_info( - self._controller.get_current_project_name(), - task_ids=[task_id], - fields=["id", "path", "attrib", "createdBy", "updatedBy"], - ): - workfile_identifier = self._get_workfile_info_identifier( - folder_id, task_id, workfile_info["path"] - ) - self._cache[workfile_identifier] = workfile_info - return self._cache.get(identifier) + @property + def _project_name(self) -> str: + return self._controller.get_current_project_name() - def get_workfile_info( - self, folder_id, task_id, filepath, rootless_path=None - ): - if not folder_id or not task_id or not filepath: - return None + @property + def _host_name(self) -> str: + return self._host.name - if rootless_path is None: - rootless_path = self._get_rootless_path(filepath) + def _emit_event(self, topic, data=None): + self._controller.emit_event(topic, data, "workfiles") - identifier = self._get_workfile_info_identifier( - folder_id, task_id, rootless_path) - item = self._items.get(identifier) - if item is None: - workfile_info = self._get_workfile_info( - folder_id, task_id, identifier + def _get_current_username(self) -> str: + if self._current_username is _NOT_SET: + self._current_username = get_ayon_username() + return self._current_username + + # --- Host --- + def _get_event_context_data( + self, + project_name: str, + folder_id: str, + task_id: str, + folder_entity: Optional[dict[str, Any]] = None, + task_entity: Optional[dict[str, Any]] = None, + ): + if folder_entity is None: + folder_entity = self._controller.get_folder_entity( + project_name, folder_id ) - item = self._prepare_workfile_info_item( - folder_id, task_id, workfile_info, filepath + if task_entity is None: + task_entity = self._controller.get_task_entity( + project_name, task_id ) - self._items[identifier] = item - return item + return { + "project_name": project_name, + "folder_id": folder_id, + "folder_path": folder_entity["path"], + "task_id": task_id, + "task_name": task_entity["name"], + "host_name": self._host_name, + } - def save_workfile_info(self, folder_id, task_id, filepath, note): - rootless_path = self._get_rootless_path(filepath) - identifier = self._get_workfile_info_identifier( - folder_id, task_id, rootless_path + def _open_workfile(self, folder_id: str, task_id: str, filepath: str): + # TODO move to workfiles pipeline + project_name = self._project_name + folder_entity = self._controller.get_folder_entity( + project_name, folder_id ) - workfile_info = self._get_workfile_info( - folder_id, task_id, identifier + task_entity = self._controller.get_task_entity( + project_name, task_id + ) + open_workfile(filepath, folder_entity, task_entity) + self._update_current_context( + folder_id, folder_entity["path"], task_entity["name"] ) - if not workfile_info: - self._cache[identifier] = self._create_workfile_info_entity( - task_id, rootless_path, note or "") - self._items.pop(identifier, None) - return - old_note = workfile_info.get("attrib", {}).get("note") + def _update_current_context(self, folder_id, folder_path, task_name): + self._current_folder_id = folder_id + self._current_folder_path = folder_path + self._current_task_name = task_name - new_workfile_info = copy.deepcopy(workfile_info) - update_data = {} - if note is not None and old_note != note: - update_data["attrib"] = {"description": note} - attrib = new_workfile_info.setdefault("attrib", {}) - attrib["description"] = note + # --- Workarea --- + def _reset_workarea_file_items(self, task_id: str): + cache: CacheItem = self._workarea_file_items_cache[task_id] + cache.set_invalid() + self._workarea_file_items_mapping.pop(task_id, None) - username = self._get_current_username() - # Automatically fix 'createdBy' and 'updatedBy' fields - # NOTE both fields were not automatically filled by server - # until 1.1.3 release. - if workfile_info.get("createdBy") is None: - update_data["createdBy"] = username - new_workfile_info["createdBy"] = username + def _get_base_data(self) -> dict[str, Any]: + if self._base_data is None: + base_data = get_template_data( + ayon_api.get_project(self._project_name), + host_name=self._host_name, + ) + self._base_data = base_data + return copy.deepcopy(self._base_data) - if workfile_info.get("updatedBy") != username: - update_data["updatedBy"] = username - new_workfile_info["updatedBy"] = username + def _get_folder_data(self, folder_id: str) -> dict[str, Any]: + fill_data = self._fill_data_by_folder_id.get(folder_id) + if fill_data is None: + folder = self._controller.get_folder_entity( + self._project_name, folder_id + ) + fill_data = get_folder_template_data(folder, self._project_name) + self._fill_data_by_folder_id[folder_id] = fill_data + return copy.deepcopy(fill_data) - if not update_data: - return + def _get_task_data( + self, + project_entity: dict[str, Any], + folder_id: str, + task_id: str + ) -> dict[str, Any]: + task_data = self._task_data_by_folder_id.setdefault(folder_id, {}) + if task_id not in task_data: + task_entity = self._controller.get_task_entity( + self._project_name, task_id + ) + if task_entity: + task_data[task_id] = get_task_template_data( + project_entity, task_entity + ) + return copy.deepcopy(task_data[task_id]) - self._cache[identifier] = new_workfile_info - self._items.pop(identifier, None) + def _prepare_fill_data( + self, folder_id: str, task_id: str + ) -> dict[str, Any]: + if not folder_id or not task_id: + return {} - project_name = self._controller.get_current_project_name() + base_data = self._get_base_data() + project_name = base_data["project"]["name"] + folder_data = self._get_folder_data(folder_id) + project_entity = self._controller.get_project_entity(project_name) + task_data = self._get_task_data(project_entity, folder_id, task_id) - session = OperationsSession() - session.update_entity( - project_name, - "workfile", - workfile_info["id"], - update_data, - ) - session.commit() + base_data.update(folder_data) + base_data.update(task_data) - def _create_workfile_info_entity(self, task_id, rootless_path, note): - extension = os.path.splitext(rootless_path)[1] + return base_data - project_name = self._controller.get_current_project_name() + def _cache_file_items( + self, folder_id: Optional[str], task_id: Optional[str] + ) -> list[WorkfileInfo]: + if not folder_id or not task_id: + return [] - username = self._get_current_username() - workfile_info = { - "id": uuid.uuid4().hex, - "path": rootless_path, - "taskId": task_id, - "attrib": { - "extension": extension, - "description": note - }, - # TODO remove 'createdBy' and 'updatedBy' fields when server is - # or above 1.1.3 . - "createdBy": username, - "updatedBy": username, - } + cache: CacheItem = self._workarea_file_items_cache[task_id] + if cache.is_valid: + return cache.get_data() - session = OperationsSession() - session.create_entity(project_name, "workfile", workfile_info) - session.commit() - return workfile_info + project_entity = self._controller.get_project_entity( + self._project_name + ) + folder_entity = self._controller.get_folder_entity( + self._project_name, folder_id + ) + task_entity = self._controller.get_task_entity( + self._project_name, task_id + ) + anatomy = self._controller.project_anatomy + project_settings = self._controller.project_settings + workfile_entities = self._controller.get_workfile_entities(task_id) - def _get_current_username(self): - if self._current_username is _NOT_SET: - self._current_username = get_ayon_username() - return self._current_username + fill_data = self._prepare_fill_data(folder_id, task_id) + template_key = self._get_template_key(fill_data) + items = self._host.list_workfiles( + self._project_name, + folder_entity, + task_entity, + project_entity=project_entity, + anatomy=anatomy, + template_key=template_key, + project_settings=project_settings, + workfile_entities=workfile_entities, + ) + cache.update_data(items) -class PublishWorkfilesModel: - """Model for handling of published workfiles. + # Cache items by entity ids and rootless path + self._workarea_file_items_mapping[task_id] = { + item.rootless_path: item + for item in items + } - Todos: - Cache workfiles products and representations for some time. - Note Representations won't change. Only what can change are - versions. - """ + return items - def __init__(self, controller): - self._controller = controller - self._cached_extensions = None - self._cached_repre_extensions = None + def _get_template_key(self, fill_data: dict[str, Any]) -> str: + task_type = fill_data.get("task", {}).get("type") + # TODO cache + return get_workfile_template_key( + self._project_name, + task_type, + self._host_name, + project_settings=self._controller.project_settings, + ) - @property - def _extensions(self): - if self._cached_extensions is None: - exts = self._controller.get_workfile_extensions() or [] - self._cached_extensions = exts - return self._cached_extensions + def _get_last_workfile_version( + self, + filepaths: list[str], + file_template: str, + fill_data: dict[str, Any], + extensions: set[str] + ) -> int: + """ - @property - def _repre_extensions(self): - if self._cached_repre_extensions is None: - self._cached_repre_extensions = { - ext.lstrip(".") for ext in self._extensions - } - return self._cached_repre_extensions + Todos: + Validate if logic of this function is correct. It does return + last version + 1 which might be wrong. - def _file_item_from_representation( - self, repre_entity, project_anatomy, author, task_name=None - ): - if task_name is not None: - task_info = repre_entity["context"].get("task") - if not task_info or task_info["name"] != task_name: - return None - - # Filter by extension - extensions = self._repre_extensions - workfile_path = None - for repre_file in repre_entity["files"]: - ext = ( - os.path.splitext(repre_file["name"])[1] - .lower() - .lstrip(".") - ) - if ext in extensions: - workfile_path = repre_file["path"] - break + Args: + filepaths (list[str]): Workfile paths. + file_template (str): File template. + fill_data (dict[str, Any]): Fill data. + extensions (set[str]): Extensions. - if not workfile_path: - return None + Returns: + int: Next workfile version. - try: - workfile_path = workfile_path.format( - root=project_anatomy.roots) - except Exception as exc: - print("Failed to format workfile path: {}".format(exc)) - - dirpath, filename = os.path.split(workfile_path) - created_at = arrow.get(repre_entity["createdAt"]).to("local") - return FileItem( - dirpath, - filename, - created_at.float_timestamp, - author, - None, - repre_entity["id"] + """ + version = get_last_workfile_with_version_from_paths( + filepaths, file_template, fill_data, extensions + )[1] + if version is not None: + return version + 1 + + task_info = fill_data.get("task", {}) + return get_versioning_start( + self._project_name, + self._host_name, + task_name=task_info.get("name"), + task_type=task_info.get("type"), + product_type="workfile", + project_settings=self._controller.project_settings, ) - def get_file_items(self, folder_id, task_name): - # TODO refactor to use less server API calls - project_name = self._controller.get_current_project_name() - # Get subset docs of folder - product_entities = ayon_api.get_products( - project_name, - folder_ids={folder_id}, - product_types={"workfile"}, - fields={"id", "name"} - ) - - output = [] - product_ids = {product["id"] for product in product_entities} - if not product_ids: - return output - - # Get version docs of products with their families - version_entities = ayon_api.get_versions( - project_name, - product_ids=product_ids, - fields={"id", "author"} - ) - versions_by_id = { - version["id"]: version - for version in version_entities - } - if not versions_by_id: - return output - - # Query representations of filtered versions and add filter for - # extension - repre_entities = ayon_api.get_representations( - project_name, - version_ids=set(versions_by_id) - ) - project_anatomy = self._controller.project_anatomy - - # Filter queried representations by task name if task is set - file_items = [] - for repre_entity in repre_entities: - version_id = repre_entity["versionId"] - version_entity = versions_by_id[version_id] - file_item = self._file_item_from_representation( - repre_entity, - project_anatomy, - version_entity["author"], - task_name, - ) - if file_item is not None: - file_items.append(file_item) - - return file_items - + def _get_workdir( + self, anatomy: "Anatomy", template_key: str, fill_data: dict[str, Any] + ): + directory_template = anatomy.get_template_item( + "work", template_key, "directory" + ) + return directory_template.format_strict(fill_data).normalized() -class WorkfilesModel: - """Workfiles model.""" + def _update_workfile_info( + self, + task_id: str, + rootless_path: str, + description: str, + workfile_entity: dict[str, Any], + ): + self._update_file_description(task_id, rootless_path, description) + workfile_entities = self.get_workfile_entities(task_id) + target_idx = None + for idx, workfile_entity in enumerate(workfile_entities): + if workfile_entity["path"] == rootless_path: + target_idx = idx + break - def __init__(self, controller): - self._controller = controller + if target_idx is None: + workfile_entities.append(workfile_entity) + else: + workfile_entities[target_idx] = workfile_entity - self._entities_model = WorkfileEntitiesModel(controller) - self._workarea_model = WorkareaModel(controller) - self._published_model = PublishWorkfilesModel(controller) + self._reset_workarea_file_items(task_id) - def get_workfile_info(self, folder_id, task_id, filepath): - return self._entities_model.get_workfile_info( - folder_id, task_id, filepath - ) + def _update_file_description( + self, task_id: str, rootless_path: str, description: str + ): + mapping = self._workarea_file_items_mapping.get(task_id) + if not mapping: + return + item = mapping.get(rootless_path) + if item is not None: + item.description = description - def save_workfile_info(self, folder_id, task_id, filepath, note): - self._entities_model.save_workfile_info( - folder_id, task_id, filepath, note + # --- Workfile entities --- + def _save_workfile_info( + self, + task_id: str, + rootless_path: str, + version: Optional[int], + comment: Optional[str], + description: Optional[str], + ): + # TODO create pipeline function for this + workfile_entities = self.get_workfile_entities(task_id) + workfile_entity = next( + ( + _ent + for _ent in workfile_entities + if _ent["path"] == rootless_path + ), + None ) + if not workfile_entity: + workfile_entity = self._create_workfile_info_entity( + task_id, + rootless_path, + version, + comment, + description, + ) + workfile_entities.append(workfile_entity) + return - def get_workarea_dir_by_context(self, folder_id, task_id): - """Workarea dir for passed context. - - The directory path is based on project anatomy templates. + data = {} + for key, value in ( + ("host_name", self._host_name), + ("version", version), + ("comment", comment), + ): + if value is not None: + data[key] = value - Args: - folder_id (str): Folder id. - task_id (str): Task id. + old_data = workfile_entity["data"] - Returns: - Union[str, None]: Workarea dir path or None for invalid context. - """ + changed_data = {} + for key, value in data.items(): + if key not in old_data or old_data[key] != value: + changed_data[key] = value - return self._workarea_model.get_workarea_dir_by_context( - folder_id, task_id) + update_data = {} + if changed_data: + update_data["data"] = changed_data - def get_workarea_file_items(self, folder_id, task_id, task_name): - """Workfile items for passed context from workarea. + old_description = workfile_entity["attrib"].get("description") + if description is not None and old_description != description: + update_data["attrib"] = {"description": description} + workfile_entity["attrib"]["description"] = description - Args: - folder_id (Union[str, None]): Folder id. - task_id (Union[str, None]): Task id. - task_name (Union[str, None]): Task name. + username = self._get_current_username() + # Automatically fix 'createdBy' and 'updatedBy' fields + # NOTE both fields were not automatically filled by server + # until 1.1.3 release. + if workfile_entity.get("createdBy") is None: + update_data["createdBy"] = username + workfile_entity["createdBy"] = username - Returns: - list[FileItem]: List of file items matching workarea of passed - context. - """ - return self._workarea_model.get_file_items( - folder_id, task_id, task_name - ) + if workfile_entity.get("updatedBy") != username: + update_data["updatedBy"] = username + workfile_entity["updatedBy"] = username - def get_workarea_save_as_data(self, folder_id, task_id): - return self._workarea_model.get_workarea_save_as_data( - folder_id, task_id) + if not update_data: + return - def fill_workarea_filepath(self, *args, **kwargs): - return self._workarea_model.fill_workarea_filepath( - *args, **kwargs + session = OperationsSession() + session.update_entity( + self._project_name, + "workfile", + workfile_entity["id"], + update_data, ) + session.commit() - def get_published_file_items(self, folder_id, task_name): - """Published workfiles for passed context. + def _create_workfile_info_entity( + self, + task_id: str, + rootless_path: str, + version: Optional[int], + comment: Optional[str], + description: str, + ) -> dict[str, Any]: + extension = os.path.splitext(rootless_path)[1] - Args: - folder_id (str): Folder id. - task_name (str): Task name. + attrib = {} + for key, value in ( + ("extension", extension), + ("description", description), + ): + if value is not None: + attrib[key] = value + + data = {} + for key, value in ( + ("host_name", self._host_name), + ("version", version), + ("comment", comment), + ): + if value is not None: + data[key] = value - Returns: - list[FileItem]: List of files for published workfiles. - """ + username = self._get_current_username() + workfile_info = { + "id": uuid.uuid4().hex, + "path": rootless_path, + "taskId": task_id, + "attrib": attrib, + "data": data, + # TODO remove 'createdBy' and 'updatedBy' fields when server is + # or above 1.1.3 . + "createdBy": username, + "updatedBy": username, + } - return self._published_model.get_file_items(folder_id, task_name) + session = OperationsSession() + session.create_entity( + self._project_name, "workfile", workfile_info + ) + session.commit() + return workfile_info diff --git a/client/ayon_core/tools/workfiles/widgets/files_widget.py b/client/ayon_core/tools/workfiles/widgets/files_widget.py index f0b74f4289..012a12ab17 100644 --- a/client/ayon_core/tools/workfiles/widgets/files_widget.py +++ b/client/ayon_core/tools/workfiles/widgets/files_widget.py @@ -200,6 +200,9 @@ def _on_workarea_open_clicked(self): self._open_workfile(folder_id, task_id, path) def _on_current_open_requests(self): + # TODO validate if item under mouse is enabled + # - thi uses selected item, but that does not have to be the one + # under mouse self._on_workarea_open_clicked() def _on_duplicate_request(self): @@ -210,11 +213,18 @@ def _on_duplicate_request(self): result = self._exec_save_as_dialog() if result is None: return + folder_id = self._selected_folder_id + task_id = self._selected_task_id self._controller.duplicate_workfile( + folder_id, + task_id, filepath, + result["rootless_workdir"], result["workdir"], result["filename"], - artist_note=result["artist_note"] + version=result["version"], + comment=result["comment"], + description=result["description"] ) def _on_workarea_browse_clicked(self): @@ -259,10 +269,12 @@ def _on_workarea_save_clicked(self): self._controller.save_as_workfile( result["folder_id"], result["task_id"], + result["rootless_workdir"], result["workdir"], result["filename"], - result["template_key"], - artist_note=result["artist_note"] + version=result["version"], + comment=result["comment"], + description=result["description"] ) def _on_workarea_path_changed(self, event): @@ -314,8 +326,10 @@ def _on_published_save_clicked(self): result["task_id"], result["workdir"], result["filename"], - result["template_key"], - artist_note=result["artist_note"] + result["rootless_workdir"], + version=result["version"], + comment=result["comment"], + description=result["description"], ) def _on_save_as_request(self): diff --git a/client/ayon_core/tools/workfiles/widgets/files_widget_published.py b/client/ayon_core/tools/workfiles/widgets/files_widget_published.py index 07122046be..250204a7d7 100644 --- a/client/ayon_core/tools/workfiles/widgets/files_widget_published.py +++ b/client/ayon_core/tools/workfiles/widgets/files_widget_published.py @@ -1,3 +1,5 @@ +import os + import qtawesome from qtpy import QtWidgets, QtCore, QtGui @@ -205,24 +207,25 @@ def _fill_items(self): new_items.append(item) item.setColumnCount(self.columnCount()) item.setData(self._file_icon, QtCore.Qt.DecorationRole) - item.setData(file_item.filename, QtCore.Qt.DisplayRole) item.setData(repre_id, REPRE_ID_ROLE) - if file_item.exists: + if file_item.available: flags = QtCore.Qt.ItemIsEnabled | QtCore.Qt.ItemIsSelectable else: flags = QtCore.Qt.NoItemFlags - author = file_item.created_by + author = file_item.author user_item = user_items_by_name.get(author) if user_item is not None and user_item.full_name: author = user_item.full_name - item.setFlags(flags) + filename = os.path.basename(file_item.filepath) + item.setFlags(flags) + item.setData(filename, QtCore.Qt.DisplayRole) item.setData(file_item.filepath, FILEPATH_ROLE) item.setData(author, AUTHOR_ROLE) - item.setData(file_item.modified, DATE_MODIFIED_ROLE) + item.setData(file_item.file_modified, DATE_MODIFIED_ROLE) self._items_by_id[repre_id] = item diff --git a/client/ayon_core/tools/workfiles/widgets/files_widget_workarea.py b/client/ayon_core/tools/workfiles/widgets/files_widget_workarea.py index 7f76b6a8ab..47d4902812 100644 --- a/client/ayon_core/tools/workfiles/widgets/files_widget_workarea.py +++ b/client/ayon_core/tools/workfiles/widgets/files_widget_workarea.py @@ -1,3 +1,5 @@ +import os + import qtawesome from qtpy import QtWidgets, QtCore, QtGui @@ -10,8 +12,10 @@ FILENAME_ROLE = QtCore.Qt.UserRole + 1 FILEPATH_ROLE = QtCore.Qt.UserRole + 2 -AUTHOR_ROLE = QtCore.Qt.UserRole + 3 -DATE_MODIFIED_ROLE = QtCore.Qt.UserRole + 4 +ROOTLESS_PATH_ROLE = QtCore.Qt.UserRole + 3 +AUTHOR_ROLE = QtCore.Qt.UserRole + 4 +DATE_MODIFIED_ROLE = QtCore.Qt.UserRole + 5 +WORKFILE_ENTITY_ID_ROLE = QtCore.Qt.UserRole + 6 class WorkAreaFilesModel(QtGui.QStandardItemModel): @@ -198,7 +202,7 @@ def _fill_items_impl(self): items_to_remove = set(self._items_by_filename.keys()) new_items = [] for file_item in file_items: - filename = file_item.filename + filename = os.path.basename(file_item.filepath) if filename in self._items_by_filename: items_to_remove.discard(filename) item = self._items_by_filename[filename] @@ -206,23 +210,28 @@ def _fill_items_impl(self): item = QtGui.QStandardItem() new_items.append(item) item.setColumnCount(self.columnCount()) - item.setFlags( - QtCore.Qt.ItemIsEnabled | QtCore.Qt.ItemIsSelectable - ) item.setData(self._file_icon, QtCore.Qt.DecorationRole) - item.setData(file_item.filename, QtCore.Qt.DisplayRole) - item.setData(file_item.filename, FILENAME_ROLE) + item.setData(filename, QtCore.Qt.DisplayRole) + item.setData(filename, FILENAME_ROLE) + flags = QtCore.Qt.ItemIsSelectable + if file_item.available: + flags |= QtCore.Qt.ItemIsEnabled + item.setFlags(flags) updated_by = file_item.updated_by user_item = user_items_by_name.get(updated_by) if user_item is not None and user_item.full_name: updated_by = user_item.full_name + item.setData( + file_item.workfile_entity_id, WORKFILE_ENTITY_ID_ROLE + ) item.setData(file_item.filepath, FILEPATH_ROLE) + item.setData(file_item.rootless_path, ROOTLESS_PATH_ROLE) + item.setData(file_item.file_modified, DATE_MODIFIED_ROLE) item.setData(updated_by, AUTHOR_ROLE) - item.setData(file_item.modified, DATE_MODIFIED_ROLE) - self._items_by_filename[file_item.filename] = item + self._items_by_filename[filename] = item if new_items: root_item.appendRows(new_items) @@ -354,14 +363,18 @@ def set_text_filter(self, text_filter): def _get_selected_info(self): selection_model = self._view.selectionModel() - filepath = None - filename = None + workfile_entity_id = filename = rootless_path = filepath = None for index in selection_model.selectedIndexes(): filepath = index.data(FILEPATH_ROLE) + rootless_path = index.data(ROOTLESS_PATH_ROLE) filename = index.data(FILENAME_ROLE) + workfile_entity_id = index.data(WORKFILE_ENTITY_ID_ROLE) + return { "filepath": filepath, + "rootless_path": rootless_path, "filename": filename, + "workfile_entity_id": workfile_entity_id, } def get_selected_path(self): @@ -374,8 +387,12 @@ def get_selected_path(self): return self._get_selected_info()["filepath"] def _on_selection_change(self): - filepath = self.get_selected_path() - self._controller.set_selected_workfile_path(filepath) + info = self._get_selected_info() + self._controller.set_selected_workfile_path( + info["rootless_path"], + info["filepath"], + info["workfile_entity_id"], + ) def _on_mouse_double_click(self, event): if event.button() == QtCore.Qt.LeftButton: @@ -430,19 +447,25 @@ def _on_expected_selection_change(self, event): ) def _on_model_refresh(self): - if ( - not self._change_selection_on_refresh - or self._proxy_model.rowCount() < 1 - ): + if not self._change_selection_on_refresh: return # Find the row with latest date modified + indexes = [ + self._proxy_model.index(idx, 0) + for idx in range(self._proxy_model.rowCount()) + ] + filtered_indexes = [ + index + for index in indexes + if self._proxy_model.flags(index) & QtCore.Qt.ItemIsEnabled + ] + if not filtered_indexes: + return + latest_index = max( - ( - self._proxy_model.index(idx, 0) - for idx in range(self._proxy_model.rowCount()) - ), - key=lambda model_index: model_index.data(DATE_MODIFIED_ROLE) + filtered_indexes, + key=lambda model_index: model_index.data(DATE_MODIFIED_ROLE) or 0 ) # Select row of latest modified diff --git a/client/ayon_core/tools/workfiles/widgets/save_as_dialog.py b/client/ayon_core/tools/workfiles/widgets/save_as_dialog.py index bddff816fe..24d64319ca 100644 --- a/client/ayon_core/tools/workfiles/widgets/save_as_dialog.py +++ b/client/ayon_core/tools/workfiles/widgets/save_as_dialog.py @@ -108,6 +108,7 @@ def __init__(self, controller, parent): self._ext_value = None self._filename = None self._workdir = None + self._rootless_workdir = None self._result = None @@ -144,8 +145,8 @@ def __init__(self, controller, parent): version_layout.addWidget(last_version_check) # Artist note widget - artist_note_input = PlaceholderPlainTextEdit(inputs_widget) - artist_note_input.setPlaceholderText( + description_input = PlaceholderPlainTextEdit(inputs_widget) + description_input.setPlaceholderText( "Provide a note about this workfile.") # Preview widget @@ -166,7 +167,7 @@ def __init__(self, controller, parent): subversion_label = QtWidgets.QLabel("Subversion:", inputs_widget) extension_label = QtWidgets.QLabel("Extension:", inputs_widget) preview_label = QtWidgets.QLabel("Preview:", inputs_widget) - artist_note_label = QtWidgets.QLabel("Artist Note:", inputs_widget) + description_label = QtWidgets.QLabel("Artist Note:", inputs_widget) # Build inputs inputs_layout = QtWidgets.QGridLayout(inputs_widget) @@ -178,8 +179,8 @@ def __init__(self, controller, parent): inputs_layout.addWidget(extension_combobox, 2, 1) inputs_layout.addWidget(preview_label, 3, 0) inputs_layout.addWidget(preview_widget, 3, 1) - inputs_layout.addWidget(artist_note_label, 4, 0, 1, 2) - inputs_layout.addWidget(artist_note_input, 5, 0, 1, 2) + inputs_layout.addWidget(description_label, 4, 0, 1, 2) + inputs_layout.addWidget(description_input, 5, 0, 1, 2) # Build layout main_layout = QtWidgets.QVBoxLayout(self) @@ -214,13 +215,13 @@ def __init__(self, controller, parent): self._extension_combobox = extension_combobox self._subversion_input = subversion_input self._preview_widget = preview_widget - self._artist_note_input = artist_note_input + self._description_input = description_input self._version_label = version_label self._subversion_label = subversion_label self._extension_label = extension_label self._preview_label = preview_label - self._artist_note_label = artist_note_label + self._description_label = description_label # Post init setup @@ -255,6 +256,7 @@ def update_context(self): self._folder_id = folder_id self._task_id = task_id self._workdir = data["workdir"] + self._rootless_workdir = data["rootless_workdir"] self._comment_value = data["comment"] self._ext_value = data["ext"] self._template_key = data["template_key"] @@ -329,10 +331,13 @@ def _on_ok_pressed(self): self._result = { "filename": self._filename, "workdir": self._workdir, + "rootless_workdir": self._rootless_workdir, "folder_id": self._folder_id, "task_id": self._task_id, "template_key": self._template_key, - "artist_note": self._artist_note_input.toPlainText(), + "version": self._version_value, + "comment": self._comment_value, + "description": self._description_input.toPlainText(), } self.close() diff --git a/client/ayon_core/tools/workfiles/widgets/side_panel.py b/client/ayon_core/tools/workfiles/widgets/side_panel.py index 7ba60b5544..b1b91d9721 100644 --- a/client/ayon_core/tools/workfiles/widgets/side_panel.py +++ b/client/ayon_core/tools/workfiles/widgets/side_panel.py @@ -4,6 +4,8 @@ def file_size_to_string(file_size): + if not file_size: + return "N/A" size = 0 size_ending_mapping = { "KB": 1024 ** 1, @@ -43,44 +45,47 @@ def __init__(self, controller, parent): details_input = QtWidgets.QPlainTextEdit(self) details_input.setReadOnly(True) - artist_note_widget = QtWidgets.QWidget(self) - note_label = QtWidgets.QLabel("Artist note", artist_note_widget) - note_input = QtWidgets.QPlainTextEdit(artist_note_widget) - btn_note_save = QtWidgets.QPushButton("Save note", artist_note_widget) - - artist_note_layout = QtWidgets.QVBoxLayout(artist_note_widget) - artist_note_layout.setContentsMargins(0, 0, 0, 0) - artist_note_layout.addWidget(note_label, 0) - artist_note_layout.addWidget(note_input, 1) - artist_note_layout.addWidget( - btn_note_save, 0, alignment=QtCore.Qt.AlignRight + description_widget = QtWidgets.QWidget(self) + description_label = QtWidgets.QLabel("Artist note", description_widget) + description_input = QtWidgets.QPlainTextEdit(description_widget) + btn_description_save = QtWidgets.QPushButton( + "Save note", description_widget + ) + + description_layout = QtWidgets.QVBoxLayout(description_widget) + description_layout.setContentsMargins(0, 0, 0, 0) + description_layout.addWidget(description_label, 0) + description_layout.addWidget(description_input, 1) + description_layout.addWidget( + btn_description_save, 0, alignment=QtCore.Qt.AlignRight ) main_layout = QtWidgets.QVBoxLayout(self) main_layout.setContentsMargins(0, 0, 0, 0) main_layout.addWidget(details_label, 0) main_layout.addWidget(details_input, 1) - main_layout.addWidget(artist_note_widget, 1) + main_layout.addWidget(description_widget, 1) - note_input.textChanged.connect(self._on_note_change) - btn_note_save.clicked.connect(self._on_save_click) + description_input.textChanged.connect(self._on_description_change) + btn_description_save.clicked.connect(self._on_save_click) controller.register_event_callback( "selection.workarea.changed", self._on_selection_change ) self._details_input = details_input - self._artist_note_widget = artist_note_widget - self._note_input = note_input - self._btn_note_save = btn_note_save + self._description_widget = description_widget + self._description_input = description_input + self._btn_description_save = btn_description_save self._folder_id = None - self._task_name = None + self._task_id = None self._filepath = None - self._orig_note = "" + self._rootless_path = None + self._orig_description = "" self._controller = controller - self._set_context(None, None, None) + self._set_context(None, None, None, None) def set_published_mode(self, published_mode): """Change published mode. @@ -89,64 +94,69 @@ def set_published_mode(self, published_mode): published_mode (bool): Published mode enabled. """ - self._artist_note_widget.setVisible(not published_mode) + self._description_widget.setVisible(not published_mode) def _on_selection_change(self, event): folder_id = event["folder_id"] - task_name = event["task_name"] + task_id = event["task_id"] filepath = event["path"] + rootless_path = event["rootless_path"] - self._set_context(folder_id, task_name, filepath) + self._set_context(folder_id, task_id, rootless_path, filepath) - def _on_note_change(self): - text = self._note_input.toPlainText() - self._btn_note_save.setEnabled(self._orig_note != text) + def _on_description_change(self): + text = self._description_input.toPlainText() + self._btn_description_save.setEnabled(self._orig_description != text) def _on_save_click(self): - note = self._note_input.toPlainText() + description = self._description_input.toPlainText() self._controller.save_workfile_info( - self._folder_id, - self._task_name, - self._filepath, - note + self._task_id, + self._rootless_path, + description=description, ) - self._orig_note = note - self._btn_note_save.setEnabled(False) + self._orig_description = description + self._btn_description_save.setEnabled(False) - def _set_context(self, folder_id, task_name, filepath): + def _set_context(self, folder_id, task_id, rootless_path, filepath): workfile_info = None # Check if folder, task and file are selected - if bool(folder_id) and bool(task_name) and bool(filepath): + if folder_id and task_id and rootless_path: workfile_info = self._controller.get_workfile_info( - folder_id, task_name, filepath + folder_id, task_id, rootless_path ) enabled = workfile_info is not None self._details_input.setEnabled(enabled) - self._note_input.setEnabled(enabled) - self._btn_note_save.setEnabled(enabled) + self._description_input.setEnabled(enabled) + self._btn_description_save.setEnabled(enabled) self._folder_id = folder_id - self._task_name = task_name + self._task_id = task_id self._filepath = filepath + self._rootless_path = rootless_path # Disable inputs and remove texts if any required arguments are # missing if not enabled: - self._orig_note = "" + self._orig_description = "" self._details_input.setPlainText("") - self._note_input.setPlainText("") + self._description_input.setPlainText("") return - note = workfile_info.note - size_value = file_size_to_string(workfile_info.filesize) + description = workfile_info.description + size_value = file_size_to_string(workfile_info.file_size) # Append html string datetime_format = "%b %d %Y %H:%M:%S" - creation_time = datetime.datetime.fromtimestamp( - workfile_info.creation_time) - modification_time = datetime.datetime.fromtimestamp( - workfile_info.modification_time) + file_created = workfile_info.file_created + modification_time = workfile_info.file_modified + if file_created: + file_created = datetime.datetime.fromtimestamp(file_created) + + if modification_time: + modification_time = datetime.datetime.fromtimestamp( + modification_time) user_items_by_name = self._controller.get_user_items_by_name() @@ -156,33 +166,38 @@ def convert_username(username): return user_item.full_name return username - created_lines = [ - creation_time.strftime(datetime_format) - ] + created_lines = [] if workfile_info.created_by: - created_lines.insert( - 0, convert_username(workfile_info.created_by) + created_lines.append( + convert_username(workfile_info.created_by) ) + if file_created: + created_lines.append(file_created.strftime(datetime_format)) - modified_lines = [ - modification_time.strftime(datetime_format) - ] + if created_lines: + created_lines.insert(0, "Created:") + + modified_lines = [] if workfile_info.updated_by: - modified_lines.insert( - 0, convert_username(workfile_info.updated_by) + modified_lines.append( + convert_username(workfile_info.updated_by) + ) + if modification_time: + modified_lines.append( + modification_time.strftime(datetime_format) ) + if modified_lines: + modified_lines.insert(0, "Modified:") lines = ( "Size:", size_value, - "Created:", "
".join(created_lines), - "Modified:", "
".join(modified_lines), ) - self._orig_note = note - self._note_input.setPlainText(note) + self._orig_description = description + self._description_input.setPlainText(description) # Set as empty string self._details_input.setPlainText("") - self._details_input.appendHtml("
".join(lines)) + self._details_input.appendHtml("
".join(lines))