diff --git a/packages/models-library/src/models_library/rabbitmq_messages.py b/packages/models-library/src/models_library/rabbitmq_messages.py index 0cd6bd0874b..79d45ce44d9 100644 --- a/packages/models-library/src/models_library/rabbitmq_messages.py +++ b/packages/models-library/src/models_library/rabbitmq_messages.py @@ -316,3 +316,13 @@ class WalletCreditsLimitReachedMessage(RabbitMessageBase): def routing_key(self) -> str | None: return f"{self.wallet_id}.{self.credits_limit}" + + +class FileDeletedMessage(RabbitMessageBase): + channel_name: Literal[ + "io.simcore.service.storage.file-deleted" + ] = "io.simcore.service.storage.file-deleted" + file_id: str + + def routing_key(self) -> str | None: + return None diff --git a/services/docker-compose.yml b/services/docker-compose.yml index 1fae14e81d3..0b47c3febd9 100644 --- a/services/docker-compose.yml +++ b/services/docker-compose.yml @@ -1129,6 +1129,11 @@ services: POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} POSTGRES_PORT: ${POSTGRES_PORT} POSTGRES_USER: ${POSTGRES_USER} + RABBIT_SECURE: ${RABBIT_SECURE} + RABBIT_HOST: ${RABBIT_HOST} + RABBIT_PORT: ${RABBIT_PORT} + RABBIT_USER: ${RABBIT_USER} + RABBIT_PASSWORD: ${RABBIT_PASSWORD} REDIS_HOST: ${REDIS_HOST} REDIS_PORT: ${REDIS_PORT} REDIS_SECURE: ${REDIS_SECURE} diff --git a/services/storage/src/simcore_service_storage/application.py b/services/storage/src/simcore_service_storage/application.py index 16aa8f837eb..4df263b93ac 100644 --- a/services/storage/src/simcore_service_storage/application.py +++ b/services/storage/src/simcore_service_storage/application.py @@ -20,6 +20,7 @@ from .dsm import setup_dsm from .dsm_cleaner import setup_dsm_cleaner from .long_running_tasks import setup_rest_api_long_running_tasks +from .rabbitmq import setup_rabbitmq from .redis import setup_redis from .rest import setup_rest from .s3 import setup_s3 @@ -58,6 +59,7 @@ def create(settings: Settings) -> web.Application: service_name=APP_NAME, ) + setup_rabbitmq(app) setup_db(app) 
setup_s3(app) diff --git a/services/storage/src/simcore_service_storage/rabbitmq.py b/services/storage/src/simcore_service_storage/rabbitmq.py new file mode 100644 index 00000000000..247581f169e --- /dev/null +++ b/services/storage/src/simcore_service_storage/rabbitmq.py @@ -0,0 +1,34 @@ +from typing import cast + +from aiohttp import web +from servicelib.rabbitmq._client import RabbitMQClient +from servicelib.rabbitmq._utils import wait_till_rabbitmq_responsive + +from .constants import APP_CONFIG_KEY +from .settings import Settings + +_APP_RABBITMQ_CLIENT_KEY = "APP_RABBITMQ_CLIENT_KEY" + + +async def _rabbitmq_client(app: web.Application): + app[_APP_RABBITMQ_CLIENT_KEY] = None + settings: Settings = app[APP_CONFIG_KEY] + assert settings.STORAGE_RABBITMQ # nosec + rabbitmq_settings = settings.STORAGE_RABBITMQ + + await wait_till_rabbitmq_responsive(f"{rabbitmq_settings.dsn}") + + app[_APP_RABBITMQ_CLIENT_KEY] = RabbitMQClient("storage", rabbitmq_settings) + + yield + + await app[_APP_RABBITMQ_CLIENT_KEY].close() + + +def setup_rabbitmq(app: web.Application): + if _rabbitmq_client not in app.cleanup_ctx: + app.cleanup_ctx.append(_rabbitmq_client) + + +def get_rabbitmq_client(app: web.Application) -> RabbitMQClient: + return cast(RabbitMQClient, app[_APP_RABBITMQ_CLIENT_KEY]) diff --git a/services/storage/src/simcore_service_storage/settings.py b/services/storage/src/simcore_service_storage/settings.py index 75d25311fcd..f5262e99a96 100644 --- a/services/storage/src/simcore_service_storage/settings.py +++ b/services/storage/src/simcore_service_storage/settings.py @@ -12,6 +12,7 @@ from settings_library.base import BaseCustomSettings from settings_library.basic_types import LogLevel, PortInt from settings_library.postgres import PostgresSettings +from settings_library.rabbit import RabbitSettings from settings_library.redis import RedisSettings from settings_library.s3 import S3Settings from settings_library.tracing import TracingSettings @@ -46,6 +47,10 @@ class 
Settings(BaseCustomSettings, MixinLoggingSettings): None, description="Pennsieve API secret ONLY for testing purposes" ) + STORAGE_RABBITMQ: RabbitSettings = Field( + json_schema_extra={"auto_default_from_env": True} + ) + STORAGE_POSTGRES: PostgresSettings = Field( json_schema_extra={"auto_default_from_env": True} ) diff --git a/services/storage/src/simcore_service_storage/simcore_s3_dsm.py b/services/storage/src/simcore_service_storage/simcore_s3_dsm.py index d41630b5230..acccaa12258 100644 --- a/services/storage/src/simcore_service_storage/simcore_s3_dsm.py +++ b/services/storage/src/simcore_service_storage/simcore_s3_dsm.py @@ -35,6 +35,7 @@ SimcoreS3FileID, StorageFileID, ) +from models_library.rabbitmq_messages import FileDeletedMessage from models_library.users import UserID from pydantic import AnyUrl, ByteSize, NonNegativeInt, TypeAdapter from servicelib.aiohttp.client_session import get_client_session @@ -77,6 +78,7 @@ UploadLinks, UserOrProjectFilter, ) +from .rabbitmq import get_rabbitmq_client from .s3 import get_s3_client from .s3_utils import S3TransferDataCB, update_task_progress from .settings import Settings @@ -537,6 +539,12 @@ async def delete_file( _logger.warning("File %s not found in S3", file_id) # we still need to clean up the database entry (it exists) # and to invalidate the size of the parent directory + else: + rabbit_client = get_rabbitmq_client(self.app) + msg = FileDeletedMessage( + file_id=file_id, + ) + await rabbit_client.publish(msg.channel_name, msg) async with self.engine.acquire() as conn: await db_file_meta_data.delete(conn, [file_id]) diff --git a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_nonexclusive_queue_consumers.py b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_nonexclusive_queue_consumers.py index 5d4b5578a5d..c577d8fce02 100644 --- a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_nonexclusive_queue_consumers.py +++ 
b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_nonexclusive_queue_consumers.py @@ -3,7 +3,10 @@ from typing import Final from aiohttp import web -from models_library.rabbitmq_messages import InstrumentationRabbitMessage +from models_library.rabbitmq_messages import ( + FileDeletedMessage, + InstrumentationRabbitMessage, +) from servicelib.aiohttp.monitor_services import ( MONITOR_SERVICE_STARTED_LABELS, MONITOR_SERVICE_STOPPED_LABELS, @@ -14,6 +17,7 @@ from servicelib.rabbitmq import RabbitMQClient from servicelib.utils import logged_gather +from ..projects import projects_service from ..rabbitmq import get_rabbitmq_client from ._rabbitmq_consumers_common import SubcribeArgumentsTuple, subscribe_to_rabbitmq @@ -43,12 +47,26 @@ return True +async def _file_deleted_message_parser(app: web.Application, data: bytes) -> bool: + rabbit_message = FileDeletedMessage.model_validate_json(data) + _logger.debug("File %s deleted", rabbit_message.file_id) + + await projects_service.on_file_deleted(app, rabbit_message.file_id) + + return True + + _EXCHANGE_TO_PARSER_CONFIG: Final[tuple[SubcribeArgumentsTuple, ...,]] = ( SubcribeArgumentsTuple( InstrumentationRabbitMessage.get_channel_name(), _instrumentation_message_parser, {"exclusive_queue": False}, ), + SubcribeArgumentsTuple( + FileDeletedMessage.get_channel_name(), + _file_deleted_message_parser, + {"exclusive_queue": False}, + ), ) diff --git a/services/web/server/src/simcore_service_webserver/projects/_projects_db.py b/services/web/server/src/simcore_service_webserver/projects/_projects_db.py index 92c73867f77..a38abd2deb3 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_projects_db.py +++ b/services/web/server/src/simcore_service_webserver/projects/_projects_db.py @@ -16,7 +16,7 @@ # NOTE: MD: I intentionally didn't include the workbench.
There is a special interface # for the workbench, and at some point, this column should be removed from the table. -# The same holds true for access_rights/ui/classifiers/quality, but we have decided to proceed step by step. +# The same holds true for ui/classifiers/quality, but we have decided to proceed step by step. _SELECTION_PROJECT_DB_ARGS = [ # noqa: RUF012 projects.c.id, projects.c.type, diff --git a/services/web/server/src/simcore_service_webserver/projects/_projects_nodes_repository.py b/services/web/server/src/simcore_service_webserver/projects/_projects_nodes_repository.py index cd6880732e0..9626ff79d06 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_projects_nodes_repository.py +++ b/services/web/server/src/simcore_service_webserver/projects/_projects_nodes_repository.py @@ -1,7 +1,6 @@ import logging import sqlalchemy as sa - from aiohttp import web from models_library.projects import ProjectID from models_library.projects_nodes import Node, PartialNode @@ -10,8 +9,8 @@ from simcore_postgres_database.webserver_models import projects_nodes from sqlalchemy.ext.asyncio import AsyncConnection -from .exceptions import NodeNotFoundError from ..db.plugin import get_asyncpg_engine +from .exceptions import NodeNotFoundError _logger = logging.getLogger(__name__) @@ -36,7 +35,7 @@ ] -async def get( +async def get_node( app: web.Application, connection: AsyncConnection | None = None, *, @@ -44,27 +43,53 @@ async def get( node_id: NodeID, ) -> Node: async with transaction_context(get_asyncpg_engine(app), connection) as conn: - get_stmt = sa.select( - *_SELECTION_PROJECTS_NODES_DB_ARGS - ).where( - (projects_nodes.c.project_uuid == f"{project_id}") - & (projects_nodes.c.node_id == f"{node_id}") + result = await conn.stream( + sa.select(*_SELECTION_PROJECTS_NODES_DB_ARGS).where( + (projects_nodes.c.project_uuid == f"{project_id}") + & (projects_nodes.c.node_id == f"{node_id}") + ) ) - - result = await conn.stream(get_stmt) assert result # 
nosec row = await result.first() if row is None: raise NodeNotFoundError( - project_uuid=f"{project_id}", - node_uuid=f"{node_id}" + project_uuid=f"{project_id}", node_uuid=f"{node_id}" ) assert row # nosec return Node.model_validate(row, from_attributes=True) -async def update( +async def delete_nodes_outputs_with_file_id( + app: web.Application, + connection: AsyncConnection | None = None, + *, + file_id: str, +) -> None: + async with transaction_context(get_asyncpg_engine(app), connection) as conn: + await conn.execute( + sa.update(projects_nodes) + .values( + # Remove keys where the value contains the matching path + outputs=sa.func.jsonb_strip_nulls( + sa.func.jsonb_object_agg( + sa.func.jsonb_each(projects_nodes.c.outputs).key, + sa.func.nullif( + sa.func.jsonb_each(projects_nodes.c.outputs).value, + sa.func.jsonb_build_object("path", file_id), + ), + ) + ) + ) + .where( + sa.func.jsonb_each(projects_nodes.c.outputs).value.contains( + {"path": file_id} + ) + ) + ) + + +async def update_node( app: web.Application, connection: AsyncConnection | None = None, *, diff --git a/services/web/server/src/simcore_service_webserver/projects/projects_service.py b/services/web/server/src/simcore_service_webserver/projects/projects_service.py index e34c784da73..dac48bb85ef 100644 --- a/services/web/server/src/simcore_service_webserver/projects/projects_service.py +++ b/services/web/server/src/simcore_service_webserver/projects/projects_service.py @@ -985,7 +985,7 @@ async def update_project_node_state( new_node_data={"state": {"currentStatus": new_state}}, ) - await _projects_nodes_repository.update( + await _projects_nodes_repository.update_node( app, project_id=project_id, node_id=node_id, @@ -1057,7 +1057,7 @@ async def patch_project_node( new_node_data=_node_patch_exclude_unset, ) - await _projects_nodes_repository.update( + await _projects_nodes_repository.update_node( app, project_id=project_id, node_id=node_id, @@ -1125,7 +1125,7 @@ async def 
update_project_node_outputs( new_node_data={"outputs": new_outputs, "runHash": new_run_hash}, ) - await _projects_nodes_repository.update( + await _projects_nodes_repository.update_node( app, project_id=project_id, node_id=node_id, @@ -1919,3 +1919,9 @@ async def get_project_inactivity( project_settings.PROJECTS_INACTIVITY_INTERVAL.total_seconds() ), ) + + +async def on_file_deleted(app: web.Application, file_id: str): + await _projects_nodes_repository.delete_nodes_outputs_with_file_id( + app, file_id=file_id + )