25 changes: 0 additions & 25 deletions dandiapi/api/garbage.py

This file was deleted.

47 changes: 38 additions & 9 deletions dandiapi/api/management/commands/collect_garbage.py
@@ -3,27 +3,58 @@
 from django.db.models import Sum
 import djclick as click
 
-from dandiapi.api.garbage import stale_assets
 from dandiapi.api.services import garbage_collection
 
+# This prepares the types of exceptions we may encounter if an `Upload` exists in the database
+# but does not have a corresponding S3 blob. When using django-minio-storage, this results in
+# a `minio.error.S3Error` being raised, but if the user is using the S3 backend from
+# django-storages, it will raise a `FileNotFoundError` instead.
+upload_missing_blob_exceptions: tuple[type[Exception], ...] = (FileNotFoundError,)
+try:
+    from minio.error import S3Error
+
+    upload_missing_blob_exceptions += (S3Error,)
+except ModuleNotFoundError:
+    pass
+
 
 def echo_report():
-    garbage_collectable_assets = stale_assets()
+    garbage_collectable_assets = garbage_collection.asset.get_queryset()
     assets_count = garbage_collectable_assets.count()
 
+    # Django doesn't support combining .distinct() and .aggregate() in a single query,
+    # so we need to manually sum the sizes of the distinct blobs.
+    assets_size_in_bytes = sum(
+        size
+        for size in garbage_collectable_assets.order_by('blob')
+        .distinct('blob')
+        .values_list('blob__size', flat=True)
+        .iterator()
+    )
+
     garbage_collectable_asset_blobs = garbage_collection.asset_blob.get_queryset()
     asset_blobs_count = garbage_collectable_asset_blobs.count()
     asset_blobs_size_in_bytes = garbage_collectable_asset_blobs.aggregate(Sum('size'))['size__sum']
 
     garbage_collectable_uploads = garbage_collection.upload.get_queryset()
     uploads_count = garbage_collectable_uploads.count()
 
-    click.echo(f'Assets: {assets_count}')
+    # Verification of upload size only happens if the upload is explicitly validated by the
+    # client. It's reasonable to assume that many garbage-collectable uploads will not have
+    # their size verified, so we cannot rely on the database here and must call out
+    # to the storage backend to get the size of each upload.
+    uploads_size_in_bytes = 0
+    for upload in garbage_collectable_uploads.iterator():
+        try:
+            uploads_size_in_bytes += upload.blob.size
+        except upload_missing_blob_exceptions:
+            click.echo(f'Upload {upload.pk} has no blob, skipping size calculation.', err=True)
+
+    click.echo(f'Assets: {assets_count} ({assets_size_in_bytes / (1024 ** 3):.2f} GB)')
     click.echo(
-        f'AssetBlobs: {asset_blobs_count} ({asset_blobs_size_in_bytes} bytes / '
-        f'{asset_blobs_size_in_bytes / (1024 ** 3):.2f} GB)'
+        f'AssetBlobs: {asset_blobs_count} ({asset_blobs_size_in_bytes / (1024 ** 3):.2f} GB)'
     )
-    click.echo(f'Uploads: {uploads_count}')
+    click.echo(f'Uploads: {uploads_count} ({uploads_size_in_bytes / (1024 ** 3):.2f} GB)')
     click.echo('S3 Blobs: Coming soon')
 
 
@@ -46,9 +77,7 @@ def collect_garbage(*, assets: bool, assetblobs: bool, uploads: bool, s3blobs: bool):
     if s3blobs and click.confirm('This will delete all S3 Blobs. Are you sure?'):
         raise click.NoSuchOption('Deleting S3 Blobs is not yet implemented')
     if assets and click.confirm('This will delete all Assets. Are you sure?'):
-        assets_to_delete = stale_assets()
-        if click.confirm(f'This will delete {assets_to_delete.count()} assets. Are you sure?'):
-            assets_to_delete.delete()
+        garbage_collection.asset.garbage_collect()
 
     # Log how many things there are, either after deletion
     # or if the user forgot to specify anything to delete
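
A note on the new size reporting in `echo_report`: combining a field-based `distinct()` with `aggregate()` raises `NotImplementedError` in Django, hence the manual sum over `.iterator()`. A minimal in-SQL alternative is sketched below; it assumes `Asset.blob` exposes the reverse relation `assets` on `AssetBlob`, which this diff does not show:

```python
from django.db.models import Sum

from dandiapi.api.models import AssetBlob
from dandiapi.api.services import garbage_collection


def distinct_blob_bytes() -> int:
    """Sum each garbage-collectable blob's size once, even when shared by several assets."""
    assets = garbage_collection.asset.get_queryset()
    # A row-level DISTINCT (no field arguments) may be combined with aggregate(),
    # unlike the field-based .distinct('blob') used in echo_report.
    total = (
        AssetBlob.objects.filter(assets__in=assets)
        .distinct()
        .aggregate(total=Sum('size'))['total']
    )
    return total or 0
```

The streamed Python sum in the PR avoids the extra join and keeps memory flat, so either approach is defensible.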
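The optional-dependency handling at the top of the file is worth isolating as a pattern: the tuple of "missing blob" exceptions is built once at import time, so the size loop stays storage-backend-agnostic. A hedged sketch of the mechanism on its own (`blob_size_or_zero` is illustrative, not from this PR):

```python
# Collect the "object is missing" exceptions for whichever storage backend is
# installed: django-storages raises FileNotFoundError, minio raises S3Error.
missing_blob_errors: tuple[type[Exception], ...] = (FileNotFoundError,)
try:
    from minio.error import S3Error

    missing_blob_errors += (S3Error,)
except ModuleNotFoundError:
    pass


def blob_size_or_zero(field) -> int:
    """Return the storage-backend size of a FieldFile, or 0 if its object is gone."""
    try:
        return field.size  # issues a metadata request to the storage backend
    except missing_blob_errors:
        return 0
```
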
4 changes: 3 additions & 1 deletion dandiapi/api/services/garbage_collection/__init__.py
@@ -7,7 +7,7 @@
 from django.utils import timezone
 
 from dandiapi.api.models import GarbageCollectionEvent
-from dandiapi.api.services.garbage_collection import asset_blob, upload
+from dandiapi.api.services.garbage_collection import asset, asset_blob, upload
 from dandiapi.api.storage import DandiMultipartMixin
 
 logger = get_task_logger(__name__)
@@ -29,10 +29,12 @@ def garbage_collect():
     with transaction.atomic():
         garbage_collected_uploads = upload.garbage_collect()
         garbage_collected_asset_blobs = asset_blob.garbage_collect()
+        garbage_collected_assets = asset.garbage_collect()
 
         GarbageCollectionEvent.objects.filter(
             timestamp__lt=timezone.now() - RESTORATION_WINDOW
         ).delete()
 
     logger.info('Garbage collected %s Uploads.', garbage_collected_uploads)
     logger.info('Garbage collected %s AssetBlobs.', garbage_collected_asset_blobs)
+    logger.info('Garbage collected %s Assets.', garbage_collected_assets)
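
Since all three collectors now run inside one `transaction.atomic()` block, a failure in any of them rolls back the entire pass, and the pruning of `GarbageCollectionEvent` rows older than `RESTORATION_WINDOW` commits atomically with the new deletions. A direct invocation, sketched without any Celery task wrapper (none is shown in this diff), looks like:

```python
from dandiapi.api.services import garbage_collection

# One full pass: Uploads, then AssetBlobs, then Assets. Because Assets are
# collected last, blobs orphaned by this pass's Asset deletions only become
# eligible on a subsequent run.
garbage_collection.garbage_collect()
```
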
61 changes: 61 additions & 0 deletions dandiapi/api/services/garbage_collection/asset.py
@@ -0,0 +1,61 @@
+from __future__ import annotations
+
+from datetime import timedelta
+import json
+from typing import TYPE_CHECKING
+
+from celery.utils.log import get_task_logger
+from django.core import serializers
+from django.db import transaction
+from django.utils import timezone
+from more_itertools import chunked
+
+from dandiapi.api.models import (
+    Asset,
+    GarbageCollectionEvent,
+    GarbageCollectionEventRecord,
+)
+
+if TYPE_CHECKING:
+    from django.db.models import QuerySet
+
+logger = get_task_logger(__name__)
+
+ASSET_EXPIRATION_TIME = timedelta(days=30)
+
+
+def get_queryset() -> QuerySet[Asset]:
+    """Get the queryset of Assets that are eligible for garbage collection."""
+    return Asset.objects.filter(
+        versions__isnull=True,
+        published=False,
+        blob__isnull=False,  # only delete assets with blobs; zarrs are not supported yet
+        modified__lt=timezone.now() - ASSET_EXPIRATION_TIME,
+    )
+
+
+def garbage_collect() -> int:
+    from . import GARBAGE_COLLECTION_EVENT_CHUNK_SIZE
+
+    qs = get_queryset()
+
+    if not qs.exists():
+        return 0
+
+    deleted_records = 0
+
+    with transaction.atomic():
+        event = GarbageCollectionEvent.objects.create(type=Asset.__name__)
+        for assets_chunk in chunked(qs.iterator(), GARBAGE_COLLECTION_EVENT_CHUNK_SIZE):
+            GarbageCollectionEventRecord.objects.bulk_create(
+                GarbageCollectionEventRecord(
+                    event=event, record=json.loads(serializers.serialize('json', [a]))[0]
+                )
+                for a in assets_chunk
+            )
+
+            deleted_records += Asset.objects.filter(
+                pk__in=[a.pk for a in assets_chunk],
+            ).delete()[0]
+
+    return deleted_records
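
Each `GarbageCollectionEventRecord` stores the single-element output of Django's JSON serializer, which is what makes these deletions reversible inside the restoration window. The restoration path itself is not part of this diff; a hedged sketch of what it could look like (`restore_asset` is a hypothetical helper):

```python
import json

from django.core import serializers

from dandiapi.api.models import GarbageCollectionEventRecord


def restore_asset(record: GarbageCollectionEventRecord) -> None:
    """Re-insert a garbage-collected Asset from its serialized record."""
    # deserialize() expects the same JSON-list shape that
    # serializers.serialize('json', [asset]) produced above.
    for obj in serializers.deserialize('json', json.dumps([record.record])):
        obj.save()  # restores the row with its original pk
```
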
40 changes: 0 additions & 40 deletions dandiapi/api/tests/test_garbage.py

This file was deleted.