From 078e9e8e06dd5501515d7e9f3fc4b11022d961b4 Mon Sep 17 00:00:00 2001 From: Mike VanDenburgh Date: Mon, 21 Apr 2025 17:29:28 -0400 Subject: [PATCH 1/6] Remove old asset GC code --- dandiapi/api/garbage.py | 25 ------------------- dandiapi/api/tests/test_garbage.py | 40 ------------------------------ 2 files changed, 65 deletions(-) delete mode 100644 dandiapi/api/garbage.py delete mode 100644 dandiapi/api/tests/test_garbage.py diff --git a/dandiapi/api/garbage.py b/dandiapi/api/garbage.py deleted file mode 100644 index 9ca9f48b7..000000000 --- a/dandiapi/api/garbage.py +++ /dev/null @@ -1,25 +0,0 @@ -from __future__ import annotations - -from datetime import timedelta -from typing import TYPE_CHECKING - -from django.db.models import Exists, OuterRef -from django.utils import timezone - -from dandiapi.api.models import Asset, Version - -if TYPE_CHECKING: - from django.db.models.query import QuerySet - -# How long after the last modification things are eligible for deletion -STALE_TIME_INTERVAL = timedelta(days=7) - - -def stale_assets() -> QuerySet[Asset]: - deadline = timezone.now() - STALE_TIME_INTERVAL - return ( - Asset.objects.annotate(has_version=Exists(Version.objects.filter(assets=OuterRef('id')))) - .filter(has_version=False) - .filter(published=False) - .filter(modified__lt=deadline) - ) diff --git a/dandiapi/api/tests/test_garbage.py b/dandiapi/api/tests/test_garbage.py deleted file mode 100644 index 853c7b46d..000000000 --- a/dandiapi/api/tests/test_garbage.py +++ /dev/null @@ -1,40 +0,0 @@ -from __future__ import annotations - -from datetime import timedelta - -from django.utils import timezone -import pytest - -from dandiapi.api.garbage import stale_assets -from dandiapi.api.models import Asset, Version - - -@pytest.mark.django_db -def test_stale_assets(version: Version, draft_asset_factory, published_asset_factory): - stale_date = timezone.now() - timedelta(days=8) - - for is_stale in (False, True): - for is_orphaned in (False, True): - for is_draft 
in (False, True): - asset = draft_asset_factory() if is_draft else published_asset_factory() - if is_stale: - asset.modified = stale_date - asset.update_modified = False - asset.save() - if not is_orphaned: - version.assets.add(asset) - # The last asset should be stale, orphaned, and draft. - asset_to_delete = asset - - # Only the last asset should be returned by stale_assets() - assert stale_assets().get() == asset_to_delete - - # This is how assets will generally be deleted - stale_assets().delete() - - # The stale asset should be gone - assert stale_assets().count() == 0 - assert not Asset.objects.filter(id=asset_to_delete.id).exists() - - # The 7 other assets should still be present - assert Asset.objects.count() == 7 From bbaf57e4323be928a4f9e03e16c2af4c97f4a4c7 Mon Sep 17 00:00:00 2001 From: Mike VanDenburgh Date: Mon, 21 Apr 2025 16:39:52 -0400 Subject: [PATCH 2/6] Implement Asset garbage collection --- .../management/commands/collect_garbage.py | 23 ++++--- .../services/garbage_collection/__init__.py | 4 +- .../api/services/garbage_collection/asset.py | 69 +++++++++++++++++++ 3 files changed, 87 insertions(+), 9 deletions(-) create mode 100644 dandiapi/api/services/garbage_collection/asset.py diff --git a/dandiapi/api/management/commands/collect_garbage.py b/dandiapi/api/management/commands/collect_garbage.py index e36e23c67..3d6512829 100644 --- a/dandiapi/api/management/commands/collect_garbage.py +++ b/dandiapi/api/management/commands/collect_garbage.py @@ -3,14 +3,24 @@ from django.db.models import Sum import djclick as click -from dandiapi.api.garbage import stale_assets from dandiapi.api.services import garbage_collection def echo_report(): - garbage_collectable_assets = stale_assets() + garbage_collectable_assets = garbage_collection.asset.get_queryset() assets_count = garbage_collectable_assets.count() + # Django doesn't support combining .distinct() and .aggregate() in a single query, + # so we need to manually sum the sizes of the distinct blobs. 
+ assets_size_in_bytes = 0 + for asset in ( + garbage_collectable_assets.select_related('blob') + .order_by('blob') + .distinct('blob') + .iterator() + ): + assets_size_in_bytes += asset.blob.size + garbage_collectable_asset_blobs = garbage_collection.asset_blob.get_queryset() asset_blobs_count = garbage_collectable_asset_blobs.count() asset_blobs_size_in_bytes = garbage_collectable_asset_blobs.aggregate(Sum('size'))['size__sum'] @@ -18,10 +28,9 @@ def echo_report(): garbage_collectable_uploads = garbage_collection.upload.get_queryset() uploads_count = garbage_collectable_uploads.count() - click.echo(f'Assets: {assets_count}') + click.echo(f'Assets: {assets_count} ({assets_size_in_bytes / (1024 ** 3):.2f} GB)') click.echo( - f'AssetBlobs: {asset_blobs_count} ({asset_blobs_size_in_bytes} bytes / ' - f'{asset_blobs_size_in_bytes / (1024 ** 3):.2f} GB)' + f'AssetBlobs: {asset_blobs_count} ({asset_blobs_size_in_bytes / (1024 ** 3):.2f} GB)' ) click.echo(f'Uploads: {uploads_count}') click.echo('S3 Blobs: Coming soon') @@ -46,9 +55,7 @@ def collect_garbage(*, assets: bool, assetblobs: bool, uploads: bool, s3blobs: b if s3blobs and click.confirm('This will delete all S3 Blobs. Are you sure?'): raise click.NoSuchOption('Deleting S3 Blobs is not yet implemented') if assets and click.confirm('This will delete all Assets. Are you sure?'): - assets_to_delete = stale_assets() - if click.confirm(f'This will delete {assets_to_delete.count()} assets. 
Are you sure?'): - assets_to_delete.delete() + garbage_collection.asset.garbage_collect() # Log how many things there are, either after deletion # or if the user forgot to specify anything to delete diff --git a/dandiapi/api/services/garbage_collection/__init__.py b/dandiapi/api/services/garbage_collection/__init__.py index ab99cd019..2414acde1 100644 --- a/dandiapi/api/services/garbage_collection/__init__.py +++ b/dandiapi/api/services/garbage_collection/__init__.py @@ -7,7 +7,7 @@ from django.utils import timezone from dandiapi.api.models import GarbageCollectionEvent -from dandiapi.api.services.garbage_collection import asset_blob, upload +from dandiapi.api.services.garbage_collection import asset, asset_blob, upload from dandiapi.api.storage import DandiMultipartMixin logger = get_task_logger(__name__) @@ -29,6 +29,7 @@ def garbage_collect(): with transaction.atomic(): garbage_collected_uploads = upload.garbage_collect() garbage_collected_asset_blobs = asset_blob.garbage_collect() + garbage_collected_assets = asset.garbage_collect() GarbageCollectionEvent.objects.filter( timestamp__lt=timezone.now() - RESTORATION_WINDOW @@ -36,3 +37,4 @@ def garbage_collect(): logger.info('Garbage collected %s Uploads.', garbage_collected_uploads) logger.info('Garbage collected %s AssetBlobs.', garbage_collected_asset_blobs) + logger.info('Garbage collected %s Assets.', garbage_collected_assets) diff --git a/dandiapi/api/services/garbage_collection/asset.py b/dandiapi/api/services/garbage_collection/asset.py new file mode 100644 index 000000000..d84fd2621 --- /dev/null +++ b/dandiapi/api/services/garbage_collection/asset.py @@ -0,0 +1,69 @@ +from __future__ import annotations + +from datetime import timedelta +import json +from typing import TYPE_CHECKING + +from celery.utils.log import get_task_logger +from django.core import serializers +from django.db import transaction +from django.db.models import Exists, OuterRef +from django.utils import timezone +from more_itertools 
import chunked + +from dandiapi.api.models import ( + Asset, + GarbageCollectionEvent, + GarbageCollectionEventRecord, + Version, +) + +if TYPE_CHECKING: + from django.db.models import QuerySet + +logger = get_task_logger(__name__) + +ASSET_EXPIRATION_TIME = timedelta(days=30) + + +def get_queryset() -> QuerySet[Asset]: + """Get the queryset of Assets that are eligible for garbage collection.""" + return Asset.objects.alias( + has_version=Exists( + Version.objects.filter( + assets=OuterRef('id'), + ), + ) + ).filter( + has_version=False, + published=False, + blob__isnull=False, # only delete assets with blobs; zarrs are not supported yet + modified__lt=timezone.now() - ASSET_EXPIRATION_TIME, + ) + + +def garbage_collect() -> int: + from . import GARBAGE_COLLECTION_EVENT_CHUNK_SIZE + + qs = get_queryset() + + if not qs.exists(): + return 0 + + deleted_records = 0 + + with transaction.atomic(): + event = GarbageCollectionEvent.objects.create(type=Asset.__name__) + for assets_chunk in chunked(qs.iterator(), GARBAGE_COLLECTION_EVENT_CHUNK_SIZE): + GarbageCollectionEventRecord.objects.bulk_create( + GarbageCollectionEventRecord( + event=event, record=json.loads(serializers.serialize('json', [a]))[0] + ) + for a in assets_chunk + ) + + deleted_records += Asset.objects.filter( + pk__in=[a.pk for a in assets_chunk], + ).delete()[0] + + return deleted_records From fb8ae472daca3be7491001626e1a19bed2f08ddf Mon Sep 17 00:00:00 2001 From: Mike VanDenburgh Date: Thu, 26 Jun 2025 12:20:03 -0400 Subject: [PATCH 3/6] Fix upload size calculation --- .../management/commands/collect_garbage.py | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/dandiapi/api/management/commands/collect_garbage.py b/dandiapi/api/management/commands/collect_garbage.py index 3d6512829..9929894c9 100644 --- a/dandiapi/api/management/commands/collect_garbage.py +++ b/dandiapi/api/management/commands/collect_garbage.py @@ -12,14 +12,13 @@ def echo_report(): # Django doesn't 
support combining .distinct() and .aggregate() in a single query, # so we need to manually sum the sizes of the distinct blobs. - assets_size_in_bytes = 0 - for asset in ( - garbage_collectable_assets.select_related('blob') - .order_by('blob') + assets_size_in_bytes = sum( + size + for size in garbage_collectable_assets.order_by('blob') .distinct('blob') + .values_list('blob__size', flat=True) .iterator() - ): - assets_size_in_bytes += asset.blob.size + ) garbage_collectable_asset_blobs = garbage_collection.asset_blob.get_queryset() asset_blobs_count = garbage_collectable_asset_blobs.count() @@ -28,11 +27,19 @@ def echo_report(): garbage_collectable_uploads = garbage_collection.upload.get_queryset() uploads_count = garbage_collectable_uploads.count() + # Verification of upload size only happens if the upload is explicitly validated by the + # client. It's reasonable to assume that many garbage-collectable uploads will not have + # their size verified, so we cannot rely on the database here and must call out + # to the storage backend to get the size of each upload. 
+ uploads_size_in_bytes = 0 + for upload in garbage_collectable_uploads.iterator(): + uploads_size_in_bytes += upload.blob.size + click.echo(f'Assets: {assets_count} ({assets_size_in_bytes / (1024 ** 3):.2f} GB)') click.echo( f'AssetBlobs: {asset_blobs_count} ({asset_blobs_size_in_bytes / (1024 ** 3):.2f} GB)' ) - click.echo(f'Uploads: {uploads_count}') + click.echo(f'Uploads: {uploads_count} ({uploads_size_in_bytes / (1024 ** 3):.2f} GB)') click.echo('S3 Blobs: Coming soon') From 1ecdce5825e62d9932bf7c1993dc7dcb2940e6d6 Mon Sep 17 00:00:00 2001 From: Mike VanDenburgh <37340715+mvandenburgh@users.noreply.github.com> Date: Thu, 26 Jun 2025 12:22:04 -0400 Subject: [PATCH 4/6] Simplify ORM query Co-authored-by: Jacob Nesbitt --- dandiapi/api/services/garbage_collection/asset.py | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/dandiapi/api/services/garbage_collection/asset.py b/dandiapi/api/services/garbage_collection/asset.py index d84fd2621..cc0ebf949 100644 --- a/dandiapi/api/services/garbage_collection/asset.py +++ b/dandiapi/api/services/garbage_collection/asset.py @@ -7,7 +7,6 @@ from celery.utils.log import get_task_logger from django.core import serializers from django.db import transaction -from django.db.models import Exists, OuterRef from django.utils import timezone from more_itertools import chunked @@ -15,7 +14,6 @@ Asset, GarbageCollectionEvent, GarbageCollectionEventRecord, - Version, ) if TYPE_CHECKING: @@ -28,14 +26,8 @@ def get_queryset() -> QuerySet[Asset]: """Get the queryset of Assets that are eligible for garbage collection.""" - return Asset.objects.alias( - has_version=Exists( - Version.objects.filter( - assets=OuterRef('id'), - ), - ) - ).filter( - has_version=False, + return Asset.objects.filter( + versions__isnull=True, published=False, blob__isnull=False, # only delete assets with blobs; zarrs are not supported yet modified__lt=timezone.now() - ASSET_EXPIRATION_TIME, From 
ce83bc3903c3a670dd682ae22bb7b04cedfdd276 Mon Sep 17 00:00:00 2001 From: Mike VanDenburgh Date: Fri, 27 Jun 2025 12:13:49 -0400 Subject: [PATCH 5/6] Handle uploads that are missing a blob --- .../api/management/commands/collect_garbage.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/dandiapi/api/management/commands/collect_garbage.py b/dandiapi/api/management/commands/collect_garbage.py index 9929894c9..0e28069c9 100644 --- a/dandiapi/api/management/commands/collect_garbage.py +++ b/dandiapi/api/management/commands/collect_garbage.py @@ -5,6 +5,18 @@ from dandiapi.api.services import garbage_collection +# This prepares the types of exceptions we may encounter if an `Upload` exists in the database +# but does not have a corresponding S3 blob. When using django-minio-storage, this results in +# a `minio.error.S3Error` being raised, but if the user is using the S3 backend from +# django-storages, it will raise a `FileNotFoundError` instead. +upload_missing_blob_exceptions = (FileNotFoundError,) +try: + from minio.error import S3Error + + upload_missing_blob_exceptions += (S3Error,) +except ModuleNotFoundError: + pass + def echo_report(): garbage_collectable_assets = garbage_collection.asset.get_queryset() @@ -33,7 +45,10 @@ def echo_report(): # to the storage backend to get the size of each upload. 
    uploads_size_in_bytes = 0
     for upload in garbage_collectable_uploads.iterator():
-        uploads_size_in_bytes += upload.blob.size
+        try:
+            uploads_size_in_bytes += upload.blob.size
+        except upload_missing_blob_exceptions:
+            click.echo(f'Upload {upload.pk} has no blob, skipping size calculation.', err=True)
 
     click.echo(f'Assets: {assets_count} ({assets_size_in_bytes / (1024 ** 3):.2f} GB)')
     click.echo(

From b5c798cc569efe32020ab9051169bb7fad7a4c6e Mon Sep 17 00:00:00 2001
From: Mike VanDenburgh
Date: Tue, 1 Jul 2025 11:34:28 -0400
Subject: [PATCH 6/6] Fix type

---
 dandiapi/api/management/commands/collect_garbage.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dandiapi/api/management/commands/collect_garbage.py b/dandiapi/api/management/commands/collect_garbage.py
index 0e28069c9..3148d62cf 100644
--- a/dandiapi/api/management/commands/collect_garbage.py
+++ b/dandiapi/api/management/commands/collect_garbage.py
@@ -9,7 +9,7 @@
 # but does not have a corresponding S3 blob. When using django-minio-storage, this results in
 # a `minio.error.S3Error` being raised, but if the user is using the S3 backend from
 # django-storages, it will raise a `FileNotFoundError` instead.
-upload_missing_blob_exceptions = (FileNotFoundError,)
+upload_missing_blob_exceptions: tuple[type[Exception], ...] = (FileNotFoundError,)
 try:
     from minio.error import S3Error