From a828994e177839c80cf544982e42712de5ebdf3f Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Fri, 20 Dec 2024 11:31:34 +0100 Subject: [PATCH 01/10] update comment --- .../src/simcore_service_webserver/projects/_projects_db.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/web/server/src/simcore_service_webserver/projects/_projects_db.py b/services/web/server/src/simcore_service_webserver/projects/_projects_db.py index 3c94e9e7cdc..c36ccf26ccf 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_projects_db.py +++ b/services/web/server/src/simcore_service_webserver/projects/_projects_db.py @@ -16,7 +16,7 @@ # NOTE: MD: I intentionally didn't include the workbench. There is a special interface # for the workbench, and at some point, this column should be removed from the table. -# The same holds true for access_rights/ui/classifiers/quality, but we have decided to proceed step by step. +# The same holds true for ui/classifiers/quality, but we have decided to proceed step by step. _SELECTION_PROJECT_DB_ARGS = [ # noqa: RUF012 projects.c.id, projects.c.type, From 639a561db605f66509661509a77f9c6fa9cb5ff0 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Mon, 6 Jan 2025 13:22:38 +0100 Subject: [PATCH 02/10] add ref_count column --- .../a80132007436_add_file_ref_count.py | 32 +++++++++++++++++++ .../models/file_meta_data.py | 7 ++++ 2 files changed, 39 insertions(+) create mode 100644 packages/postgres-database/src/simcore_postgres_database/migration/versions/a80132007436_add_file_ref_count.py diff --git a/packages/postgres-database/src/simcore_postgres_database/migration/versions/a80132007436_add_file_ref_count.py b/packages/postgres-database/src/simcore_postgres_database/migration/versions/a80132007436_add_file_ref_count.py new file mode 100644 index 00000000000..63da84cc6f5 --- /dev/null +++ b/packages/postgres-database/src/simcore_postgres_database/migration/versions/a80132007436_add_file_ref_count.py @@ -0,0 +1,32 @@ +"""Add file ref_count + +Revision ID: a80132007436 +Revises: 1e3c9c804fec +Create Date: 2025-01-06 12:17:49.935022+00:00 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "a80132007436" +down_revision = "1e3c9c804fec" +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.add_column( + "file_meta_data", + sa.Column( + "ref_count", sa.Integer(), server_default=sa.text("0"), nullable=False + ), + ) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column("file_meta_data", "ref_count") + # ### end Alembic commands ### diff --git a/packages/postgres-database/src/simcore_postgres_database/models/file_meta_data.py b/packages/postgres-database/src/simcore_postgres_database/models/file_meta_data.py index 9ece039863f..738cc19b9cd 100644 --- a/packages/postgres-database/src/simcore_postgres_database/models/file_meta_data.py +++ b/packages/postgres-database/src/simcore_postgres_database/models/file_meta_data.py @@ -56,4 +56,11 @@ doc="SHA256 checksum of the file content", index=True, ), + sa.Column( + "ref_count", + sa.Integer(), + nullable=False, + server_default=sa.text("0"), + doc="Number of references to the file", + ), ) From 84782af4201e3bf2b440e89378ad34c7195bbd27 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Mon, 6 Jan 2025 13:23:50 +0100 Subject: [PATCH 03/10] add ref_count field --- services/storage/src/simcore_service_storage/models.py | 1 + 1 file changed, 1 insertion(+) diff --git a/services/storage/src/simcore_service_storage/models.py b/services/storage/src/simcore_service_storage/models.py index 672694b4fc7..eb98de1c013 100644 --- a/services/storage/src/simcore_service_storage/models.py +++ b/services/storage/src/simcore_service_storage/models.py @@ -73,6 +73,7 @@ class FileMetaDataAtDB(BaseModel): upload_expires_at: datetime.datetime | None = None is_directory: bool sha256_checksum: SHA256Str | None = None + ref_count: int = 0 model_config = ConfigDict(from_attributes=True, extra="forbid") From d206013842330446580d653035bc7b2969a8a167 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Wed, 22 Jan 2025 09:53:33 +0100 Subject: [PATCH 04/10] upgrade script --- ...f_count.py => 5cb5fbf6340d_add_file_ref_count.py} | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) rename packages/postgres-database/src/simcore_postgres_database/migration/versions/{a80132007436_add_file_ref_count.py => 5cb5fbf6340d_add_file_ref_count.py} (77%) diff --git a/packages/postgres-database/src/simcore_postgres_database/migration/versions/a80132007436_add_file_ref_count.py b/packages/postgres-database/src/simcore_postgres_database/migration/versions/5cb5fbf6340d_add_file_ref_count.py similarity index 77% rename from packages/postgres-database/src/simcore_postgres_database/migration/versions/a80132007436_add_file_ref_count.py rename to packages/postgres-database/src/simcore_postgres_database/migration/versions/5cb5fbf6340d_add_file_ref_count.py index 63da84cc6f5..4aa23d91d36 100644 --- a/packages/postgres-database/src/simcore_postgres_database/migration/versions/a80132007436_add_file_ref_count.py +++ b/packages/postgres-database/src/simcore_postgres_database/migration/versions/5cb5fbf6340d_add_file_ref_count.py @@ -1,16 +1,16 @@ -"""Add file ref_count +"""add file ref_count -Revision ID: a80132007436 -Revises: 1e3c9c804fec -Create Date: 2025-01-06 12:17:49.935022+00:00 +Revision ID: 5cb5fbf6340d +Revises: ecd4eadaa781 +Create Date: 2025-01-22 08:52:59.196359+00:00 """ import sqlalchemy as sa from alembic import op # revision identifiers, used by Alembic. -revision = "a80132007436" -down_revision = "1e3c9c804fec" +revision = "5cb5fbf6340d" +down_revision = "ecd4eadaa781" branch_labels = None depends_on = None From efb57abee86a75e9249bab3ca5cb63cf5a5c17e1 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Wed, 22 Jan 2025 16:41:19 +0100 Subject: [PATCH 05/10] add rabbitmq client setup --- services/docker-compose.yml | 5 ++++ .../simcore_service_storage/application.py | 2 ++ .../src/simcore_service_storage/rabbitmq.py | 28 +++++++++++++++++++ .../src/simcore_service_storage/settings.py | 5 ++++ 4 files changed, 40 insertions(+) create mode 100644 services/storage/src/simcore_service_storage/rabbitmq.py diff --git a/services/docker-compose.yml b/services/docker-compose.yml index 0f589739ad3..a25749be35d 100644 --- a/services/docker-compose.yml +++ b/services/docker-compose.yml @@ -1127,6 +1127,11 @@ services: POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} POSTGRES_PORT: ${POSTGRES_PORT} POSTGRES_USER: ${POSTGRES_USER} + RABBIT_SECURE: ${RABBIT_SECURE} + RABBIT_HOST: ${RABBIT_HOST} + RABBIT_PORT: ${RABBIT_PORT} + RABBIT_USER: ${RABBIT_USER} + RABBIT_PASSWORD: ${RABBIT_PASSWORD} REDIS_HOST: ${REDIS_HOST} REDIS_PORT: ${REDIS_PORT} REDIS_SECURE: ${REDIS_SECURE} diff --git a/services/storage/src/simcore_service_storage/application.py b/services/storage/src/simcore_service_storage/application.py index 16aa8f837eb..4df263b93ac 100644 --- a/services/storage/src/simcore_service_storage/application.py +++ b/services/storage/src/simcore_service_storage/application.py @@ -20,6 +20,7 @@ from .dsm import setup_dsm from .dsm_cleaner import setup_dsm_cleaner from .long_running_tasks import setup_rest_api_long_running_tasks +from .rabbitmq import setup_rabbitmq from .redis import setup_redis from .rest import setup_rest from .s3 import setup_s3 @@ -58,6 +59,7 @@ def create(settings: Settings) -> web.Application: service_name=APP_NAME, ) + setup_rabbitmq(app) setup_db(app) setup_s3(app) diff --git a/services/storage/src/simcore_service_storage/rabbitmq.py b/services/storage/src/simcore_service_storage/rabbitmq.py new file mode 100644 index 00000000000..68d6373c289 --- /dev/null +++ b/services/storage/src/simcore_service_storage/rabbitmq.py @@ -0,0 +1,28 @@ +from aiohttp import web +from servicelib.rabbitmq._client import RabbitMQClient +from servicelib.rabbitmq._utils import wait_till_rabbitmq_responsive + +from .constants import APP_CONFIG_KEY +from .settings import Settings + +_APP_RABBITMQ_CLIENT_KEY = "APP_RABBITMQ_CLIENT_KEY" + + +async def _rabbitmq_client(app: web.Application): + app[_APP_RABBITMQ_CLIENT_KEY] = None + settings: Settings = app[APP_CONFIG_KEY] + assert settings.STORAGE_RABBITMQ # nosec + rabbitmq_settings = settings.STORAGE_RABBITMQ + + await wait_till_rabbitmq_responsive(f"{rabbitmq_settings.dsn}") + + app[_APP_RABBITMQ_CLIENT_KEY] = RabbitMQClient("storage", rabbitmq_settings) + + yield + + await app[_APP_RABBITMQ_CLIENT_KEY].close() + + +def setup_rabbitmq(app: web.Application): + if _rabbitmq_client not in app.cleanup_ctx: + app.cleanup_ctx.append(_rabbitmq_client) diff --git a/services/storage/src/simcore_service_storage/settings.py b/services/storage/src/simcore_service_storage/settings.py index 75d25311fcd..f5262e99a96 100644 --- a/services/storage/src/simcore_service_storage/settings.py +++ b/services/storage/src/simcore_service_storage/settings.py @@ -12,6 +12,7 @@ from settings_library.base import BaseCustomSettings from settings_library.basic_types import LogLevel, PortInt from settings_library.postgres import PostgresSettings +from settings_library.rabbit import RabbitSettings from settings_library.redis import RedisSettings from settings_library.s3 import S3Settings from settings_library.tracing import TracingSettings @@ -46,6 +47,10 @@ class Settings(BaseCustomSettings, MixinLoggingSettings): None, description="Pennsieve API secret ONLY for testing purposes" ) + STORAGE_RABBITMQ: RabbitSettings = Field( + json_schema_extra={"auto_default_from_env": True} + ) + STORAGE_POSTGRES: PostgresSettings = Field( json_schema_extra={"auto_default_from_env": True} ) From 2bf9eb634fb6058caa914c9e0c16a279a10f2d8a Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Fri, 24 Jan 2025 12:07:53 +0100 Subject: [PATCH 06/10] add event when deleting file --- .../src/models_library/rabbitmq_messages.py | 10 ++++++++++ .../src/simcore_service_storage/rabbitmq.py | 6 ++++++ .../simcore_service_storage/simcore_s3_dsm.py | 8 ++++++++ .../_rabbitmq_nonexclusive_queue_consumers.py | 17 ++++++++++++++++- 4 files changed, 40 insertions(+), 1 deletion(-) diff --git a/packages/models-library/src/models_library/rabbitmq_messages.py b/packages/models-library/src/models_library/rabbitmq_messages.py index 0cd6bd0874b..79d45ce44d9 100644 --- a/packages/models-library/src/models_library/rabbitmq_messages.py +++ b/packages/models-library/src/models_library/rabbitmq_messages.py @@ -316,3 +316,13 @@ class WalletCreditsLimitReachedMessage(RabbitMessageBase): def routing_key(self) -> str | None: return f"{self.wallet_id}.{self.credits_limit}" + + +class FileDeletedMessage(RabbitMessageBase): + channel_name: Literal[ + "io.simcore.service.storage.file-deleted" + ] = "io.simcore.service.storage.file-deleted" + file_id: str + + def routing_key(self) -> str | None: + return None diff --git a/services/storage/src/simcore_service_storage/rabbitmq.py b/services/storage/src/simcore_service_storage/rabbitmq.py index 68d6373c289..247581f169e 100644 --- a/services/storage/src/simcore_service_storage/rabbitmq.py +++ b/services/storage/src/simcore_service_storage/rabbitmq.py @@ -1,3 +1,5 @@ +from typing import cast + from aiohttp import web from servicelib.rabbitmq._client import RabbitMQClient from servicelib.rabbitmq._utils import wait_till_rabbitmq_responsive @@ -26,3 +28,7 @@ async def _rabbitmq_client(app: web.Application): def setup_rabbitmq(app: web.Application): if _rabbitmq_client not in app.cleanup_ctx: app.cleanup_ctx.append(_rabbitmq_client) + + +def get_rabbitmq_client(app: web.Application) -> RabbitMQClient: + return cast(RabbitMQClient, app[_APP_RABBITMQ_CLIENT_KEY]) diff --git a/services/storage/src/simcore_service_storage/simcore_s3_dsm.py b/services/storage/src/simcore_service_storage/simcore_s3_dsm.py index d41630b5230..acccaa12258 100644 --- a/services/storage/src/simcore_service_storage/simcore_s3_dsm.py +++ b/services/storage/src/simcore_service_storage/simcore_s3_dsm.py @@ -35,6 +35,7 @@ SimcoreS3FileID, StorageFileID, ) +from models_library.rabbitmq_messages import FileDeletedMessage from models_library.users import UserID from pydantic import AnyUrl, ByteSize, NonNegativeInt, TypeAdapter from servicelib.aiohttp.client_session import get_client_session @@ -77,6 +78,7 @@ UploadLinks, UserOrProjectFilter, ) +from .rabbitmq import get_rabbitmq_client from .s3 import get_s3_client from .s3_utils import S3TransferDataCB, update_task_progress from .settings import Settings @@ -537,6 +539,12 @@ async def delete_file( _logger.warning("File %s not found in S3", file_id) # we still need to clean up the database entry (it exists) # and to invalidate the size of the parent directory + else: + rabbit_client = get_rabbitmq_client(self.app) + msg = FileDeletedMessage( + file_id=file_id, + ) + await rabbit_client.publish(msg.channel_name, msg) async with self.engine.acquire() as conn: await db_file_meta_data.delete(conn, [file_id]) diff --git a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_nonexclusive_queue_consumers.py b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_nonexclusive_queue_consumers.py index 5d4b5578a5d..6942bb62f27 100644 --- a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_nonexclusive_queue_consumers.py +++ b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_nonexclusive_queue_consumers.py @@ -3,7 +3,10 @@ from typing import Final from aiohttp import web -from models_library.rabbitmq_messages import InstrumentationRabbitMessage +from models_library.rabbitmq_messages import ( + FileDeletedMessage, + InstrumentationRabbitMessage, +) from servicelib.aiohttp.monitor_services import ( MONITOR_SERVICE_STARTED_LABELS, MONITOR_SERVICE_STOPPED_LABELS, @@ -43,12 +46,24 @@ async def _instrumentation_message_parser(app: web.Application, data: bytes) -> return True +async def _file_deleted_message_parser(app: web.Application, data: bytes) -> bool: + rabbit_message = FileDeletedMessage.model_validate_json(data) + _logger.error("File %s deleted", rabbit_message.file_id) + + return True + + _EXCHANGE_TO_PARSER_CONFIG: Final[tuple[SubcribeArgumentsTuple, ...,]] = ( SubcribeArgumentsTuple( InstrumentationRabbitMessage.get_channel_name(), _instrumentation_message_parser, {"exclusive_queue": False}, ), + SubcribeArgumentsTuple( + FileDeletedMessage.get_channel_name(), + _file_deleted_message_parser, + {"exclusive_queue": False}, + ), ) From 31e13272954911491879545a79c76820b5e273cc Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Tue, 28 Jan 2025 09:33:09 +0100 Subject: [PATCH 07/10] remove ref_count --- .../5cb5fbf6340d_add_file_ref_count.py | 32 ------------------- .../models/file_meta_data.py | 7 ---- 2 files changed, 39 deletions(-) delete mode 100644 packages/postgres-database/src/simcore_postgres_database/migration/versions/5cb5fbf6340d_add_file_ref_count.py diff --git a/packages/postgres-database/src/simcore_postgres_database/migration/versions/5cb5fbf6340d_add_file_ref_count.py b/packages/postgres-database/src/simcore_postgres_database/migration/versions/5cb5fbf6340d_add_file_ref_count.py deleted file mode 100644 index 4aa23d91d36..00000000000 --- a/packages/postgres-database/src/simcore_postgres_database/migration/versions/5cb5fbf6340d_add_file_ref_count.py +++ /dev/null @@ -1,32 +0,0 @@ -"""add file ref_count - -Revision ID: 5cb5fbf6340d -Revises: ecd4eadaa781 -Create Date: 2025-01-22 08:52:59.196359+00:00 - -""" -import sqlalchemy as sa -from alembic import op - -# revision identifiers, used by Alembic. -revision = "5cb5fbf6340d" -down_revision = "ecd4eadaa781" -branch_labels = None -depends_on = None - - -def upgrade(): - # ### commands auto generated by Alembic - please adjust! ### - op.add_column( - "file_meta_data", - sa.Column( - "ref_count", sa.Integer(), server_default=sa.text("0"), nullable=False - ), - ) - # ### end Alembic commands ### - - -def downgrade(): - # ### commands auto generated by Alembic - please adjust! ### - op.drop_column("file_meta_data", "ref_count") - # ### end Alembic commands ### diff --git a/packages/postgres-database/src/simcore_postgres_database/models/file_meta_data.py b/packages/postgres-database/src/simcore_postgres_database/models/file_meta_data.py index 738cc19b9cd..9ece039863f 100644 --- a/packages/postgres-database/src/simcore_postgres_database/models/file_meta_data.py +++ b/packages/postgres-database/src/simcore_postgres_database/models/file_meta_data.py @@ -56,11 +56,4 @@ doc="SHA256 checksum of the file content", index=True, ), - sa.Column( - "ref_count", - sa.Integer(), - nullable=False, - server_default=sa.text("0"), - doc="Number of references to the file", - ), ) From 1356f11447a1c4e5cf6c5d3423bd4e2417d72f9b Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Tue, 28 Jan 2025 09:33:41 +0100 Subject: [PATCH 08/10] remove ref_count --- services/storage/src/simcore_service_storage/models.py | 1 - 1 file changed, 1 deletion(-) diff --git a/services/storage/src/simcore_service_storage/models.py b/services/storage/src/simcore_service_storage/models.py index eb98de1c013..672694b4fc7 100644 --- a/services/storage/src/simcore_service_storage/models.py +++ b/services/storage/src/simcore_service_storage/models.py @@ -73,7 +73,6 @@ class FileMetaDataAtDB(BaseModel): upload_expires_at: datetime.datetime | None = None is_directory: bool sha256_checksum: SHA256Str | None = None - ref_count: int = 0 model_config = ConfigDict(from_attributes=True, extra="forbid") From ec4379aba4b367190e5b3930610ee3c98b1fe31e Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Tue, 28 Jan 2025 09:56:22 +0100 Subject: [PATCH 09/10] rename --- .../projects/_projects_nodes_repository.py | 14 +++++--------- .../projects/projects_service.py | 6 +++--- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/projects/_projects_nodes_repository.py b/services/web/server/src/simcore_service_webserver/projects/_projects_nodes_repository.py index cd6880732e0..369f7789190 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_projects_nodes_repository.py +++ b/services/web/server/src/simcore_service_webserver/projects/_projects_nodes_repository.py @@ -1,7 +1,6 @@ import logging import sqlalchemy as sa - from aiohttp import web from models_library.projects import ProjectID from models_library.projects_nodes import Node, PartialNode @@ -10,8 +9,8 @@ from simcore_postgres_database.webserver_models import projects_nodes from sqlalchemy.ext.asyncio import AsyncConnection -from .exceptions import NodeNotFoundError from ..db.plugin import get_asyncpg_engine +from .exceptions import NodeNotFoundError _logger = logging.getLogger(__name__) @@ -36,7 +35,7 @@ ] -async def get( +async def get_node( app: web.Application, connection: AsyncConnection | None = None, *, @@ -44,9 +43,7 @@ async def get( node_id: NodeID, ) -> Node: async with transaction_context(get_asyncpg_engine(app), connection) as conn: - get_stmt = sa.select( - *_SELECTION_PROJECTS_NODES_DB_ARGS - ).where( + get_stmt = sa.select(*_SELECTION_PROJECTS_NODES_DB_ARGS).where( (projects_nodes.c.project_uuid == f"{project_id}") & (projects_nodes.c.node_id == f"{node_id}") ) @@ -57,14 +54,13 @@ async def get( row = await result.first() if row is None: raise NodeNotFoundError( - project_uuid=f"{project_id}", - node_uuid=f"{node_id}" + project_uuid=f"{project_id}", node_uuid=f"{node_id}" ) assert row # nosec return Node.model_validate(row, from_attributes=True) -async def update( +async def update_node( app: web.Application, connection: AsyncConnection | None = None, *, diff --git a/services/web/server/src/simcore_service_webserver/projects/projects_service.py b/services/web/server/src/simcore_service_webserver/projects/projects_service.py index e34c784da73..508f153c4ac 100644 --- a/services/web/server/src/simcore_service_webserver/projects/projects_service.py +++ b/services/web/server/src/simcore_service_webserver/projects/projects_service.py @@ -985,7 +985,7 @@ async def update_project_node_state( new_node_data={"state": {"currentStatus": new_state}}, ) - await _projects_nodes_repository.update( + await _projects_nodes_repository.update_node( app, project_id=project_id, node_id=node_id, @@ -1057,7 +1057,7 @@ async def patch_project_node( new_node_data=_node_patch_exclude_unset, ) - await _projects_nodes_repository.update( + await _projects_nodes_repository.update_node( app, project_id=project_id, node_id=node_id, @@ -1125,7 +1125,7 @@ async def update_project_node_outputs( new_node_data={"outputs": new_outputs, "runHash": new_run_hash}, ) - await _projects_nodes_repository.update( + await _projects_nodes_repository.update_node( app, project_id=project_id, node_id=node_id, From 04ceb61f849ed1764f96fd673c66660866a8461a Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Tue, 28 Jan 2025 15:42:44 +0100 Subject: [PATCH 10/10] add nodes outputs update --- .../_rabbitmq_nonexclusive_queue_consumers.py | 3 ++ .../projects/_projects_nodes_repository.py | 39 ++++++++++++++++--- .../projects/projects_service.py | 6 +++ 3 files changed, 43 insertions(+), 5 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_nonexclusive_queue_consumers.py b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_nonexclusive_queue_consumers.py index 6942bb62f27..c577d8fce02 100644 --- a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_nonexclusive_queue_consumers.py +++ b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_nonexclusive_queue_consumers.py @@ -17,6 +17,7 @@ from servicelib.rabbitmq import RabbitMQClient from servicelib.utils import logged_gather +from ..projects import projects_service from ..rabbitmq import get_rabbitmq_client from ._rabbitmq_consumers_common import SubcribeArgumentsTuple, subscribe_to_rabbitmq @@ -50,6 +51,8 @@ async def _file_deleted_message_parser(app: web.Application, data: bytes) -> boo rabbit_message = FileDeletedMessage.model_validate_json(data) _logger.error("File %s deleted", rabbit_message.file_id) + projects_service.on_file_deleted(app, rabbit_message.file_id) + return True diff --git a/services/web/server/src/simcore_service_webserver/projects/_projects_nodes_repository.py b/services/web/server/src/simcore_service_webserver/projects/_projects_nodes_repository.py index 369f7789190..9626ff79d06 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_projects_nodes_repository.py +++ b/services/web/server/src/simcore_service_webserver/projects/_projects_nodes_repository.py @@ -43,12 +43,12 @@ async def get_node( node_id: NodeID, ) -> Node: async with transaction_context(get_asyncpg_engine(app), connection) as conn: - get_stmt = sa.select(*_SELECTION_PROJECTS_NODES_DB_ARGS).where( - (projects_nodes.c.project_uuid == f"{project_id}") - & (projects_nodes.c.node_id == f"{node_id}") + result = await conn.stream( + sa.select(*_SELECTION_PROJECTS_NODES_DB_ARGS).where( + (projects_nodes.c.project_uuid == f"{project_id}") + & (projects_nodes.c.node_id == f"{node_id}") + ) ) - - result = await conn.stream(get_stmt) assert result # nosec row = await result.first() @@ -60,6 +60,35 @@ async def get_node( return Node.model_validate(row, from_attributes=True) +async def delete_nodes_outputs_with_file_id( + app: web.Application, + connection: AsyncConnection | None = None, + *, + file_id: str, +) -> None: + async with transaction_context(get_asyncpg_engine(app), connection) as conn: + await conn.execute( + sa.update(projects_nodes) + .values( + # Remove keys where the value contains the matching path + outputs=sa.func.jsonb_strip_nulls( + sa.func.jsonb_object_agg( + sa.func.jsonb_each(projects_nodes.c.outputs).key, + sa.func.nullif( + sa.func.jsonb_each(projects_nodes.c.outputs).value, + sa.func.jsonb_build_object("path", file_id), + ), + ) + ) + ) + .where( + sa.func.jsonb_each(projects_nodes.c.outputs).value.contains( + {"path": file_id} + ) + ) + ) + + async def update_node( app: web.Application, connection: AsyncConnection | None = None, diff --git a/services/web/server/src/simcore_service_webserver/projects/projects_service.py b/services/web/server/src/simcore_service_webserver/projects/projects_service.py index 508f153c4ac..dac48bb85ef 100644 --- a/services/web/server/src/simcore_service_webserver/projects/projects_service.py +++ b/services/web/server/src/simcore_service_webserver/projects/projects_service.py @@ -1919,3 +1919,9 @@ async def get_project_inactivity( project_settings.PROJECTS_INACTIVITY_INTERVAL.total_seconds() ), ) + + +async def on_file_deleted(app: web.Application, file_id: str): + await _projects_nodes_repository.delete_nodes_outputs_with_file_id( + app, file_id=file_id + )