diff --git a/dandiapi/api/garbage.py b/dandiapi/api/garbage.py
deleted file mode 100644
index 9ca9f48b7..000000000
--- a/dandiapi/api/garbage.py
+++ /dev/null
@@ -1,25 +0,0 @@
-from __future__ import annotations
-
-from datetime import timedelta
-from typing import TYPE_CHECKING
-
-from django.db.models import Exists, OuterRef
-from django.utils import timezone
-
-from dandiapi.api.models import Asset, Version
-
-if TYPE_CHECKING:
-    from django.db.models.query import QuerySet
-
-# How long after the last modification things are eligible for deletion
-STALE_TIME_INTERVAL = timedelta(days=7)
-
-
-def stale_assets() -> QuerySet[Asset]:
-    deadline = timezone.now() - STALE_TIME_INTERVAL
-    return (
-        Asset.objects.annotate(has_version=Exists(Version.objects.filter(assets=OuterRef('id'))))
-        .filter(has_version=False)
-        .filter(published=False)
-        .filter(modified__lt=deadline)
-    )
diff --git a/dandiapi/api/management/commands/collect_garbage.py b/dandiapi/api/management/commands/collect_garbage.py
index e36e23c67..3148d62cf 100644
--- a/dandiapi/api/management/commands/collect_garbage.py
+++ b/dandiapi/api/management/commands/collect_garbage.py
@@ -3,14 +3,35 @@
 from django.db.models import Sum
 import djclick as click
 
-from dandiapi.api.garbage import stale_assets
 from dandiapi.api.services import garbage_collection
 
+# This prepares the types of exceptions we may encounter if an `Upload` exists in the database
+# but does not have a corresponding S3 blob. When using django-minio-storage, this results in
+# a `minio.error.S3Error` being raised, but if the user is using the S3 backend from
+# django-storages, it will raise a `FileNotFoundError` instead.
+upload_missing_blob_exceptions: tuple[type[Exception], ...] = (FileNotFoundError,)
+try:
+    from minio.error import S3Error
+
+    upload_missing_blob_exceptions += (S3Error,)
+except ModuleNotFoundError:
+    pass
+
 
 def echo_report():
-    garbage_collectable_assets = stale_assets()
+    garbage_collectable_assets = garbage_collection.asset.get_queryset()
     assets_count = garbage_collectable_assets.count()
 
+    # Django doesn't support combining .distinct() and .aggregate() in a single query,
+    # so we need to manually sum the sizes of the distinct blobs.
+    assets_size_in_bytes = sum(
+        size
+        for size in garbage_collectable_assets.order_by('blob')
+        .distinct('blob')
+        .values_list('blob__size', flat=True)
+        .iterator()
+    )
+
     garbage_collectable_asset_blobs = garbage_collection.asset_blob.get_queryset()
     asset_blobs_count = garbage_collectable_asset_blobs.count()
     asset_blobs_size_in_bytes = garbage_collectable_asset_blobs.aggregate(Sum('size'))['size__sum']
@@ -18,12 +39,22 @@ def echo_report():
     garbage_collectable_uploads = garbage_collection.upload.get_queryset()
     uploads_count = garbage_collectable_uploads.count()
 
-    click.echo(f'Assets: {assets_count}')
+    # Verification of upload size only happens if the upload is explicitly validated by the
+    # client. It's reasonable to assume that many garbage-collectable uploads will not have
+    # their size verified, so we cannot rely on the database here and must call out
+    # to the storage backend to get the size of each upload.
+    uploads_size_in_bytes = 0
+    for upload in garbage_collectable_uploads.iterator():
+        try:
+            uploads_size_in_bytes += upload.blob.size
+        except upload_missing_blob_exceptions:
+            click.echo(f'Upload {upload.pk} has no blob, skipping size calculation.', err=True)
+
+    click.echo(f'Assets: {assets_count} ({assets_size_in_bytes / (1024 ** 3):.2f} GB)')
     click.echo(
-        f'AssetBlobs: {asset_blobs_count} ({asset_blobs_size_in_bytes} bytes / '
-        f'{asset_blobs_size_in_bytes / (1024 ** 3):.2f} GB)'
+        f'AssetBlobs: {asset_blobs_count} ({asset_blobs_size_in_bytes / (1024 ** 3):.2f} GB)'
     )
-    click.echo(f'Uploads: {uploads_count}')
+    click.echo(f'Uploads: {uploads_count} ({uploads_size_in_bytes / (1024 ** 3):.2f} GB)')
     click.echo('S3 Blobs: Coming soon')
 
 
@@ -46,9 +77,7 @@ def collect_garbage(*, assets: bool, assetblobs: bool, uploads: bool, s3blobs: b
     if s3blobs and click.confirm('This will delete all S3 Blobs. Are you sure?'):
         raise click.NoSuchOption('Deleting S3 Blobs is not yet implemented')
     if assets and click.confirm('This will delete all Assets. Are you sure?'):
-        assets_to_delete = stale_assets()
-        if click.confirm(f'This will delete {assets_to_delete.count()} assets. Are you sure?'):
-            assets_to_delete.delete()
+        garbage_collection.asset.garbage_collect()
 
     # Log how many things there are, either after deletion
     # or if the user forgot to specify anything to delete
diff --git a/dandiapi/api/services/garbage_collection/__init__.py b/dandiapi/api/services/garbage_collection/__init__.py
index ab99cd019..2414acde1 100644
--- a/dandiapi/api/services/garbage_collection/__init__.py
+++ b/dandiapi/api/services/garbage_collection/__init__.py
@@ -7,7 +7,7 @@
 from django.utils import timezone
 
 from dandiapi.api.models import GarbageCollectionEvent
-from dandiapi.api.services.garbage_collection import asset_blob, upload
+from dandiapi.api.services.garbage_collection import asset, asset_blob, upload
 from dandiapi.api.storage import DandiMultipartMixin
 
 logger = get_task_logger(__name__)
@@ -29,6 +29,7 @@ def garbage_collect():
     with transaction.atomic():
         garbage_collected_uploads = upload.garbage_collect()
         garbage_collected_asset_blobs = asset_blob.garbage_collect()
+        garbage_collected_assets = asset.garbage_collect()
 
         GarbageCollectionEvent.objects.filter(
             timestamp__lt=timezone.now() - RESTORATION_WINDOW
@@ -36,3 +37,4 @@
 
     logger.info('Garbage collected %s Uploads.', garbage_collected_uploads)
     logger.info('Garbage collected %s AssetBlobs.', garbage_collected_asset_blobs)
+    logger.info('Garbage collected %s Assets.', garbage_collected_assets)
diff --git a/dandiapi/api/services/garbage_collection/asset.py b/dandiapi/api/services/garbage_collection/asset.py
new file mode 100644
index 000000000..cc0ebf949
--- /dev/null
+++ b/dandiapi/api/services/garbage_collection/asset.py
@@ -0,0 +1,61 @@
+from __future__ import annotations
+
+from datetime import timedelta
+import json
+from typing import TYPE_CHECKING
+
+from celery.utils.log import get_task_logger
+from django.core import serializers
+from django.db import transaction
+from django.utils import timezone
+from more_itertools import chunked
+
+from dandiapi.api.models import (
+    Asset,
+    GarbageCollectionEvent,
+    GarbageCollectionEventRecord,
+)
+
+if TYPE_CHECKING:
+    from django.db.models import QuerySet
+
+logger = get_task_logger(__name__)
+
+ASSET_EXPIRATION_TIME = timedelta(days=30)
+
+
+def get_queryset() -> QuerySet[Asset]:
+    """Get the queryset of Assets that are eligible for garbage collection."""
+    return Asset.objects.filter(
+        versions__isnull=True,
+        published=False,
+        blob__isnull=False,  # only delete assets with blobs; zarrs are not supported yet
+        modified__lt=timezone.now() - ASSET_EXPIRATION_TIME,
+    )
+
+
+def garbage_collect() -> int:
+    from . import GARBAGE_COLLECTION_EVENT_CHUNK_SIZE
+
+    qs = get_queryset()
+
+    if not qs.exists():
+        return 0
+
+    deleted_records = 0
+
+    with transaction.atomic():
+        event = GarbageCollectionEvent.objects.create(type=Asset.__name__)
+        for assets_chunk in chunked(qs.iterator(), GARBAGE_COLLECTION_EVENT_CHUNK_SIZE):
+            GarbageCollectionEventRecord.objects.bulk_create(
+                GarbageCollectionEventRecord(
+                    event=event, record=json.loads(serializers.serialize('json', [a]))[0]
+                )
+                for a in assets_chunk
+            )
+
+            deleted_records += Asset.objects.filter(
+                pk__in=[a.pk for a in assets_chunk],
+            ).delete()[0]
+
+    return deleted_records
diff --git a/dandiapi/api/tests/test_garbage.py b/dandiapi/api/tests/test_garbage.py
deleted file mode 100644
index 853c7b46d..000000000
--- a/dandiapi/api/tests/test_garbage.py
+++ /dev/null
@@ -1,40 +0,0 @@
-from __future__ import annotations
-
-from datetime import timedelta
-
-from django.utils import timezone
-import pytest
-
-from dandiapi.api.garbage import stale_assets
-from dandiapi.api.models import Asset, Version
-
-
-@pytest.mark.django_db
-def test_stale_assets(version: Version, draft_asset_factory, published_asset_factory):
-    stale_date = timezone.now() - timedelta(days=8)
-
-    for is_stale in (False, True):
-        for is_orphaned in (False, True):
-            for is_draft in (False, True):
-                asset = draft_asset_factory() if is_draft else published_asset_factory()
-                if is_stale:
-                    asset.modified = stale_date
-                    asset.update_modified = False
-                    asset.save()
-                if not is_orphaned:
-                    version.assets.add(asset)
-    # The last asset should be stale, orphaned, and draft.
-    asset_to_delete = asset
-
-    # Only the last asset should be returned by stale_assets()
-    assert stale_assets().get() == asset_to_delete
-
-    # This is how assets will generally be deleted
-    stale_assets().delete()
-
-    # The stale asset should be gone
-    assert stale_assets().count() == 0
-    assert not Asset.objects.filter(id=asset_to_delete.id).exists()
-
-    # The 7 other assets should still be present
-    assert Asset.objects.count() == 7
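
Note (not part of the diff): the new `asset.garbage_collect()` follows the same event-sourcing pattern as the existing `upload` and `asset_blob` collectors. Each deleted `Asset` is first serialized into a `GarbageCollectionEventRecord`, and events are only pruned after `RESTORATION_WINDOW` elapses. Below is a minimal sketch of how one of those records could be turned back into an `Asset` row during that window, using only the Django serialization APIs already used in this diff; the `restore_asset` helper is hypothetical and not part of this PR:

    import json

    from django.core import serializers

    from dandiapi.api.models import GarbageCollectionEventRecord


    def restore_asset(record: GarbageCollectionEventRecord) -> None:
        # Hypothetical sketch, not part of this PR. `record.record` holds the single
        # dict produced by json.loads(serializers.serialize('json', [asset]))[0],
        # so re-wrap it in a list before handing it to Django's JSON deserializer.
        deserialized = next(serializers.deserialize('json', json.dumps([record.record])))
        # save() re-inserts the Asset row with its original pk; this assumes the
        # referenced AssetBlob row still exists (blobs have their own collector).
        deserialized.save()

This round trip is also why `garbage_collect()` writes the event records and performs the deletes inside a single `transaction.atomic()` block: an event is never persisted without its corresponding deletions, and no asset is deleted without a record from which it could be restored.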