WIP: 🐛 Clean removal of output files #6986

Draft: giancarloromeo wants to merge 20 commits into master from is6963/clean-output-files-removal

Commits (20)
a828994
update comment
giancarloromeo Dec 20, 2024
abc4ce0
Merge remote-tracking branch 'upstream/master' into is6963/clean-outp…
giancarloromeo Dec 20, 2024
7b914eb
Merge branch 'master' into is6963/clean-output-files-removal
giancarloromeo Jan 6, 2025
abe60b8
Merge remote-tracking branch 'upstream/master' into is6963/clean-outp…
giancarloromeo Jan 6, 2025
37acb71
Merge branch 'is6963/clean-output-files-removal' of github.com:gianca…
giancarloromeo Jan 6, 2025
639a561
add ref_count column
giancarloromeo Jan 6, 2025
84782af
add ref_count field
giancarloromeo Jan 6, 2025
55d17a8
Merge branch 'master' into is6963/clean-output-files-removal
giancarloromeo Jan 21, 2025
08f436c
Merge remote-tracking branch 'upstream/master' into is6963/clean-outp…
giancarloromeo Jan 22, 2025
34d38c8
Merge branch 'master' into is6963/clean-output-files-removal
giancarloromeo Jan 22, 2025
42e1d33
Merge branch 'is6963/clean-output-files-removal' of github.com:gianca…
giancarloromeo Jan 22, 2025
d206013
upgrade script
giancarloromeo Jan 22, 2025
ee0f739
Merge remote-tracking branch 'upstream/master' into is6963/clean-outp…
giancarloromeo Jan 22, 2025
efb57ab
add rabbitmq client setup
giancarloromeo Jan 22, 2025
2bf9eb6
add event when deleting file
giancarloromeo Jan 24, 2025
a229d0a
Merge remote-tracking branch 'upstream/master' into is6963/clean-outp…
giancarloromeo Jan 27, 2025
31e1327
remove ref_count
giancarloromeo Jan 28, 2025
1356f11
remove ref_count
giancarloromeo Jan 28, 2025
ec4379a
rename
giancarloromeo Jan 28, 2025
04ceb61
add nodes outputs update
giancarloromeo Jan 28, 2025

Files changed

10 changes: 10 additions & 0 deletions packages/models-library/src/models_library/rabbitmq_messages.py
@@ -316,3 +316,13 @@ class WalletCreditsLimitReachedMessage(RabbitMessageBase):

def routing_key(self) -> str | None:
return f"{self.wallet_id}.{self.credits_limit}"


class FileDeletedMessage(RabbitMessageBase):
channel_name: Literal[
"io.simcore.service.storage.file-deleted"
] = "io.simcore.service.storage.file-deleted"
file_id: str

def routing_key(self) -> str | None:
return None
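
For orientation, here is a minimal round-trip sketch of the new message type. It is not part of the diff: the file_id value is a made-up example, and only Pydantic calls already exercised elsewhere in this PR are used.

from models_library.rabbitmq_messages import FileDeletedMessage

# publisher side (storage) builds the message
msg = FileDeletedMessage(file_id="api/3fa85f64-5717-4562-b3fc-2c963f66afa6/output.txt")

# serialize as it would travel over RabbitMQ, then parse it back on the consumer side (webserver)
raw: bytes = msg.model_dump_json().encode()
received = FileDeletedMessage.model_validate_json(raw)
assert received.file_id == msg.file_id
assert received.channel_name == "io.simcore.service.storage.file-deleted"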
5 changes: 5 additions & 0 deletions services/docker-compose.yml
@@ -1129,6 +1129,11 @@ services:
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
POSTGRES_PORT: ${POSTGRES_PORT}
POSTGRES_USER: ${POSTGRES_USER}
RABBIT_SECURE: ${RABBIT_SECURE}
RABBIT_HOST: ${RABBIT_HOST}
RABBIT_PORT: ${RABBIT_PORT}
RABBIT_USER: ${RABBIT_USER}
RABBIT_PASSWORD: ${RABBIT_PASSWORD}
REDIS_HOST: ${REDIS_HOST}
REDIS_PORT: ${REDIS_PORT}
REDIS_SECURE: ${REDIS_SECURE}
2 changes: 2 additions & 0 deletions services/storage/src/simcore_service_storage/application.py
@@ -20,6 +20,7 @@
from .dsm import setup_dsm
from .dsm_cleaner import setup_dsm_cleaner
from .long_running_tasks import setup_rest_api_long_running_tasks
from .rabbitmq import setup_rabbitmq
from .redis import setup_redis
from .rest import setup_rest
from .s3 import setup_s3
@@ -58,6 +59,7 @@ def create(settings: Settings) -> web.Application:
service_name=APP_NAME,
)

setup_rabbitmq(app)
setup_db(app)
setup_s3(app)

34 changes: 34 additions & 0 deletions services/storage/src/simcore_service_storage/rabbitmq.py
@@ -0,0 +1,34 @@
from typing import cast

from aiohttp import web
from servicelib.rabbitmq._client import RabbitMQClient
from servicelib.rabbitmq._utils import wait_till_rabbitmq_responsive

from .constants import APP_CONFIG_KEY
from .settings import Settings

_APP_RABBITMQ_CLIENT_KEY = "APP_RABBITMQ_CLIENT_KEY"


async def _rabbitmq_client(app: web.Application):
app[_APP_RABBITMQ_CLIENT_KEY] = None
settings: Settings = app[APP_CONFIG_KEY]
assert settings.STORAGE_RABBITMQ # nosec
rabbitmq_settings = settings.STORAGE_RABBITMQ

await wait_till_rabbitmq_responsive(f"{rabbitmq_settings.dsn}")

app[_APP_RABBITMQ_CLIENT_KEY] = RabbitMQClient("storage", rabbitmq_settings)

yield

await app[_APP_RABBITMQ_CLIENT_KEY].close()


def setup_rabbitmq(app: web.Application):
if _rabbitmq_client not in app.cleanup_ctx:
app.cleanup_ctx.append(_rabbitmq_client)


def get_rabbitmq_client(app: web.Application) -> RabbitMQClient:
return cast(RabbitMQClient, app[_APP_RABBITMQ_CLIENT_KEY])
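
A usage sketch for this new setup module (not part of the diff; the helper name _notify_file_deleted is hypothetical): once setup_rabbitmq(app) has run during application startup, any storage code holding the aiohttp app can publish, which is exactly what the delete_file() change further down does.

from aiohttp import web
from models_library.rabbitmq_messages import FileDeletedMessage

from simcore_service_storage.rabbitmq import get_rabbitmq_client


async def _notify_file_deleted(app: web.Application, file_id: str) -> None:
    # publish on the client that setup_rabbitmq() stored in the app
    msg = FileDeletedMessage(file_id=file_id)
    await get_rabbitmq_client(app).publish(msg.channel_name, msg)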
5 changes: 5 additions & 0 deletions services/storage/src/simcore_service_storage/settings.py
@@ -12,6 +12,7 @@
from settings_library.base import BaseCustomSettings
from settings_library.basic_types import LogLevel, PortInt
from settings_library.postgres import PostgresSettings
from settings_library.rabbit import RabbitSettings
from settings_library.redis import RedisSettings
from settings_library.s3 import S3Settings
from settings_library.tracing import TracingSettings
@@ -46,6 +47,10 @@ class Settings(BaseCustomSettings, MixinLoggingSettings):
None, description="Pennsieve API secret ONLY for testing purposes"
)

STORAGE_RABBITMQ: RabbitSettings = Field(
json_schema_extra={"auto_default_from_env": True}
)

STORAGE_POSTGRES: PostgresSettings = Field(
json_schema_extra={"auto_default_from_env": True}
)
Changes in another file
@@ -35,6 +35,7 @@
SimcoreS3FileID,
StorageFileID,
)
from models_library.rabbitmq_messages import FileDeletedMessage
from models_library.users import UserID
from pydantic import AnyUrl, ByteSize, NonNegativeInt, TypeAdapter
from servicelib.aiohttp.client_session import get_client_session
@@ -77,6 +78,7 @@
UploadLinks,
UserOrProjectFilter,
)
from .rabbitmq import get_rabbitmq_client
from .s3 import get_s3_client
from .s3_utils import S3TransferDataCB, update_task_progress
from .settings import Settings
@@ -537,6 +539,12 @@ async def delete_file(
_logger.warning("File %s not found in S3", file_id)
# we still need to clean up the database entry (it exists)
# and to invalidate the size of the parent directory
else:
rabbit_client = get_rabbitmq_client(self.app)
msg = FileDeletedMessage(
file_id=file_id,
)
await rabbit_client.publish(msg.channel_name, msg)

async with self.engine.acquire() as conn:
await db_file_meta_data.delete(conn, [file_id])
Changes in another file
@@ -3,7 +3,10 @@
from typing import Final

from aiohttp import web
from models_library.rabbitmq_messages import InstrumentationRabbitMessage
from models_library.rabbitmq_messages import (
FileDeletedMessage,
InstrumentationRabbitMessage,
)
from servicelib.aiohttp.monitor_services import (
MONITOR_SERVICE_STARTED_LABELS,
MONITOR_SERVICE_STOPPED_LABELS,
@@ -14,6 +17,7 @@
from servicelib.rabbitmq import RabbitMQClient
from servicelib.utils import logged_gather

from ..projects import projects_service
from ..rabbitmq import get_rabbitmq_client
from ._rabbitmq_consumers_common import SubcribeArgumentsTuple, subscribe_to_rabbitmq

@@ -43,12 +47,26 @@ async def _instrumentation_message_parser(app: web.Application, data: bytes) ->
return True


async def _file_deleted_message_parser(app: web.Application, data: bytes) -> bool:
rabbit_message = FileDeletedMessage.model_validate_json(data)
    _logger.info("File %s deleted", rabbit_message.file_id)

    await projects_service.on_file_deleted(app, rabbit_message.file_id)

return True


_EXCHANGE_TO_PARSER_CONFIG: Final[tuple[SubcribeArgumentsTuple, ...,]] = (
SubcribeArgumentsTuple(
InstrumentationRabbitMessage.get_channel_name(),
_instrumentation_message_parser,
{"exclusive_queue": False},
),
SubcribeArgumentsTuple(
FileDeletedMessage.get_channel_name(),
_file_deleted_message_parser,
{"exclusive_queue": False},
),
)


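One detail worth spelling out for reviewers (an observation, not code from the PR): publisher and consumer stay on the same exchange because both derive its name from the message class itself. A quick sketch, assuming get_channel_name() resolves to the channel_name literal declared on the class, as it does for the existing InstrumentationRabbitMessage wiring:

from models_library.rabbitmq_messages import FileDeletedMessage

# the consumer subscribes with the class-level name ...
print(FileDeletedMessage.get_channel_name())  # expected: io.simcore.service.storage.file-deleted
# ... while the publisher passes the instance attribute, which carries the same literal
print(FileDeletedMessage(file_id="any-id").channel_name)
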
Changes in another file
@@ -16,7 +16,7 @@

# NOTE: MD: I intentionally didn't include the workbench. There is a special interface
# for the workbench, and at some point, this column should be removed from the table.
# The same holds true for access_rights/ui/classifiers/quality, but we have decided to proceed step by step.
# The same holds true for ui/classifiers/quality, but we have decided to proceed step by step.
_SELECTION_PROJECT_DB_ARGS = [ # noqa: RUF012
projects.c.id,
projects.c.type,
Changes in another file
@@ -1,7 +1,6 @@
import logging

import sqlalchemy as sa

from aiohttp import web
from models_library.projects import ProjectID
from models_library.projects_nodes import Node, PartialNode
@@ -10,8 +9,8 @@
from simcore_postgres_database.webserver_models import projects_nodes
from sqlalchemy.ext.asyncio import AsyncConnection

from .exceptions import NodeNotFoundError
from ..db.plugin import get_asyncpg_engine
from .exceptions import NodeNotFoundError

_logger = logging.getLogger(__name__)

@@ -36,35 +35,61 @@
]


async def get(
async def get_node(
app: web.Application,
connection: AsyncConnection | None = None,
*,
project_id: ProjectID,
node_id: NodeID,
) -> Node:
async with transaction_context(get_asyncpg_engine(app), connection) as conn:
get_stmt = sa.select(
*_SELECTION_PROJECTS_NODES_DB_ARGS
).where(
(projects_nodes.c.project_uuid == f"{project_id}")
& (projects_nodes.c.node_id == f"{node_id}")
result = await conn.stream(
sa.select(*_SELECTION_PROJECTS_NODES_DB_ARGS).where(
(projects_nodes.c.project_uuid == f"{project_id}")
& (projects_nodes.c.node_id == f"{node_id}")
)
)

result = await conn.stream(get_stmt)
assert result # nosec

row = await result.first()
if row is None:
raise NodeNotFoundError(
project_uuid=f"{project_id}",
node_uuid=f"{node_id}"
project_uuid=f"{project_id}", node_uuid=f"{node_id}"
)
assert row # nosec
return Node.model_validate(row, from_attributes=True)


async def update(
async def delete_nodes_outputs_with_file_id(
app: web.Application,
connection: AsyncConnection | None = None,
*,
file_id: str,
) -> None:
async with transaction_context(get_asyncpg_engine(app), connection) as conn:
await conn.execute(
sa.update(projects_nodes)
.values(
# Remove keys where the value contains the matching path
outputs=sa.func.jsonb_strip_nulls(
sa.func.jsonb_object_agg(
sa.func.jsonb_each(projects_nodes.c.outputs).key,
sa.func.nullif(
sa.func.jsonb_each(projects_nodes.c.outputs).value,
sa.func.jsonb_build_object("path", file_id),
),
)
)
)
.where(
sa.func.jsonb_each(projects_nodes.c.outputs).value.contains(
{"path": file_id}
)
)
)


async def update_node(
app: web.Application,
connection: AsyncConnection | None = None,
*,
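The jsonb expression in delete_nodes_outputs_with_file_id is the trickiest part of this PR. As a reference for review, here is one possible raw-SQL formulation of what it appears to aim for (an assumption about intent, not the PR's code): for every node whose outputs contain an entry pointing at the deleted file, rebuild the outputs object keeping only the remaining entries.

import sqlalchemy as sa

# assumed equivalent of the jsonb_object_agg/jsonb_each construction above;
# COALESCE keeps `outputs` a valid (possibly empty) JSON object when everything is removed
_STRIP_OUTPUTS_SQL = sa.text(
    """
    UPDATE projects_nodes
    SET outputs = COALESCE(
        (
            SELECT jsonb_object_agg(each.key, each.value)
            FROM jsonb_each(projects_nodes.outputs) AS each(key, value)
            WHERE NOT each.value @> jsonb_build_object('path', CAST(:file_id AS text))
        ),
        '{}'::jsonb
    )
    WHERE EXISTS (
        SELECT 1
        FROM jsonb_each(projects_nodes.outputs) AS each(key, value)
        WHERE each.value @> jsonb_build_object('path', CAST(:file_id AS text))
    )
    """
)

# usage sketch inside the same transaction_context block:
#     await conn.execute(_STRIP_OUTPUTS_SQL, {"file_id": file_id})
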
Changes in another file
@@ -985,7 +985,7 @@ async def update_project_node_state(
new_node_data={"state": {"currentStatus": new_state}},
)

await _projects_nodes_repository.update(
await _projects_nodes_repository.update_node(
app,
project_id=project_id,
node_id=node_id,
@@ -1057,7 +1057,7 @@ async def patch_project_node(
new_node_data=_node_patch_exclude_unset,
)

await _projects_nodes_repository.update(
await _projects_nodes_repository.update_node(
app,
project_id=project_id,
node_id=node_id,
@@ -1125,7 +1125,7 @@ async def update_project_node_outputs(
new_node_data={"outputs": new_outputs, "runHash": new_run_hash},
)

await _projects_nodes_repository.update(
await _projects_nodes_repository.update_node(
app,
project_id=project_id,
node_id=node_id,
@@ -1919,3 +1919,9 @@ async def get_project_inactivity(
project_settings.PROJECTS_INACTIVITY_INTERVAL.total_seconds()
),
)


async def on_file_deleted(app: web.Application, file_id: str):
await _projects_nodes_repository.delete_nodes_outputs_with_file_id(
app, file_id=file_id
)